aboutsummaryrefslogtreecommitdiff
path: root/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
diff options
context:
space:
mode:
Diffstat (limited to 'external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala')
-rw-r--r--external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala17
1 files changed, 17 insertions, 0 deletions
diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
index deac9090e2..800502a77d 100644
--- a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
+++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
@@ -69,6 +69,7 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft
test("process records including store and set checkpointer") {
when(receiverMock.isStopped()).thenReturn(false)
+ when(receiverMock.getCurrentLimit).thenReturn(Int.MaxValue)
val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId)
recordProcessor.initialize(shardId)
@@ -79,8 +80,23 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft
verify(receiverMock, times(1)).setCheckpointer(shardId, checkpointerMock)
}
+ test("split into multiple processes if a limitation is set") {
+ when(receiverMock.isStopped()).thenReturn(false)
+ when(receiverMock.getCurrentLimit).thenReturn(1)
+
+ val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId)
+ recordProcessor.initialize(shardId)
+ recordProcessor.processRecords(batch, checkpointerMock)
+
+ verify(receiverMock, times(1)).isStopped()
+ verify(receiverMock, times(1)).addRecords(shardId, batch.subList(0, 1))
+ verify(receiverMock, times(1)).addRecords(shardId, batch.subList(1, 2))
+ verify(receiverMock, times(1)).setCheckpointer(shardId, checkpointerMock)
+ }
+
test("shouldn't store and update checkpointer when receiver is stopped") {
when(receiverMock.isStopped()).thenReturn(true)
+ when(receiverMock.getCurrentLimit).thenReturn(Int.MaxValue)
val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId)
recordProcessor.processRecords(batch, checkpointerMock)
@@ -92,6 +108,7 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft
test("shouldn't update checkpointer when exception occurs during store") {
when(receiverMock.isStopped()).thenReturn(false)
+ when(receiverMock.getCurrentLimit).thenReturn(Int.MaxValue)
when(
receiverMock.addRecords(anyString, anyListOf(classOf[Record]))
).thenThrow(new RuntimeException())