aboutsummaryrefslogtreecommitdiff
path: root/external/kinesis-asl
diff options
context:
space:
mode:
authorDongjoon Hyun <dongjoon@apache.org>2016-04-12 00:43:28 -0700
committerReynold Xin <rxin@databricks.com>2016-04-12 00:43:28 -0700
commitb0f5497e9520575e5082fa8ce8be5569f43abe74 (patch)
treeefd349be7227cf20616712fc7376b7c2f11f6614 /external/kinesis-asl
parent678b96e77bf77a64b8df14b19db5a3bb18febfe3 (diff)
downloadspark-b0f5497e9520575e5082fa8ce8be5569f43abe74.tar.gz
spark-b0f5497e9520575e5082fa8ce8be5569f43abe74.tar.bz2
spark-b0f5497e9520575e5082fa8ce8be5569f43abe74.zip
[SPARK-14508][BUILD] Add a new ScalaStyle Rule `OmitBracesInCase`
## What changes were proposed in this pull request? According to the [Spark Code Style Guide](https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide) and [Scala Style Guide](http://docs.scala-lang.org/style/control-structures.html#curlybraces), we had better enforce the following rule. ``` case: Always omit braces in case clauses. ``` This PR makes a new ScalaStyle rule, 'OmitBracesInCase', and enforces it to the code. ## How was this patch tested? Pass the Jenkins tests (including Scala style checking) Author: Dongjoon Hyun <dongjoon@apache.org> Closes #12280 from dongjoon-hyun/SPARK-14508.
Diffstat (limited to 'external/kinesis-asl')
-rw-r--r--external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala25
1 files changed, 10 insertions, 15 deletions
diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
index 41c6ab123b..80e0cce055 100644
--- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
+++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
@@ -73,7 +73,7 @@ private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T], w
logDebug(s"Stored: Worker $workerId stored ${batch.size} records for shardId $shardId")
receiver.setCheckpointer(shardId, checkpointer)
} catch {
- case NonFatal(e) => {
+ case NonFatal(e) =>
/*
* If there is a failure within the batch, the batch will not be checkpointed.
* This will potentially cause records since the last checkpoint to be processed
@@ -84,7 +84,6 @@ private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T], w
/* Rethrow the exception to the Kinesis Worker that is managing this RecordProcessor. */
throw e
- }
}
} else {
/* RecordProcessor has been stopped. */
@@ -148,29 +147,25 @@ private[kinesis] object KinesisRecordProcessor extends Logging {
/* If the function failed, either retry or throw the exception */
case util.Failure(e) => e match {
/* Retry: Throttling or other Retryable exception has occurred */
- case _: ThrottlingException | _: KinesisClientLibDependencyException if numRetriesLeft > 1
- => {
- val backOffMillis = Random.nextInt(maxBackOffMillis)
- Thread.sleep(backOffMillis)
- logError(s"Retryable Exception: Random backOffMillis=${backOffMillis}", e)
- retryRandom(expression, numRetriesLeft - 1, maxBackOffMillis)
- }
+ case _: ThrottlingException | _: KinesisClientLibDependencyException
+ if numRetriesLeft > 1 =>
+ val backOffMillis = Random.nextInt(maxBackOffMillis)
+ Thread.sleep(backOffMillis)
+ logError(s"Retryable Exception: Random backOffMillis=${backOffMillis}", e)
+ retryRandom(expression, numRetriesLeft - 1, maxBackOffMillis)
/* Throw: Shutdown has been requested by the Kinesis Client Library. */
- case _: ShutdownException => {
+ case _: ShutdownException =>
logError(s"ShutdownException: Caught shutdown exception, skipping checkpoint.", e)
throw e
- }
/* Throw: Non-retryable exception has occurred with the Kinesis Client Library */
- case _: InvalidStateException => {
+ case _: InvalidStateException =>
logError(s"InvalidStateException: Cannot save checkpoint to the DynamoDB table used" +
s" by the Amazon Kinesis Client Library. Table likely doesn't exist.", e)
throw e
- }
/* Throw: Unexpected exception has occurred */
- case _ => {
+ case _ =>
logError(s"Unexpected, non-retryable exception.", e)
throw e
- }
}
}
}