about summary refs log tree commit diff
path: root/external
diff options
context:
space:
mode:
Diffstat (limited to 'external')
-rw-r--r--	external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala | 6
-rw-r--r--	external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala | 14
-rw-r--r--	external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala | 17
3 files changed, 35 insertions, 2 deletions
diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
index 858368d135..393e56a393 100644
--- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
+++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
@@ -221,6 +221,12 @@ private[kinesis] class KinesisReceiver[T](
}
}
+ /** Return the current rate limit defined in [[BlockGenerator]]. */
+ private[kinesis] def getCurrentLimit: Int = {
+ assert(blockGenerator != null)
+ math.min(blockGenerator.getCurrentLimit, Int.MaxValue).toInt
+ }
+
/** Get the latest sequence number for the given shard that can be checkpointed through KCL */
private[kinesis] def getLatestSeqNumToCheckpoint(shardId: String): Option[String] = {
Option(shardIdToLatestStoredSeqNum.get(shardId))
diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
index a0ccd086d9..73ccc4ad23 100644
--- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
+++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
@@ -68,8 +68,18 @@ private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T], w
override def processRecords(batch: List[Record], checkpointer: IRecordProcessorCheckpointer) {
if (!receiver.isStopped()) {
try {
- receiver.addRecords(shardId, batch)
- logDebug(s"Stored: Worker $workerId stored ${batch.size} records for shardId $shardId")
+ // Limit the number of processed records from Kinesis stream. This is because the KCL cannot
+ // control the number of aggregated records to be fetched even if we set `MaxRecords`
+ // in `KinesisClientLibConfiguration`. For example, if we set 10 to the number of max
+ // records in a worker and a producer aggregates two records into one message, the worker
+ // possibly gets 20 records every time the callback function is called.
+ val maxRecords = receiver.getCurrentLimit
+ for (start <- 0 until batch.size by maxRecords) {
+ val miniBatch = batch.subList(start, math.min(start + maxRecords, batch.size))
+ receiver.addRecords(shardId, miniBatch)
+ logDebug(s"Stored: Worker $workerId stored ${miniBatch.size} records " +
+ s"for shardId $shardId")
+ }
receiver.setCheckpointer(shardId, checkpointer)
} catch {
case NonFatal(e) =>
diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
index deac9090e2..800502a77d 100644
--- a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
+++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
@@ -69,6 +69,7 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft
test("process records including store and set checkpointer") {
when(receiverMock.isStopped()).thenReturn(false)
+ when(receiverMock.getCurrentLimit).thenReturn(Int.MaxValue)
val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId)
recordProcessor.initialize(shardId)
@@ -79,8 +80,23 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft
verify(receiverMock, times(1)).setCheckpointer(shardId, checkpointerMock)
}
+ test("split into multiple processes if a limitation is set") {
+ when(receiverMock.isStopped()).thenReturn(false)
+ when(receiverMock.getCurrentLimit).thenReturn(1)
+
+ val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId)
+ recordProcessor.initialize(shardId)
+ recordProcessor.processRecords(batch, checkpointerMock)
+
+ verify(receiverMock, times(1)).isStopped()
+ verify(receiverMock, times(1)).addRecords(shardId, batch.subList(0, 1))
+ verify(receiverMock, times(1)).addRecords(shardId, batch.subList(1, 2))
+ verify(receiverMock, times(1)).setCheckpointer(shardId, checkpointerMock)
+ }
+
test("shouldn't store and update checkpointer when receiver is stopped") {
when(receiverMock.isStopped()).thenReturn(true)
+ when(receiverMock.getCurrentLimit).thenReturn(Int.MaxValue)
val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId)
recordProcessor.processRecords(batch, checkpointerMock)
@@ -92,6 +108,7 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft
test("shouldn't update checkpointer when exception occurs during store") {
when(receiverMock.isStopped()).thenReturn(false)
+ when(receiverMock.getCurrentLimit).thenReturn(Int.MaxValue)
when(
receiverMock.addRecords(anyString, anyListOf(classOf[Record]))
).thenThrow(new RuntimeException())