diff options
Diffstat (limited to 'streaming/src/test')
3 files changed, 18 insertions, 0 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java index ff0be820e0..63fd6c4422 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java @@ -23,6 +23,11 @@ import org.apache.spark.streaming.api.java.*; public class JavaStreamingListenerAPISuite extends JavaStreamingListener { @Override + public void onStreamingStarted(JavaStreamingListenerStreamingStarted streamingStarted) { + super.onStreamingStarted(streamingStarted); + } + + @Override public void onReceiverStarted(JavaStreamingListenerReceiverStarted receiverStarted) { JavaReceiverInfo receiverInfo = receiverStarted.receiverInfo(); receiverInfo.streamId(); diff --git a/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala b/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala index 0295e059f7..cfd4323531 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala +++ b/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala @@ -29,6 +29,10 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite { val listener = new TestJavaStreamingListener() val listenerWrapper = new JavaStreamingListenerWrapper(listener) + val streamingStarted = StreamingListenerStreamingStarted(1000L) + listenerWrapper.onStreamingStarted(streamingStarted) + assert(listener.streamingStarted.time === streamingStarted.time) + val receiverStarted = StreamingListenerReceiverStarted(ReceiverInfo( streamId = 2, name = "test", @@ -249,6 +253,7 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite { class TestJavaStreamingListener extends JavaStreamingListener { + var streamingStarted: JavaStreamingListenerStreamingStarted = null var receiverStarted: JavaStreamingListenerReceiverStarted = null var receiverError: JavaStreamingListenerReceiverError = null var receiverStopped: JavaStreamingListenerReceiverStopped = null @@ -258,6 +263,10 @@ class TestJavaStreamingListener extends JavaStreamingListener { var outputOperationStarted: JavaStreamingListenerOutputOperationStarted = null var outputOperationCompleted: JavaStreamingListenerOutputOperationCompleted = null + override def onStreamingStarted(streamingStarted: JavaStreamingListenerStreamingStarted): Unit = { + this.streamingStarted = streamingStarted + } + override def onReceiverStarted(receiverStarted: JavaStreamingListenerReceiverStarted): Unit = { this.receiverStarted = receiverStarted } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala index 46ab3ac8de..56b400850f 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala @@ -62,6 +62,10 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers { 0 -> StreamInputInfo(0, 300L), 1 -> StreamInputInfo(1, 300L, Map(StreamInputInfo.METADATA_KEY_DESCRIPTION -> "test"))) + // onStreamingStarted + listener.onStreamingStarted(StreamingListenerStreamingStarted(100L)) + listener.startTime should be (100) + // onBatchSubmitted val batchInfoSubmitted = BatchInfo(Time(1000), streamIdToInputInfo, 1000, None, None, Map.empty) listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted)) |