aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala')
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala9
1 files changed, 9 insertions, 0 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala b/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala
index 0295e059f7..cfd4323531 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala
+++ b/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala
@@ -29,6 +29,10 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite {
val listener = new TestJavaStreamingListener()
val listenerWrapper = new JavaStreamingListenerWrapper(listener)
+ val streamingStarted = StreamingListenerStreamingStarted(1000L)
+ listenerWrapper.onStreamingStarted(streamingStarted)
+ assert(listener.streamingStarted.time === streamingStarted.time)
+
val receiverStarted = StreamingListenerReceiverStarted(ReceiverInfo(
streamId = 2,
name = "test",
@@ -249,6 +253,7 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite {
class TestJavaStreamingListener extends JavaStreamingListener {
+ var streamingStarted: JavaStreamingListenerStreamingStarted = null
var receiverStarted: JavaStreamingListenerReceiverStarted = null
var receiverError: JavaStreamingListenerReceiverError = null
var receiverStopped: JavaStreamingListenerReceiverStopped = null
@@ -258,6 +263,10 @@ class TestJavaStreamingListener extends JavaStreamingListener {
var outputOperationStarted: JavaStreamingListenerOutputOperationStarted = null
var outputOperationCompleted: JavaStreamingListenerOutputOperationCompleted = null
+ override def onStreamingStarted(streamingStarted: JavaStreamingListenerStreamingStarted): Unit = {
+ this.streamingStarted = streamingStarted
+ }
+
override def onReceiverStarted(receiverStarted: JavaStreamingListenerReceiverStarted): Unit = {
this.receiverStarted = receiverStarted
}