aboutsummaryrefslogtreecommitdiff
path: root/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
diff options
context:
space:
mode:
Diffstat (limited to 'external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala')
-rw-r--r--external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala25
1 files changed, 10 insertions, 15 deletions
diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
index 41c6ab123b..80e0cce055 100644
--- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
+++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
@@ -73,7 +73,7 @@ private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T], w
logDebug(s"Stored: Worker $workerId stored ${batch.size} records for shardId $shardId")
receiver.setCheckpointer(shardId, checkpointer)
} catch {
- case NonFatal(e) => {
+ case NonFatal(e) =>
/*
* If there is a failure within the batch, the batch will not be checkpointed.
* This will potentially cause records since the last checkpoint to be processed
@@ -84,7 +84,6 @@ private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T], w
/* Rethrow the exception to the Kinesis Worker that is managing this RecordProcessor. */
throw e
- }
}
} else {
/* RecordProcessor has been stopped. */
@@ -148,29 +147,25 @@ private[kinesis] object KinesisRecordProcessor extends Logging {
/* If the function failed, either retry or throw the exception */
case util.Failure(e) => e match {
/* Retry: Throttling or other Retryable exception has occurred */
- case _: ThrottlingException | _: KinesisClientLibDependencyException if numRetriesLeft > 1
- => {
- val backOffMillis = Random.nextInt(maxBackOffMillis)
- Thread.sleep(backOffMillis)
- logError(s"Retryable Exception: Random backOffMillis=${backOffMillis}", e)
- retryRandom(expression, numRetriesLeft - 1, maxBackOffMillis)
- }
+ case _: ThrottlingException | _: KinesisClientLibDependencyException
+ if numRetriesLeft > 1 =>
+ val backOffMillis = Random.nextInt(maxBackOffMillis)
+ Thread.sleep(backOffMillis)
+ logError(s"Retryable Exception: Random backOffMillis=${backOffMillis}", e)
+ retryRandom(expression, numRetriesLeft - 1, maxBackOffMillis)
/* Throw: Shutdown has been requested by the Kinesis Client Library. */
- case _: ShutdownException => {
+ case _: ShutdownException =>
logError(s"ShutdownException: Caught shutdown exception, skipping checkpoint.", e)
throw e
- }
/* Throw: Non-retryable exception has occurred with the Kinesis Client Library */
- case _: InvalidStateException => {
+ case _: InvalidStateException =>
logError(s"InvalidStateException: Cannot save checkpoint to the DynamoDB table used" +
s" by the Amazon Kinesis Client Library. Table likely doesn't exist.", e)
throw e
- }
/* Throw: Unexpected exception has occurred */
- case _ => {
+ case _ =>
logError(s"Unexpected, non-retryable exception.", e)
throw e
- }
}
}
}