aboutsummaryrefslogtreecommitdiff
path: root/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
diff options
context:
space:
mode:
Diffstat (limited to 'external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala')
-rw-r--r--external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala25
1 files changed, 19 insertions, 6 deletions
diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
index 23c4d99e50..0f1790bddc 100644
--- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
+++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
@@ -36,7 +36,11 @@ import org.apache.spark.util.NextIterator
/** Class representing a range of Kinesis sequence numbers. Both sequence numbers are inclusive. */
private[kinesis]
case class SequenceNumberRange(
- streamName: String, shardId: String, fromSeqNumber: String, toSeqNumber: String)
+ streamName: String,
+ shardId: String,
+ fromSeqNumber: String,
+ toSeqNumber: String,
+ recordCount: Int)
/** Class representing an array of Kinesis sequence number ranges */
private[kinesis]
@@ -136,6 +140,8 @@ class KinesisSequenceRangeIterator(
private val client = new AmazonKinesisClient(credentials)
private val streamName = range.streamName
private val shardId = range.shardId
+ // AWS limits to maximum of 10k records per get call
+ private val maxGetRecordsLimit = 10000
private var toSeqNumberReceived = false
private var lastSeqNumber: String = null
@@ -153,12 +159,14 @@ class KinesisSequenceRangeIterator(
// If the internal iterator has not been initialized,
// then fetch records from starting sequence number
- internalIterator = getRecords(ShardIteratorType.AT_SEQUENCE_NUMBER, range.fromSeqNumber)
+ internalIterator = getRecords(ShardIteratorType.AT_SEQUENCE_NUMBER, range.fromSeqNumber,
+ range.recordCount)
} else if (!internalIterator.hasNext) {
// If the internal iterator does not have any more records,
// then fetch more records after the last consumed sequence number
- internalIterator = getRecords(ShardIteratorType.AFTER_SEQUENCE_NUMBER, lastSeqNumber)
+ internalIterator = getRecords(ShardIteratorType.AFTER_SEQUENCE_NUMBER, lastSeqNumber,
+ range.recordCount)
}
if (!internalIterator.hasNext) {
@@ -191,9 +199,12 @@ class KinesisSequenceRangeIterator(
/**
* Get records starting from or after the given sequence number.
*/
- private def getRecords(iteratorType: ShardIteratorType, seqNum: String): Iterator[Record] = {
+ private def getRecords(
+ iteratorType: ShardIteratorType,
+ seqNum: String,
+ recordCount: Int): Iterator[Record] = {
val shardIterator = getKinesisIterator(iteratorType, seqNum)
- val result = getRecordsAndNextKinesisIterator(shardIterator)
+ val result = getRecordsAndNextKinesisIterator(shardIterator, recordCount)
result._1
}
@@ -202,10 +213,12 @@ class KinesisSequenceRangeIterator(
* to get records from Kinesis), and get the next shard iterator for next consumption.
*/
private def getRecordsAndNextKinesisIterator(
- shardIterator: String): (Iterator[Record], String) = {
+ shardIterator: String,
+ recordCount: Int): (Iterator[Record], String) = {
val getRecordsRequest = new GetRecordsRequest
getRecordsRequest.setRequestCredentials(credentials)
getRecordsRequest.setShardIterator(shardIterator)
+ getRecordsRequest.setLimit(Math.min(recordCount, this.maxGetRecordsLimit))
val getRecordsResult = retryOrTimeout[GetRecordsResult](
s"getting records using shard iterator") {
client.getRecords(getRecordsRequest)