diff options
Diffstat (limited to 'external/kinesis-asl/src/test/scala/org')
-rw-r--r-- | external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala | 17 |
1 files changed, 17 insertions, 0 deletions
diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala index deac9090e2..800502a77d 100644 --- a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala +++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala @@ -69,6 +69,7 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft test("process records including store and set checkpointer") { when(receiverMock.isStopped()).thenReturn(false) + when(receiverMock.getCurrentLimit).thenReturn(Int.MaxValue) val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId) recordProcessor.initialize(shardId) @@ -79,8 +80,23 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft verify(receiverMock, times(1)).setCheckpointer(shardId, checkpointerMock) } + test("split into multiple processes if a limitation is set") { + when(receiverMock.isStopped()).thenReturn(false) + when(receiverMock.getCurrentLimit).thenReturn(1) + + val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId) + recordProcessor.initialize(shardId) + recordProcessor.processRecords(batch, checkpointerMock) + + verify(receiverMock, times(1)).isStopped() + verify(receiverMock, times(1)).addRecords(shardId, batch.subList(0, 1)) + verify(receiverMock, times(1)).addRecords(shardId, batch.subList(1, 2)) + verify(receiverMock, times(1)).setCheckpointer(shardId, checkpointerMock) + } + test("shouldn't store and update checkpointer when receiver is stopped") { when(receiverMock.isStopped()).thenReturn(true) + when(receiverMock.getCurrentLimit).thenReturn(Int.MaxValue) val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId) recordProcessor.processRecords(batch, checkpointerMock) @@ -92,6 +108,7 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft test("shouldn't update checkpointer when exception occurs during store") { when(receiverMock.isStopped()).thenReturn(false) + when(receiverMock.getCurrentLimit).thenReturn(Int.MaxValue) when( receiverMock.addRecords(anyString, anyListOf(classOf[Record])) ).thenThrow(new RuntimeException()) |