From 6502944f39893b9dfb472f8406d5f3a02a316eff Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Mon, 9 Nov 2015 18:13:37 -0800 Subject: [SPARK-11333][STREAMING] Add executorId to ReceiverInfo and display it in UI Expose executorId to `ReceiverInfo` and UI since it's helpful when there are multiple executors running in the same host. Screenshot: screen shot 2015-11-02 at 10 52 19 am Author: Shixiong Zhu Author: zsxwing Closes #9418 from zsxwing/SPARK-11333. --- .../apache/spark/streaming/api/java/JavaStreamingListener.scala | 1 + .../spark/streaming/api/java/JavaStreamingListenerWrapper.scala | 1 + .../scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala | 1 + .../apache/spark/streaming/scheduler/ReceiverTrackingInfo.scala | 1 + .../main/scala/org/apache/spark/streaming/ui/StreamingPage.scala | 8 ++++++-- .../org/apache/spark/streaming/JavaStreamingListenerAPISuite.java | 3 +++ .../streaming/api/java/JavaStreamingListenerWrapperSuite.scala | 8 ++++++-- .../spark/streaming/ui/StreamingJobProgressListenerSuite.scala | 6 +++--- 8 files changed, 22 insertions(+), 7 deletions(-) (limited to 'streaming') diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala index c86c7101ff..34429074fe 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala @@ -140,6 +140,7 @@ private[streaming] case class JavaReceiverInfo( name: String, active: Boolean, location: String, + executorId: String, lastErrorMessage: String, lastError: String, lastErrorTime: Long) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala index 2c60b396a6..b109b9f1cb 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala @@ -33,6 +33,7 @@ private[streaming] class JavaStreamingListenerWrapper(javaStreamingListener: Jav receiverInfo.name, receiverInfo.active, receiverInfo.location, + receiverInfo.executorId, receiverInfo.lastErrorMessage, receiverInfo.lastError, receiverInfo.lastErrorTime diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala index 59df892397..3b35964114 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala @@ -30,6 +30,7 @@ case class ReceiverInfo( name: String, active: Boolean, location: String, + executorId: String, lastErrorMessage: String = "", lastError: String = "", lastErrorTime: Long = -1L diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTrackingInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTrackingInfo.scala index ab0a84f052..4dc5bb9c3b 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTrackingInfo.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTrackingInfo.scala @@ -49,6 +49,7 @@ private[streaming] case class ReceiverTrackingInfo( name.getOrElse(""), state == ReceiverState.ACTIVE, location = runningExecutor.map(_.host).getOrElse(""), + executorId = runningExecutor.map(_.executorId).getOrElse(""), lastErrorMessage = errorInfo.map(_.lastErrorMessage).getOrElse(""), lastError = errorInfo.map(_.lastError).getOrElse(""), lastErrorTime = errorInfo.map(_.lastErrorTime).getOrElse(-1L) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala index 96d943e75d..4588b2163c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala @@ -402,7 +402,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
Status
-
Location
+
Executor ID / Host
Last Error Time
Last Error Message @@ -430,7 +430,11 @@ private[ui] class StreamingPage(parent: StreamingTab) val receiverActive = receiverInfo.map { info => if (info.active) "ACTIVE" else "INACTIVE" }.getOrElse(emptyCell) - val receiverLocation = receiverInfo.map(_.location).getOrElse(emptyCell) + val receiverLocation = receiverInfo.map { info => + val executorId = if (info.executorId.isEmpty) emptyCell else info.executorId + val location = if (info.location.isEmpty) emptyCell else info.location + s"$executorId / $location" + }.getOrElse(emptyCell) val receiverLastError = receiverInfo.map { info => val msg = s"${info.lastErrorMessage} - ${info.lastError}" if (msg.size > 100) msg.take(97) + "..." else msg diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java index 8cc285aa7f..67b2a0703e 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java @@ -29,6 +29,7 @@ public class JavaStreamingListenerAPISuite extends JavaStreamingListener { receiverInfo.name(); receiverInfo.active(); receiverInfo.location(); + receiverInfo.executorId(); receiverInfo.lastErrorMessage(); receiverInfo.lastError(); receiverInfo.lastErrorTime(); @@ -41,6 +42,7 @@ public class JavaStreamingListenerAPISuite extends JavaStreamingListener { receiverInfo.name(); receiverInfo.active(); receiverInfo.location(); + receiverInfo.executorId(); receiverInfo.lastErrorMessage(); receiverInfo.lastError(); receiverInfo.lastErrorTime(); @@ -53,6 +55,7 @@ public class JavaStreamingListenerAPISuite extends JavaStreamingListener { receiverInfo.name(); receiverInfo.active(); receiverInfo.location(); + receiverInfo.executorId(); receiverInfo.lastErrorMessage(); receiverInfo.lastError(); receiverInfo.lastErrorTime(); diff --git a/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala b/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala index 6d6d61e70c..0295e059f7 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala +++ b/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala @@ -33,7 +33,8 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite { streamId = 2, name = "test", active = true, - location = "localhost" + location = "localhost", + executorId = "1" )) listenerWrapper.onReceiverStarted(receiverStarted) assertReceiverInfo(listener.receiverStarted.receiverInfo, receiverStarted.receiverInfo) @@ -42,7 +43,8 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite { streamId = 2, name = "test", active = false, - location = "localhost" + location = "localhost", + executorId = "1" )) listenerWrapper.onReceiverStopped(receiverStopped) assertReceiverInfo(listener.receiverStopped.receiverInfo, receiverStopped.receiverInfo) @@ -52,6 +54,7 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite { name = "test", active = false, location = "localhost", + executorId = "1", lastErrorMessage = "failed", lastError = "failed", lastErrorTime = System.currentTimeMillis() @@ -197,6 +200,7 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite { assert(javaReceiverInfo.name === receiverInfo.name) assert(javaReceiverInfo.active === receiverInfo.active) assert(javaReceiverInfo.location === receiverInfo.location) + assert(javaReceiverInfo.executorId === receiverInfo.executorId) assert(javaReceiverInfo.lastErrorMessage === receiverInfo.lastErrorMessage) assert(javaReceiverInfo.lastError === receiverInfo.lastError) assert(javaReceiverInfo.lastErrorTime === receiverInfo.lastErrorTime) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala index af4718b4eb..34cd743556 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala @@ -130,20 +130,20 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers { listener.numTotalReceivedRecords should be (600) // onReceiverStarted - val receiverInfoStarted = ReceiverInfo(0, "test", true, "localhost") + val receiverInfoStarted = ReceiverInfo(0, "test", true, "localhost", "0") listener.onReceiverStarted(StreamingListenerReceiverStarted(receiverInfoStarted)) listener.receiverInfo(0) should be (Some(receiverInfoStarted)) listener.receiverInfo(1) should be (None) // onReceiverError - val receiverInfoError = ReceiverInfo(1, "test", true, "localhost") + val receiverInfoError = ReceiverInfo(1, "test", true, "localhost", "1") listener.onReceiverError(StreamingListenerReceiverError(receiverInfoError)) listener.receiverInfo(0) should be (Some(receiverInfoStarted)) listener.receiverInfo(1) should be (Some(receiverInfoError)) listener.receiverInfo(2) should be (None) // onReceiverStopped - val receiverInfoStopped = ReceiverInfo(2, "test", true, "localhost") + val receiverInfoStopped = ReceiverInfo(2, "test", true, "localhost", "2") listener.onReceiverStopped(StreamingListenerReceiverStopped(receiverInfoStopped)) listener.receiverInfo(0) should be (Some(receiverInfoStarted)) listener.receiverInfo(1) should be (Some(receiverInfoError)) -- cgit v1.2.3