aboutsummaryrefslogtreecommitdiff
path: root/external/kinesis-asl/src/main
diff options
context:
space:
mode:
authorTakeshi YAMAMURO <linguin.m.s@gmail.com>2017-01-25 17:38:48 -0800
committerBurak Yavuz <brkyvz@gmail.com>2017-01-25 17:38:48 -0800
commit256a3a801366ab9f705e50690114e49fdb49b38e (patch)
treed1d6eacbc69e23c8cfe27957172e38dba983939f /external/kinesis-asl/src/main
parent2338451266d37b4c952827325cdee53b3e8fbc78 (diff)
downloadspark-256a3a801366ab9f705e50690114e49fdb49b38e.tar.gz
spark-256a3a801366ab9f705e50690114e49fdb49b38e.tar.bz2
spark-256a3a801366ab9f705e50690114e49fdb49b38e.zip
[SPARK-18020][STREAMING][KINESIS] Checkpoint SHARD_END to finish reading closed shards
## What changes were proposed in this pull request? This pr is to fix an issue occurred when resharding Kinesis streams; the resharding makes the KCL throw an exception because Spark does not checkpoint `SHARD_END` when finishing reading closed shards in `KinesisRecordProcessor#shutdown`. This bug finally leads to stopping subscribing new split (or merged) shards. ## How was this patch tested? Added a test in `KinesisStreamSuite` to check if it works well when splitting/merging shards. Author: Takeshi YAMAMURO <linguin.m.s@gmail.com> Closes #16213 from maropu/SPARK-18020.
Diffstat (limited to 'external/kinesis-asl/src/main')
-rw-r--r--external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala15
-rw-r--r--external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala30
2 files changed, 41 insertions, 4 deletions
diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala
index 3e697f36a4..c445c15a5f 100644
--- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala
+++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala
@@ -64,7 +64,20 @@ private[kinesis] class KinesisCheckpointer(
def removeCheckpointer(shardId: String, checkpointer: IRecordProcessorCheckpointer): Unit = {
synchronized {
checkpointers.remove(shardId)
- checkpoint(shardId, checkpointer)
+ }
+ if (checkpointer != null) {
+ try {
+ // We must call `checkpoint()` with no parameter to finish reading shards.
+ // See an URL below for details:
+ // https://forums.aws.amazon.com/thread.jspa?threadID=244218
+ KinesisRecordProcessor.retryRandom(checkpointer.checkpoint(), 4, 100)
+ } catch {
+ case NonFatal(e) =>
+ logError(s"Exception: WorkerId $workerId encountered an exception while checkpointing" +
+ s"to finish reading a shard of $shardId.", e)
+ // Rethrow the exception to the Kinesis Worker that is managing this RecordProcessor
+ throw e
+ }
}
}
diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
index 0fe66254e9..f183ef00b3 100644
--- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
+++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
@@ -40,11 +40,10 @@ import org.apache.spark.internal.Logging
*
* PLEASE KEEP THIS FILE UNDER src/main AS PYTHON TESTS NEED ACCESS TO THIS FILE!
*/
-private[kinesis] class KinesisTestUtils extends Logging {
+private[kinesis] class KinesisTestUtils(streamShardCount: Int = 2) extends Logging {
val endpointUrl = KinesisTestUtils.endpointUrl
val regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName()
- val streamShardCount = 2
private val createStreamTimeoutSeconds = 300
private val describeStreamPollTimeSeconds = 1
@@ -88,7 +87,7 @@ private[kinesis] class KinesisTestUtils extends Logging {
logInfo(s"Creating stream ${_streamName}")
val createStreamRequest = new CreateStreamRequest()
createStreamRequest.setStreamName(_streamName)
- createStreamRequest.setShardCount(2)
+ createStreamRequest.setShardCount(streamShardCount)
kinesisClient.createStream(createStreamRequest)
// The stream is now being created. Wait for it to become active.
@@ -97,6 +96,31 @@ private[kinesis] class KinesisTestUtils extends Logging {
logInfo(s"Created stream ${_streamName}")
}
+ def getShards(): Seq[Shard] = {
+ kinesisClient.describeStream(_streamName).getStreamDescription.getShards.asScala
+ }
+
+ def splitShard(shardId: String): Unit = {
+ val splitShardRequest = new SplitShardRequest()
+ splitShardRequest.withStreamName(_streamName)
+ splitShardRequest.withShardToSplit(shardId)
+ // Set a half of the max hash value
+ splitShardRequest.withNewStartingHashKey("170141183460469231731687303715884105728")
+ kinesisClient.splitShard(splitShardRequest)
+ // Wait for the shards to become active
+ waitForStreamToBeActive(_streamName)
+ }
+
+ def mergeShard(shardToMerge: String, adjacentShardToMerge: String): Unit = {
+ val mergeShardRequest = new MergeShardsRequest
+ mergeShardRequest.withStreamName(_streamName)
+ mergeShardRequest.withShardToMerge(shardToMerge)
+ mergeShardRequest.withAdjacentShardToMerge(adjacentShardToMerge)
+ kinesisClient.mergeShards(mergeShardRequest)
+ // Wait for the shards to become active
+ waitForStreamToBeActive(_streamName)
+ }
+
/**
* Push data to Kinesis stream and return a map of
* shardId -> seq of (data, seq number) pushed to corresponding shard