From 6502944f39893b9dfb472f8406d5f3a02a316eff Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Mon, 9 Nov 2015 18:13:37 -0800 Subject: [SPARK-11333][STREAMING] Add executorId to ReceiverInfo and display it in UI Expose executorId to `ReceiverInfo` and UI since it's helpful when there are multiple executors running in the same host. Screenshot: screen shot 2015-11-02 at 10 52 19 am Author: Shixiong Zhu Author: zsxwing Closes #9418 from zsxwing/SPARK-11333. --- .../org/apache/spark/streaming/JavaStreamingListenerAPISuite.java | 3 +++ .../streaming/api/java/JavaStreamingListenerWrapperSuite.scala | 8 ++++++-- 2 files changed, 9 insertions(+), 2 deletions(-) (limited to 'streaming/src/test/java/org/apache') diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java index 8cc285aa7f..67b2a0703e 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java @@ -29,6 +29,7 @@ public class JavaStreamingListenerAPISuite extends JavaStreamingListener { receiverInfo.name(); receiverInfo.active(); receiverInfo.location(); + receiverInfo.executorId(); receiverInfo.lastErrorMessage(); receiverInfo.lastError(); receiverInfo.lastErrorTime(); @@ -41,6 +42,7 @@ public class JavaStreamingListenerAPISuite extends JavaStreamingListener { receiverInfo.name(); receiverInfo.active(); receiverInfo.location(); + receiverInfo.executorId(); receiverInfo.lastErrorMessage(); receiverInfo.lastError(); receiverInfo.lastErrorTime(); @@ -53,6 +55,7 @@ public class JavaStreamingListenerAPISuite extends JavaStreamingListener { receiverInfo.name(); receiverInfo.active(); receiverInfo.location(); + receiverInfo.executorId(); receiverInfo.lastErrorMessage(); receiverInfo.lastError(); receiverInfo.lastErrorTime(); diff --git a/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala b/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala index 6d6d61e70c..0295e059f7 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala +++ b/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala @@ -33,7 +33,8 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite { streamId = 2, name = "test", active = true, - location = "localhost" + location = "localhost", + executorId = "1" )) listenerWrapper.onReceiverStarted(receiverStarted) assertReceiverInfo(listener.receiverStarted.receiverInfo, receiverStarted.receiverInfo) @@ -42,7 +43,8 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite { streamId = 2, name = "test", active = false, - location = "localhost" + location = "localhost", + executorId = "1" )) listenerWrapper.onReceiverStopped(receiverStopped) assertReceiverInfo(listener.receiverStopped.receiverInfo, receiverStopped.receiverInfo) @@ -52,6 +54,7 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite { name = "test", active = false, location = "localhost", + executorId = "1", lastErrorMessage = "failed", lastError = "failed", lastErrorTime = System.currentTimeMillis() @@ -197,6 +200,7 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite { assert(javaReceiverInfo.name === receiverInfo.name) assert(javaReceiverInfo.active === receiverInfo.active) assert(javaReceiverInfo.location === receiverInfo.location) + assert(javaReceiverInfo.executorId === receiverInfo.executorId) assert(javaReceiverInfo.lastErrorMessage === receiverInfo.lastErrorMessage) assert(javaReceiverInfo.lastError === receiverInfo.lastError) assert(javaReceiverInfo.lastErrorTime === receiverInfo.lastErrorTime) -- cgit v1.2.3