aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorzsxwing <zsxwing@gmail.com>2015-08-23 17:41:49 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-08-23 17:52:46 -0700
commit1c5a828b7b90248739ce1d7754a5394a877c5508 (patch)
tree3c51dffb4c4ef5a561056c8be69f141622c66203
parent595f92fef5fedfeb1d630abb18962136267ae0fe (diff)
downloadspark-1c5a828b7b90248739ce1d7754a5394a877c5508.tar.gz
spark-1c5a828b7b90248739ce1d7754a5394a877c5508.tar.bz2
spark-1c5a828b7b90248739ce1d7754a5394a877c5508.zip
[SPARK-10148] [STREAMING] Display active and inactive receiver numbers in Streaming page
Added the active and inactive receiver numbers in the summary section of Streaming page. <img width="1074" alt="screen shot 2015-08-21 at 2 08 54 pm" src="https://cloud.githubusercontent.com/assets/1000778/9402437/ff2806a2-480f-11e5-8f8e-efdf8e5d514d.png"> Author: zsxwing <zsxwing@gmail.com> Closes #8351 from zsxwing/receiver-number. (cherry picked from commit c6df5f66d9a8b9760f2cd46fcd930f977650c9c5) Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala8
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala6
2 files changed, 14 insertions, 0 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
index b77c555c68..78aeb004e1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -148,6 +148,14 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
receiverInfos.size
}
+ def numActiveReceivers: Int = synchronized {
+ receiverInfos.count(_._2.active)
+ }
+
+ def numInactiveReceivers: Int = {
+ ssc.graph.getReceiverInputStreams().size - numActiveReceivers
+ }
+
def numTotalCompletedBatches: Long = synchronized {
totalCompletedBatches
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
index 87af902428..96d943e75d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
@@ -303,6 +303,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
val numCompletedBatches = listener.retainedCompletedBatches.size
val numActiveBatches = batchTimes.length - numCompletedBatches
+ val numReceivers = listener.numInactiveReceivers + listener.numActiveReceivers
val table =
// scalastyle:off
<table id="stat-table" class="table table-bordered" style="width: auto">
@@ -330,6 +331,11 @@ private[ui] class StreamingPage(parent: StreamingTab)
}
}
</div>
+ {
+ if (numReceivers > 0) {
+ <div>Receivers: {listener.numActiveReceivers} / {numReceivers} active</div>
+ }
+ }
<div>Avg: {eventRateForAllStreams.formattedAvg} events/sec</div>
</div>
</td>