aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/src/test')
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java5
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala9
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala4
3 files changed, 18 insertions, 0 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java
index ff0be820e0..63fd6c4422 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java
@@ -23,6 +23,11 @@ import org.apache.spark.streaming.api.java.*;
public class JavaStreamingListenerAPISuite extends JavaStreamingListener {
@Override
+ public void onStreamingStarted(JavaStreamingListenerStreamingStarted streamingStarted) {
+ super.onStreamingStarted(streamingStarted);
+ }
+
+ @Override
public void onReceiverStarted(JavaStreamingListenerReceiverStarted receiverStarted) {
JavaReceiverInfo receiverInfo = receiverStarted.receiverInfo();
receiverInfo.streamId();
diff --git a/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala b/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala
index 0295e059f7..cfd4323531 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala
+++ b/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala
@@ -29,6 +29,10 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite {
val listener = new TestJavaStreamingListener()
val listenerWrapper = new JavaStreamingListenerWrapper(listener)
+ val streamingStarted = StreamingListenerStreamingStarted(1000L)
+ listenerWrapper.onStreamingStarted(streamingStarted)
+ assert(listener.streamingStarted.time === streamingStarted.time)
+
val receiverStarted = StreamingListenerReceiverStarted(ReceiverInfo(
streamId = 2,
name = "test",
@@ -249,6 +253,7 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite {
class TestJavaStreamingListener extends JavaStreamingListener {
+ var streamingStarted: JavaStreamingListenerStreamingStarted = null
var receiverStarted: JavaStreamingListenerReceiverStarted = null
var receiverError: JavaStreamingListenerReceiverError = null
var receiverStopped: JavaStreamingListenerReceiverStopped = null
@@ -258,6 +263,10 @@ class TestJavaStreamingListener extends JavaStreamingListener {
var outputOperationStarted: JavaStreamingListenerOutputOperationStarted = null
var outputOperationCompleted: JavaStreamingListenerOutputOperationCompleted = null
+ override def onStreamingStarted(streamingStarted: JavaStreamingListenerStreamingStarted): Unit = {
+ this.streamingStarted = streamingStarted
+ }
+
override def onReceiverStarted(receiverStarted: JavaStreamingListenerReceiverStarted): Unit = {
this.receiverStarted = receiverStarted
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
index 46ab3ac8de..56b400850f 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
@@ -62,6 +62,10 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
0 -> StreamInputInfo(0, 300L),
1 -> StreamInputInfo(1, 300L, Map(StreamInputInfo.METADATA_KEY_DESCRIPTION -> "test")))
+ // onStreamingStarted
+ listener.onStreamingStarted(StreamingListenerStreamingStarted(100L))
+ listener.startTime should be (100)
+
// onBatchSubmitted
val batchInfoSubmitted = BatchInfo(Time(1000), streamIdToInputInfo, 1000, None, None, Map.empty)
listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted))