aboutsummaryrefslogtreecommitdiff
path: root/external
diff options
context:
space:
mode:
authorGaurav <gaurav@techtinium.com>2017-03-06 10:41:49 -0800
committerBurak Yavuz <brkyvz@gmail.com>2017-03-06 10:41:49 -0800
commit46a64d1e0ae12c31e848f377a84fb28e3efb3699 (patch)
treef070e6a3646450030a33e66282e85aa1efce6bdb /external
parent339b53a1311e08521d84a83c94201fcf3c766fb2 (diff)
downloadspark-46a64d1e0ae12c31e848f377a84fb28e3efb3699.tar.gz
spark-46a64d1e0ae12c31e848f377a84fb28e3efb3699.tar.bz2
spark-46a64d1e0ae12c31e848f377a84fb28e3efb3699.zip
[SPARK-19304][STREAMING][KINESIS] fix kinesis slow checkpoint recovery
## What changes were proposed in this pull request? Added a limit to the getRecords API call in KinesisBackedBlockRDD. This helps reduce the amount of data returned by the Kinesis API call, making the recovery considerably faster. As we are storing the `fromSeqNum` & `toSeqNum` in checkpoint metadata, we can also store the number of records, which can later be used for the API call. ## How was this patch tested? The patch was manually tested. Apologies for any silly mistakes; opening first pull request. Author: Gaurav <gaurav@techtinium.com> Closes #16842 from Gauravshah/kinesis_checkpoint_recovery_fix_2_1_0.
Diffstat (limited to 'external')
-rw-r--r--external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala25
-rw-r--r--external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala3
-rw-r--r--external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala4
-rw-r--r--external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala4
4 files changed, 25 insertions, 11 deletions
diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
index 23c4d99e50..0f1790bddc 100644
--- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
+++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
@@ -36,7 +36,11 @@ import org.apache.spark.util.NextIterator
/** Class representing a range of Kinesis sequence numbers. Both sequence numbers are inclusive. */
private[kinesis]
case class SequenceNumberRange(
- streamName: String, shardId: String, fromSeqNumber: String, toSeqNumber: String)
+ streamName: String,
+ shardId: String,
+ fromSeqNumber: String,
+ toSeqNumber: String,
+ recordCount: Int)
/** Class representing an array of Kinesis sequence number ranges */
private[kinesis]
@@ -136,6 +140,8 @@ class KinesisSequenceRangeIterator(
private val client = new AmazonKinesisClient(credentials)
private val streamName = range.streamName
private val shardId = range.shardId
+ // AWS limits to maximum of 10k records per get call
+ private val maxGetRecordsLimit = 10000
private var toSeqNumberReceived = false
private var lastSeqNumber: String = null
@@ -153,12 +159,14 @@ class KinesisSequenceRangeIterator(
// If the internal iterator has not been initialized,
// then fetch records from starting sequence number
- internalIterator = getRecords(ShardIteratorType.AT_SEQUENCE_NUMBER, range.fromSeqNumber)
+ internalIterator = getRecords(ShardIteratorType.AT_SEQUENCE_NUMBER, range.fromSeqNumber,
+ range.recordCount)
} else if (!internalIterator.hasNext) {
// If the internal iterator does not have any more records,
// then fetch more records after the last consumed sequence number
- internalIterator = getRecords(ShardIteratorType.AFTER_SEQUENCE_NUMBER, lastSeqNumber)
+ internalIterator = getRecords(ShardIteratorType.AFTER_SEQUENCE_NUMBER, lastSeqNumber,
+ range.recordCount)
}
if (!internalIterator.hasNext) {
@@ -191,9 +199,12 @@ class KinesisSequenceRangeIterator(
/**
* Get records starting from or after the given sequence number.
*/
- private def getRecords(iteratorType: ShardIteratorType, seqNum: String): Iterator[Record] = {
+ private def getRecords(
+ iteratorType: ShardIteratorType,
+ seqNum: String,
+ recordCount: Int): Iterator[Record] = {
val shardIterator = getKinesisIterator(iteratorType, seqNum)
- val result = getRecordsAndNextKinesisIterator(shardIterator)
+ val result = getRecordsAndNextKinesisIterator(shardIterator, recordCount)
result._1
}
@@ -202,10 +213,12 @@ class KinesisSequenceRangeIterator(
* to get records from Kinesis), and get the next shard iterator for next consumption.
*/
private def getRecordsAndNextKinesisIterator(
- shardIterator: String): (Iterator[Record], String) = {
+ shardIterator: String,
+ recordCount: Int): (Iterator[Record], String) = {
val getRecordsRequest = new GetRecordsRequest
getRecordsRequest.setRequestCredentials(credentials)
getRecordsRequest.setShardIterator(shardIterator)
+ getRecordsRequest.setLimit(Math.min(recordCount, this.maxGetRecordsLimit))
val getRecordsResult = retryOrTimeout[GetRecordsResult](
s"getting records using shard iterator") {
client.getRecords(getRecordsRequest)
diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
index 13fc54e531..320728f4bb 100644
--- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
+++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
@@ -210,7 +210,8 @@ private[kinesis] class KinesisReceiver[T](
if (records.size > 0) {
val dataIterator = records.iterator().asScala.map(messageHandler)
val metadata = SequenceNumberRange(streamName, shardId,
- records.get(0).getSequenceNumber(), records.get(records.size() - 1).getSequenceNumber())
+ records.get(0).getSequenceNumber(), records.get(records.size() - 1).getSequenceNumber(),
+ records.size())
blockGenerator.addMultipleDataWithCallback(dataIterator, metadata)
}
}
diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
index 18a5a1509a..2c7b9c58e6 100644
--- a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
+++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
@@ -51,7 +51,7 @@ abstract class KinesisBackedBlockRDDTests(aggregateTestData: Boolean)
shardIdToSeqNumbers = shardIdToDataAndSeqNumbers.mapValues { _.map { _._2 }}
shardIdToRange = shardIdToSeqNumbers.map { case (shardId, seqNumbers) =>
val seqNumRange = SequenceNumberRange(
- testUtils.streamName, shardId, seqNumbers.head, seqNumbers.last)
+ testUtils.streamName, shardId, seqNumbers.head, seqNumbers.last, seqNumbers.size)
(shardId, seqNumRange)
}
allRanges = shardIdToRange.values.toSeq
@@ -181,7 +181,7 @@ abstract class KinesisBackedBlockRDDTests(aggregateTestData: Boolean)
// Create the necessary ranges to use in the RDD
val fakeRanges = Array.fill(numPartitions - numPartitionsInKinesis)(
- SequenceNumberRanges(SequenceNumberRange("fakeStream", "fakeShardId", "xxx", "yyy")))
+ SequenceNumberRanges(SequenceNumberRange("fakeStream", "fakeShardId", "xxx", "yyy", 1)))
val realRanges = Array.tabulate(numPartitionsInKinesis) { i =>
val range = shardIdToRange(shardIds(i + (numPartitions - numPartitionsInKinesis)))
SequenceNumberRanges(Array(range))
diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
index 387a96f26b..afb55c84f8 100644
--- a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
+++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
@@ -119,13 +119,13 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
// Generate block info data for testing
val seqNumRanges1 = SequenceNumberRanges(
- SequenceNumberRange("fakeStream", "fakeShardId", "xxx", "yyy"))
+ SequenceNumberRange("fakeStream", "fakeShardId", "xxx", "yyy", 67))
val blockId1 = StreamBlockId(kinesisStream.id, 123)
val blockInfo1 = ReceivedBlockInfo(
0, None, Some(seqNumRanges1), new BlockManagerBasedStoreResult(blockId1, None))
val seqNumRanges2 = SequenceNumberRanges(
- SequenceNumberRange("fakeStream", "fakeShardId", "aaa", "bbb"))
+ SequenceNumberRange("fakeStream", "fakeShardId", "aaa", "bbb", 89))
val blockId2 = StreamBlockId(kinesisStream.id, 345)
val blockInfo2 = ReceivedBlockInfo(
0, None, Some(seqNumRanges2), new BlockManagerBasedStoreResult(blockId2, None))