diff options
author | Dongjoon Hyun <dongjoon@apache.org> | 2016-04-12 00:43:28 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-04-12 00:43:28 -0700 |
commit | b0f5497e9520575e5082fa8ce8be5569f43abe74 (patch) | |
tree | efd349be7227cf20616712fc7376b7c2f11f6614 /external/kinesis-asl | |
parent | 678b96e77bf77a64b8df14b19db5a3bb18febfe3 (diff) | |
download | spark-b0f5497e9520575e5082fa8ce8be5569f43abe74.tar.gz spark-b0f5497e9520575e5082fa8ce8be5569f43abe74.tar.bz2 spark-b0f5497e9520575e5082fa8ce8be5569f43abe74.zip |
[SPARK-14508][BUILD] Add a new ScalaStyle Rule `OmitBracesInCase`
## What changes were proposed in this pull request?
According to the [Spark Code Style Guide](https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide) and [Scala Style Guide](http://docs.scala-lang.org/style/control-structures.html#curlybraces), we had better enforce the following rule.
```
case: Always omit braces in case clauses.
```
This PR makes a new ScalaStyle rule, 'OmitBracesInCase', and enforces it to the code.
## How was this patch tested?
Pass the Jenkins tests (including Scala style checking)
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes #12280 from dongjoon-hyun/SPARK-14508.
Diffstat (limited to 'external/kinesis-asl')
-rw-r--r-- | external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala | 25 |
1 files changed, 10 insertions, 15 deletions
diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala index 41c6ab123b..80e0cce055 100644 --- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala +++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala @@ -73,7 +73,7 @@ private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T], w logDebug(s"Stored: Worker $workerId stored ${batch.size} records for shardId $shardId") receiver.setCheckpointer(shardId, checkpointer) } catch { - case NonFatal(e) => { + case NonFatal(e) => /* * If there is a failure within the batch, the batch will not be checkpointed. * This will potentially cause records since the last checkpoint to be processed @@ -84,7 +84,6 @@ private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T], w /* Rethrow the exception to the Kinesis Worker that is managing this RecordProcessor. */ throw e - } } } else { /* RecordProcessor has been stopped. */ @@ -148,29 +147,25 @@ private[kinesis] object KinesisRecordProcessor extends Logging { /* If the function failed, either retry or throw the exception */ case util.Failure(e) => e match { /* Retry: Throttling or other Retryable exception has occurred */ - case _: ThrottlingException | _: KinesisClientLibDependencyException if numRetriesLeft > 1 - => { - val backOffMillis = Random.nextInt(maxBackOffMillis) - Thread.sleep(backOffMillis) - logError(s"Retryable Exception: Random backOffMillis=${backOffMillis}", e) - retryRandom(expression, numRetriesLeft - 1, maxBackOffMillis) - } + case _: ThrottlingException | _: KinesisClientLibDependencyException + if numRetriesLeft > 1 => + val backOffMillis = Random.nextInt(maxBackOffMillis) + Thread.sleep(backOffMillis) + logError(s"Retryable Exception: Random backOffMillis=${backOffMillis}", e) + retryRandom(expression, numRetriesLeft - 1, maxBackOffMillis) /* Throw: Shutdown has been requested by the Kinesis Client Library. */ - case _: ShutdownException => { + case _: ShutdownException => logError(s"ShutdownException: Caught shutdown exception, skipping checkpoint.", e) throw e - } /* Throw: Non-retryable exception has occurred with the Kinesis Client Library */ - case _: InvalidStateException => { + case _: InvalidStateException => logError(s"InvalidStateException: Cannot save checkpoint to the DynamoDB table used" + s" by the Amazon Kinesis Client Library. Table likely doesn't exist.", e) throw e - } /* Throw: Unexpected exception has occurred */ - case _ => { + case _ => logError(s"Unexpected, non-retryable exception.", e) throw e - } } } } |