aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test/java/org/apache
diff options
context:
space:
mode:
authorsaturday_s <shi.indetail@gmail.com>2016-12-22 12:51:37 -0800
committerMarcelo Vanzin <vanzin@cloudera.com>2016-12-22 12:51:37 -0800
commitce99f51d2e8cbcb1565c9e7a729183bd74a0c2bc (patch)
tree95c16fa2220409a658202776d9d70a2ed849d636 /streaming/src/test/java/org/apache
parent31da755c80aed8219c368fd18c72b42e50be46fc (diff)
downloadspark-ce99f51d2e8cbcb1565c9e7a729183bd74a0c2bc.tar.gz
spark-ce99f51d2e8cbcb1565c9e7a729183bd74a0c2bc.tar.bz2
spark-ce99f51d2e8cbcb1565c9e7a729183bd74a0c2bc.zip
[SPARK-18537][WEB UI] Add a REST api to serve spark streaming information
## What changes were proposed in this pull request? This PR is an inheritance from #16000, and is a completion of #15904. **Description** - Augment the `org.apache.spark.status.api.v1` package for serving streaming information. - Retrieve the streaming information through StreamingJobProgressListener. > this api should cover exceptly the same amount of information as you can get from the web interface > the implementation is base on the current REST implementation of spark-core > and will be available for running applications only > > https://issues.apache.org/jira/browse/SPARK-18537 ## How was this patch tested? Local test. Author: saturday_s <shi.indetail@gmail.com> Author: Chan Chor Pang <ChorPang.Chan@access-company.com> Author: peterCPChan <universknight@gmail.com> Closes #16253 from saturday-shi/SPARK-18537.
Diffstat (limited to 'streaming/src/test/java/org/apache')
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java5
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala9
2 files changed, 14 insertions, 0 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java
index ff0be820e0..63fd6c4422 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java
@@ -23,6 +23,11 @@ import org.apache.spark.streaming.api.java.*;
public class JavaStreamingListenerAPISuite extends JavaStreamingListener {
@Override
+ public void onStreamingStarted(JavaStreamingListenerStreamingStarted streamingStarted) {
+ super.onStreamingStarted(streamingStarted);
+ }
+
+ @Override
public void onReceiverStarted(JavaStreamingListenerReceiverStarted receiverStarted) {
JavaReceiverInfo receiverInfo = receiverStarted.receiverInfo();
receiverInfo.streamId();
diff --git a/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala b/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala
index 0295e059f7..cfd4323531 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala
+++ b/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala
@@ -29,6 +29,10 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite {
val listener = new TestJavaStreamingListener()
val listenerWrapper = new JavaStreamingListenerWrapper(listener)
+ val streamingStarted = StreamingListenerStreamingStarted(1000L)
+ listenerWrapper.onStreamingStarted(streamingStarted)
+ assert(listener.streamingStarted.time === streamingStarted.time)
+
val receiverStarted = StreamingListenerReceiverStarted(ReceiverInfo(
streamId = 2,
name = "test",
@@ -249,6 +253,7 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite {
class TestJavaStreamingListener extends JavaStreamingListener {
+ var streamingStarted: JavaStreamingListenerStreamingStarted = null
var receiverStarted: JavaStreamingListenerReceiverStarted = null
var receiverError: JavaStreamingListenerReceiverError = null
var receiverStopped: JavaStreamingListenerReceiverStopped = null
@@ -258,6 +263,10 @@ class TestJavaStreamingListener extends JavaStreamingListener {
var outputOperationStarted: JavaStreamingListenerOutputOperationStarted = null
var outputOperationCompleted: JavaStreamingListenerOutputOperationCompleted = null
+ override def onStreamingStarted(streamingStarted: JavaStreamingListenerStreamingStarted): Unit = {
+ this.streamingStarted = streamingStarted
+ }
+
override def onReceiverStarted(receiverStarted: JavaStreamingListenerReceiverStarted): Unit = {
this.receiverStarted = receiverStarted
}