aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorShixiong Zhu <shixiong@databricks.com>2015-11-09 18:13:37 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2015-11-09 18:13:37 -0800
commit6502944f39893b9dfb472f8406d5f3a02a316eff (patch)
treea6b2e65fd5855e48c52783d5fe44fe49b07ed8fb /streaming
parent1f0f14efe35f986e338ee2cbc1ef2a9ce7395c00 (diff)
downloadspark-6502944f39893b9dfb472f8406d5f3a02a316eff.tar.gz
spark-6502944f39893b9dfb472f8406d5f3a02a316eff.tar.bz2
spark-6502944f39893b9dfb472f8406d5f3a02a316eff.zip
[SPARK-11333][STREAMING] Add executorId to ReceiverInfo and display it in UI
Expose executorId to `ReceiverInfo` and UI since it's helpful when there are multiple executors running in the same host. Screenshot: <img width="1058" alt="screen shot 2015-11-02 at 10 52 19 am" src="https://cloud.githubusercontent.com/assets/1000778/10890968/2e2f5512-8150-11e5-8d9d-746e826b69e8.png"> Author: Shixiong Zhu <shixiong@databricks.com> Author: zsxwing <zsxwing@gmail.com> Closes #9418 from zsxwing/SPARK-11333.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala1
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala1
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala1
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTrackingInfo.scala1
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala8
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java3
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala8
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala6
8 files changed, 22 insertions, 7 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala
index c86c7101ff..34429074fe 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala
@@ -140,6 +140,7 @@ private[streaming] case class JavaReceiverInfo(
name: String,
active: Boolean,
location: String,
+ executorId: String,
lastErrorMessage: String,
lastError: String,
lastErrorTime: Long)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala
index 2c60b396a6..b109b9f1cb 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala
@@ -33,6 +33,7 @@ private[streaming] class JavaStreamingListenerWrapper(javaStreamingListener: Jav
receiverInfo.name,
receiverInfo.active,
receiverInfo.location,
+ receiverInfo.executorId,
receiverInfo.lastErrorMessage,
receiverInfo.lastError,
receiverInfo.lastErrorTime
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala
index 59df892397..3b35964114 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala
@@ -30,6 +30,7 @@ case class ReceiverInfo(
name: String,
active: Boolean,
location: String,
+ executorId: String,
lastErrorMessage: String = "",
lastError: String = "",
lastErrorTime: Long = -1L
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTrackingInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTrackingInfo.scala
index ab0a84f052..4dc5bb9c3b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTrackingInfo.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTrackingInfo.scala
@@ -49,6 +49,7 @@ private[streaming] case class ReceiverTrackingInfo(
name.getOrElse(""),
state == ReceiverState.ACTIVE,
location = runningExecutor.map(_.host).getOrElse(""),
+ executorId = runningExecutor.map(_.executorId).getOrElse(""),
lastErrorMessage = errorInfo.map(_.lastErrorMessage).getOrElse(""),
lastError = errorInfo.map(_.lastError).getOrElse(""),
lastErrorTime = errorInfo.map(_.lastErrorTime).getOrElse(-1L)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
index 96d943e75d..4588b2163c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
@@ -402,7 +402,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
<tr>
<th style="width: 151px;"></th>
<th style="width: 167px; padding: 8px 0 8px 0"><div style="margin: 0 8px 0 8px">Status</div></th>
- <th style="width: 167px; padding: 8px 0 8px 0"><div style="margin: 0 8px 0 8px">Location</div></th>
+ <th style="width: 167px; padding: 8px 0 8px 0"><div style="margin: 0 8px 0 8px">Executor ID / Host</div></th>
<th style="width: 166px; padding: 8px 0 8px 0"><div style="margin: 0 8px 0 8px">Last Error Time</div></th>
<th>Last Error Message</th>
</tr>
@@ -430,7 +430,11 @@ private[ui] class StreamingPage(parent: StreamingTab)
val receiverActive = receiverInfo.map { info =>
if (info.active) "ACTIVE" else "INACTIVE"
}.getOrElse(emptyCell)
- val receiverLocation = receiverInfo.map(_.location).getOrElse(emptyCell)
+ val receiverLocation = receiverInfo.map { info =>
+ val executorId = if (info.executorId.isEmpty) emptyCell else info.executorId
+ val location = if (info.location.isEmpty) emptyCell else info.location
+ s"$executorId / $location"
+ }.getOrElse(emptyCell)
val receiverLastError = receiverInfo.map { info =>
val msg = s"${info.lastErrorMessage} - ${info.lastError}"
if (msg.size > 100) msg.take(97) + "..." else msg
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java
index 8cc285aa7f..67b2a0703e 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java
@@ -29,6 +29,7 @@ public class JavaStreamingListenerAPISuite extends JavaStreamingListener {
receiverInfo.name();
receiverInfo.active();
receiverInfo.location();
+ receiverInfo.executorId();
receiverInfo.lastErrorMessage();
receiverInfo.lastError();
receiverInfo.lastErrorTime();
@@ -41,6 +42,7 @@ public class JavaStreamingListenerAPISuite extends JavaStreamingListener {
receiverInfo.name();
receiverInfo.active();
receiverInfo.location();
+ receiverInfo.executorId();
receiverInfo.lastErrorMessage();
receiverInfo.lastError();
receiverInfo.lastErrorTime();
@@ -53,6 +55,7 @@ public class JavaStreamingListenerAPISuite extends JavaStreamingListener {
receiverInfo.name();
receiverInfo.active();
receiverInfo.location();
+ receiverInfo.executorId();
receiverInfo.lastErrorMessage();
receiverInfo.lastError();
receiverInfo.lastErrorTime();
diff --git a/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala b/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala
index 6d6d61e70c..0295e059f7 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala
+++ b/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala
@@ -33,7 +33,8 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite {
streamId = 2,
name = "test",
active = true,
- location = "localhost"
+ location = "localhost",
+ executorId = "1"
))
listenerWrapper.onReceiverStarted(receiverStarted)
assertReceiverInfo(listener.receiverStarted.receiverInfo, receiverStarted.receiverInfo)
@@ -42,7 +43,8 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite {
streamId = 2,
name = "test",
active = false,
- location = "localhost"
+ location = "localhost",
+ executorId = "1"
))
listenerWrapper.onReceiverStopped(receiverStopped)
assertReceiverInfo(listener.receiverStopped.receiverInfo, receiverStopped.receiverInfo)
@@ -52,6 +54,7 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite {
name = "test",
active = false,
location = "localhost",
+ executorId = "1",
lastErrorMessage = "failed",
lastError = "failed",
lastErrorTime = System.currentTimeMillis()
@@ -197,6 +200,7 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite {
assert(javaReceiverInfo.name === receiverInfo.name)
assert(javaReceiverInfo.active === receiverInfo.active)
assert(javaReceiverInfo.location === receiverInfo.location)
+ assert(javaReceiverInfo.executorId === receiverInfo.executorId)
assert(javaReceiverInfo.lastErrorMessage === receiverInfo.lastErrorMessage)
assert(javaReceiverInfo.lastError === receiverInfo.lastError)
assert(javaReceiverInfo.lastErrorTime === receiverInfo.lastErrorTime)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
index af4718b4eb..34cd743556 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
@@ -130,20 +130,20 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
listener.numTotalReceivedRecords should be (600)
// onReceiverStarted
- val receiverInfoStarted = ReceiverInfo(0, "test", true, "localhost")
+ val receiverInfoStarted = ReceiverInfo(0, "test", true, "localhost", "0")
listener.onReceiverStarted(StreamingListenerReceiverStarted(receiverInfoStarted))
listener.receiverInfo(0) should be (Some(receiverInfoStarted))
listener.receiverInfo(1) should be (None)
// onReceiverError
- val receiverInfoError = ReceiverInfo(1, "test", true, "localhost")
+ val receiverInfoError = ReceiverInfo(1, "test", true, "localhost", "1")
listener.onReceiverError(StreamingListenerReceiverError(receiverInfoError))
listener.receiverInfo(0) should be (Some(receiverInfoStarted))
listener.receiverInfo(1) should be (Some(receiverInfoError))
listener.receiverInfo(2) should be (None)
// onReceiverStopped
- val receiverInfoStopped = ReceiverInfo(2, "test", true, "localhost")
+ val receiverInfoStopped = ReceiverInfo(2, "test", true, "localhost", "2")
listener.onReceiverStopped(StreamingListenerReceiverStopped(receiverInfoStopped))
listener.receiverInfo(0) should be (Some(receiverInfoStarted))
listener.receiverInfo(1) should be (Some(receiverInfoError))