diff options
author | saturday_s <shi.indetail@gmail.com> | 2016-12-22 12:51:37 -0800 |
---|---|---|
committer | Marcelo Vanzin <vanzin@cloudera.com> | 2016-12-22 12:51:37 -0800 |
commit | ce99f51d2e8cbcb1565c9e7a729183bd74a0c2bc (patch) | |
tree | 95c16fa2220409a658202776d9d70a2ed849d636 /streaming/src/test/java/org/apache | |
parent | 31da755c80aed8219c368fd18c72b42e50be46fc (diff) | |
download | spark-ce99f51d2e8cbcb1565c9e7a729183bd74a0c2bc.tar.gz spark-ce99f51d2e8cbcb1565c9e7a729183bd74a0c2bc.tar.bz2 spark-ce99f51d2e8cbcb1565c9e7a729183bd74a0c2bc.zip |
[SPARK-18537][WEB UI] Add a REST api to serve spark streaming information
## What changes were proposed in this pull request?
This PR is an inheritance from #16000, and is a completion of #15904.
**Description**
- Augment the `org.apache.spark.status.api.v1` package for serving streaming information.
- Retrieve the streaming information through StreamingJobProgressListener.
> this api should cover exceptly the same amount of information as you can get from the web interface
> the implementation is base on the current REST implementation of spark-core
> and will be available for running applications only
>
> https://issues.apache.org/jira/browse/SPARK-18537
## How was this patch tested?
Local test.
Author: saturday_s <shi.indetail@gmail.com>
Author: Chan Chor Pang <ChorPang.Chan@access-company.com>
Author: peterCPChan <universknight@gmail.com>
Closes #16253 from saturday-shi/SPARK-18537.
Diffstat (limited to 'streaming/src/test/java/org/apache')
2 files changed, 14 insertions, 0 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java index ff0be820e0..63fd6c4422 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java @@ -23,6 +23,11 @@ import org.apache.spark.streaming.api.java.*; public class JavaStreamingListenerAPISuite extends JavaStreamingListener { @Override + public void onStreamingStarted(JavaStreamingListenerStreamingStarted streamingStarted) { + super.onStreamingStarted(streamingStarted); + } + + @Override public void onReceiverStarted(JavaStreamingListenerReceiverStarted receiverStarted) { JavaReceiverInfo receiverInfo = receiverStarted.receiverInfo(); receiverInfo.streamId(); diff --git a/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala b/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala index 0295e059f7..cfd4323531 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala +++ b/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala @@ -29,6 +29,10 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite { val listener = new TestJavaStreamingListener() val listenerWrapper = new JavaStreamingListenerWrapper(listener) + val streamingStarted = StreamingListenerStreamingStarted(1000L) + listenerWrapper.onStreamingStarted(streamingStarted) + assert(listener.streamingStarted.time === streamingStarted.time) + val receiverStarted = StreamingListenerReceiverStarted(ReceiverInfo( streamId = 2, name = "test", @@ -249,6 +253,7 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite { class TestJavaStreamingListener extends JavaStreamingListener { + var streamingStarted: JavaStreamingListenerStreamingStarted = null var receiverStarted: JavaStreamingListenerReceiverStarted = null var receiverError: JavaStreamingListenerReceiverError = null var receiverStopped: JavaStreamingListenerReceiverStopped = null @@ -258,6 +263,10 @@ class TestJavaStreamingListener extends JavaStreamingListener { var outputOperationStarted: JavaStreamingListenerOutputOperationStarted = null var outputOperationCompleted: JavaStreamingListenerOutputOperationCompleted = null + override def onStreamingStarted(streamingStarted: JavaStreamingListenerStreamingStarted): Unit = { + this.streamingStarted = streamingStarted + } + override def onReceiverStarted(receiverStarted: JavaStreamingListenerReceiverStarted): Unit = { this.receiverStarted = receiverStarted } |