aboutsummaryrefslogtreecommitdiff
path: root/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala
diff options
context:
space:
mode:
Diffstat (limited to 'external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala')
-rw-r--r--external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala15
1 files changed, 14 insertions, 1 deletions
diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala
index 3e697f36a4..c445c15a5f 100644
--- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala
+++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala
@@ -64,7 +64,20 @@ private[kinesis] class KinesisCheckpointer(
def removeCheckpointer(shardId: String, checkpointer: IRecordProcessorCheckpointer): Unit = {
synchronized {
checkpointers.remove(shardId)
- checkpoint(shardId, checkpointer)
+ }
+ if (checkpointer != null) {
+ try {
+ // We must call `checkpoint()` with no parameter to finish reading shards.
+ // See an URL below for details:
+ // https://forums.aws.amazon.com/thread.jspa?threadID=244218
+ KinesisRecordProcessor.retryRandom(checkpointer.checkpoint(), 4, 100)
+ } catch {
+ case NonFatal(e) =>
+ logError(s"Exception: WorkerId $workerId encountered an exception while checkpointing" +
+ s"to finish reading a shard of $shardId.", e)
+ // Rethrow the exception to the Kinesis Worker that is managing this RecordProcessor
+ throw e
+ }
}
}