author     zsxwing <zsxwing@gmail.com>  2015-05-05 12:52:16 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>  2015-05-05 12:52:16 -0700
commit     489700c809a7c0a836538f3d0bd58bed609e8768 (patch)
tree       a9801de78ba1d6ecb8d120bd3231c999912620ad /streaming
parent     47728db7cfac995d9417cdf0e16d07391aabd581 (diff)
[SPARK-6939] [STREAMING] [WEBUI] Add timeline and histogram graphs for streaming statistics
This is the initial work of SPARK-6939. Not yet ready for code review. Here are the screenshots:

![graph1](https://cloud.githubusercontent.com/assets/1000778/7165766/465942e0-e3dc-11e4-9b05-c184b09d75dc.png)
![graph2](https://cloud.githubusercontent.com/assets/1000778/7165779/53f13f34-e3dc-11e4-8714-a4a75b7e09ff.png)

TODOs:
- [x] Display more information on mouse hover
- [x] Align the timeline and distribution graphs
- [x] Clean up the codes

Author: zsxwing <zsxwing@gmail.com>

Closes #5533 from zsxwing/SPARK-6939 and squashes the following commits:

9f7cd19 [zsxwing] Merge branch 'master' into SPARK-6939
deacc3f [zsxwing] Remove unused import
cd03424 [zsxwing] Fix .rat-excludes
70cc87d [zsxwing] Streaming Scheduling Delay => Scheduling Delay
d457277 [zsxwing] Fix UIUtils in BatchPage
b3f303e [zsxwing] Add comments for unclear classes and methods
ff0bff8 [zsxwing] Make InputDStream.name private[streaming]
cc392c5 [zsxwing] Merge branch 'master' into SPARK-6939
e275e23 [zsxwing] Move time related methods to Streaming's UIUtils
d5d86f6 [zsxwing] Fix incorrect lastErrorTime
3be4b7a [zsxwing] Use InputInfo
b50fa32 [zsxwing] Jump to the batch page when clicking a point in the timeline graphs
203605d [zsxwing] Merge branch 'master' into SPARK-6939
74307cf [zsxwing] Reuse the data for histogram graphs to reduce the page size
2586916 [zsxwing] Merge branch 'master' into SPARK-6939
70d8533 [zsxwing] Remove BatchInfo.numRecords and a few renames
7bbdc0a [zsxwing] Hide the receiver sub table if no receiver
a2972e9 [zsxwing] Add some ui tests for StreamingPage
fd03ad0 [zsxwing] Add a test to verify no memory leak
4a8f886 [zsxwing] Merge branch 'master' into SPARK-6939
18607a1 [zsxwing] Merge branch 'master' into SPARK-6939
d0b0aec [zsxwing] Clean up the codes
a459f49 [zsxwing] Add a dash line to processing time graphs
8e4363c [zsxwing] Prepare for the demo
c81a1ee [zsxwing] Change time unit in the graphs automatically
4c0b43f [zsxwing] Update Streaming UI
04c7500 [zsxwing] Make the server and client use the same timezone
fed8219 [zsxwing] Move the x axis at the top and show a better tooltip
c23ce10 [zsxwing] Make two graphs close
d78672a [zsxwing] Make the X axis use the same range
881c907 [zsxwing] Use histogram for distribution
5688702 [zsxwing] Fix the unit test
ddf741a [zsxwing] Fix the unit test
ad93295 [zsxwing] Remove unnecessary codes
a0458f9 [zsxwing] Clean the codes
b82ed1e [zsxwing] Update the graphs as per comments
dd653a1 [zsxwing] Add timeline and histogram graphs for streaming statistics
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala                         |   4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala                 |   5
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala               |   3
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala            |  13
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala                   |  10
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala                         |  22
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala                       |   4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala      |  68
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala                     | 621
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala                           |  74
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala                      |  32
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala |  52
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/ui/UIUtilsSuite.scala                      |  67
13 files changed, 762 insertions(+), 213 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index 175140481e..9c7f698840 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -110,6 +110,10 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
.toArray
}
+ def getInputStreamName(streamId: Int): Option[String] = synchronized {
+ inputStreams.find(_.id == streamId).map(_.name)
+ }
+
def generateJobs(time: Time): Seq[Job] = {
logDebug("Generating jobs for time " + time)
val jobs = this.synchronized {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
index e4ad4b509d..9716adb628 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
@@ -45,6 +45,11 @@ abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext)
val id = ssc.getNewInputStreamId()
/**
* The name of this InputDStream. By default, it is the simple class name with its id appended.
+ */
+ private[streaming] def name: String = s"${getClass.getSimpleName}-$id"
+
+ /**
* Checks whether the 'time' is valid wrt slideDuration for generating RDD.
* Additionally it also ensures valid times are in strictly increasing order.
* This ensures that InputDStream.compute() is called strictly on increasing
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala
index 52f08b9c9d..de85f24dd9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala
@@ -32,6 +32,7 @@ case class ReceiverInfo(
active: Boolean,
location: String,
lastErrorMessage: String = "",
- lastError: String = ""
+ lastError: String = "",
+ lastErrorTime: Long = -1L
) {
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 3c341390ed..f73f7e705e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -155,10 +155,16 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
private def deregisterReceiver(streamId: Int, message: String, error: String) {
val newReceiverInfo = receiverInfo.get(streamId) match {
case Some(oldInfo) =>
- oldInfo.copy(endpoint = null, active = false, lastErrorMessage = message, lastError = error)
+ val lastErrorTime =
+ if (error == null || error == "") -1 else ssc.scheduler.clock.getTimeMillis()
+ oldInfo.copy(endpoint = null, active = false, lastErrorMessage = message,
+ lastError = error, lastErrorTime = lastErrorTime)
case None =>
logWarning("No prior receiver info")
- ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = message, lastError = error)
+ val lastErrorTime =
+ if (error == null || error == "") -1 else ssc.scheduler.clock.getTimeMillis()
+ ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = message,
+ lastError = error, lastErrorTime = lastErrorTime)
}
receiverInfo -= streamId
listenerBus.post(StreamingListenerReceiverStopped(newReceiverInfo))
@@ -182,7 +188,8 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
oldInfo.copy(lastErrorMessage = message, lastError = error)
case None =>
logWarning("No prior receiver info")
- ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = message, lastError = error)
+ ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = message,
+ lastError = error, lastErrorTime = ssc.scheduler.clock.getTimeMillis())
}
receiverInfo(streamId) = newReceiverInfo
listenerBus.post(StreamingListenerReceiverError(receiverInfo(streamId)))
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
index e219e27785..2960b528d4 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
@@ -19,7 +19,7 @@ package org.apache.spark.streaming.ui
import scala.xml.Node
-import org.apache.spark.ui.UIUtils
+import org.apache.spark.ui.{UIUtils => SparkUIUtils}
private[ui] abstract class BatchTableBase(tableId: String) {
@@ -32,12 +32,12 @@ private[ui] abstract class BatchTableBase(tableId: String) {
protected def baseRow(batch: BatchUIData): Seq[Node] = {
val batchTime = batch.batchTime.milliseconds
- val formattedBatchTime = UIUtils.formatDate(batch.batchTime.milliseconds)
+ val formattedBatchTime = SparkUIUtils.formatDate(batch.batchTime.milliseconds)
val eventCount = batch.numRecords
val schedulingDelay = batch.schedulingDelay
- val formattedSchedulingDelay = schedulingDelay.map(UIUtils.formatDuration).getOrElse("-")
+ val formattedSchedulingDelay = schedulingDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
val processingTime = batch.processingDelay
- val formattedProcessingTime = processingTime.map(UIUtils.formatDuration).getOrElse("-")
+ val formattedProcessingTime = processingTime.map(SparkUIUtils.formatDuration).getOrElse("-")
<td sorttable_customkey={batchTime.toString}>
<a href={s"batch?id=$batchTime"}>
@@ -107,7 +107,7 @@ private[ui] class CompletedBatchTable(batches: Seq[BatchUIData])
private def completedBatchRow(batch: BatchUIData): Seq[Node] = {
val totalDelay = batch.totalDelay
- val formattedTotalDelay = totalDelay.map(UIUtils.formatDuration).getOrElse("-")
+ val formattedTotalDelay = totalDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
baseRow(batch) ++
<td sorttable_customkey={totalDelay.getOrElse(Long.MaxValue).toString}>
{formattedTotalDelay}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
index 2da9a29e25..3f1cab6906 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
@@ -24,7 +24,7 @@ import scala.xml.{NodeSeq, Node}
import org.apache.commons.lang3.StringEscapeUtils
import org.apache.spark.streaming.Time
-import org.apache.spark.ui.{UIUtils, WebUIPage}
+import org.apache.spark.ui.{UIUtils => SparkUIUtils, WebUIPage}
import org.apache.spark.streaming.ui.StreamingJobProgressListener.{SparkJobId, OutputOpId}
import org.apache.spark.ui.jobs.UIData.JobUIData
@@ -73,8 +73,8 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
sparkJob.stageIds.sorted.reverse.flatMap(sparkListener.stageIdToInfo.get).
dropWhile(_.failureReason == None).take(1). // get the first info that contains failure
flatMap(info => info.failureReason).headOption.getOrElse("")
- val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("-")
- val detailUrl = s"${UIUtils.prependBaseUri(parent.basePath)}/jobs/job?id=${sparkJob.jobId}"
+ val formattedDuration = duration.map(d => SparkUIUtils.formatDuration(d)).getOrElse("-")
+ val detailUrl = s"${SparkUIUtils.prependBaseUri(parent.basePath)}/jobs/job?id=${sparkJob.jobId}"
// In the first row, output op id and its information needs to be shown. In other rows, these
// cells will be taken up due to "rowspan".
@@ -110,7 +110,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
</td>
<td class="progress-cell">
{
- UIUtils.makeProgressBar(
+ SparkUIUtils.makeProgressBar(
started = sparkJob.numActiveTasks,
completed = sparkJob.numCompletedTasks,
failed = sparkJob.numFailedTasks,
@@ -135,7 +135,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
// If any job does not finish, set "formattedOutputOpDuration" to "-"
"-"
} else {
- UIUtils.formatDuration(sparkjobDurations.flatMap(x => x).sum)
+ SparkUIUtils.formatDuration(sparkjobDurations.flatMap(x => x).sum)
}
generateJobRow(outputOpId, formattedOutputOpDuration, sparkJobs.size, true, sparkJobs.head) ++
sparkJobs.tail.map { sparkJob =>
@@ -212,24 +212,24 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
val batchTime = Option(request.getParameter("id")).map(id => Time(id.toLong)).getOrElse {
throw new IllegalArgumentException(s"Missing id parameter")
}
- val formattedBatchTime = UIUtils.formatDate(batchTime.milliseconds)
+ val formattedBatchTime = SparkUIUtils.formatDate(batchTime.milliseconds)
val batchUIData = streamingListener.getBatchUIData(batchTime).getOrElse {
throw new IllegalArgumentException(s"Batch $formattedBatchTime does not exist")
}
val formattedSchedulingDelay =
- batchUIData.schedulingDelay.map(UIUtils.formatDuration).getOrElse("-")
+ batchUIData.schedulingDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
val formattedProcessingTime =
- batchUIData.processingDelay.map(UIUtils.formatDuration).getOrElse("-")
- val formattedTotalDelay = batchUIData.totalDelay.map(UIUtils.formatDuration).getOrElse("-")
+ batchUIData.processingDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
+ val formattedTotalDelay = batchUIData.totalDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
val summary: NodeSeq =
<div>
<ul class="unstyled">
<li>
<strong>Batch Duration: </strong>
- {UIUtils.formatDuration(streamingListener.batchDuration)}
+ {SparkUIUtils.formatDuration(streamingListener.batchDuration)}
</li>
<li>
<strong>Input data size: </strong>
@@ -259,6 +259,6 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
val content = summary ++ jobTable
- UIUtils.headerSparkPage(s"Details of batch at $formattedBatchTime", content, parent)
+ SparkUIUtils.headerSparkPage(s"Details of batch at $formattedBatchTime", content, parent)
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
index 99e10d2b0b..a5514dfd71 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
@@ -26,7 +26,7 @@ private[ui] case class OutputOpIdAndSparkJobId(outputOpId: OutputOpId, sparkJobI
private[ui] case class BatchUIData(
val batchTime: Time,
- val receiverNumRecords: Map[Int, Long],
+ val streamIdToNumRecords: Map[Int, Long],
val submissionTime: Long,
val processingStartTime: Option[Long],
val processingEndTime: Option[Long],
@@ -58,7 +58,7 @@ private[ui] case class BatchUIData(
/**
* The number of records received by the receivers in this batch.
*/
- def numRecords: Long = receiverNumRecords.map(_._2).sum
+ def numRecords: Long = streamIdToNumRecords.values.sum
}
private[ui] object BatchUIData {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
index 24cbb2bf9d..68e8ce9894 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -29,7 +29,6 @@ import org.apache.spark.streaming.scheduler._
import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted
import org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted
import org.apache.spark.streaming.scheduler.StreamingListenerBatchSubmitted
-import org.apache.spark.util.Distribution
private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
@@ -38,7 +37,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
private val waitingBatchUIData = new HashMap[Time, BatchUIData]
private val runningBatchUIData = new HashMap[Time, BatchUIData]
private val completedBatchUIData = new Queue[BatchUIData]
- private val batchUIDataLimit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 100)
+ private val batchUIDataLimit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 1000)
private var totalCompletedBatches = 0L
private var totalReceivedRecords = 0L
private var totalProcessedRecords = 0L
@@ -145,7 +144,9 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
}
}
- def numReceivers: Int = ssc.graph.getReceiverInputStreams().size
+ def numReceivers: Int = synchronized {
+ receiverInfos.size
+ }
def numTotalCompletedBatches: Long = synchronized {
totalCompletedBatches
@@ -175,39 +176,42 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
completedBatchUIData.toSeq
}
- def processingDelayDistribution: Option[Distribution] = synchronized {
- extractDistribution(_.processingDelay)
+ def streamName(streamId: Int): Option[String] = {
+ ssc.graph.getInputStreamName(streamId)
}
- def schedulingDelayDistribution: Option[Distribution] = synchronized {
- extractDistribution(_.schedulingDelay)
- }
+ /**
+ * Return all InputDStream ids.
+ */
+ def streamIds: Seq[Int] = ssc.graph.getInputStreams().map(_.id)
- def totalDelayDistribution: Option[Distribution] = synchronized {
- extractDistribution(_.totalDelay)
- }
-
- def receivedRecordsDistributions: Map[Int, Option[Distribution]] = synchronized {
- val latestBatchInfos = retainedBatches.reverse.take(batchUIDataLimit)
- val latestReceiverNumRecords = latestBatchInfos.map(_.receiverNumRecords)
- val streamIds = ssc.graph.getInputStreams().map(_.id)
- streamIds.map { id =>
- val recordsOfParticularReceiver =
- latestReceiverNumRecords.map(v => v.getOrElse(id, 0L).toDouble * 1000 / batchDuration)
- val distribution = Distribution(recordsOfParticularReceiver)
- (id, distribution)
+ /**
+ * Return the event rate of each InputDStream in each batch. The key of the return value is
+ * the stream id, and the value is a sequence of (batch time, event rate) pairs.
+ */
+ def receivedEventRateWithBatchTime: Map[Int, Seq[(Long, Double)]] = synchronized {
+ val _retainedBatches = retainedBatches
+ val latestBatches = _retainedBatches.map { batchUIData =>
+ (batchUIData.batchTime.milliseconds, batchUIData.streamIdToNumRecords)
+ }
+ streamIds.map { streamId =>
+ val eventRates = latestBatches.map {
+ case (batchTime, streamIdToNumRecords) =>
+ val numRecords = streamIdToNumRecords.getOrElse(streamId, 0L)
+ (batchTime, numRecords * 1000.0 / batchDuration)
+ }
+ (streamId, eventRates)
}.toMap
}
def lastReceivedBatchRecords: Map[Int, Long] = synchronized {
- val lastReceiverNumRecords = lastReceivedBatch.map(_.receiverNumRecords)
- val streamIds = ssc.graph.getInputStreams().map(_.id)
- lastReceiverNumRecords.map { receiverNumRecords =>
- streamIds.map { id =>
- (id, receiverNumRecords.getOrElse(id, 0L))
+ val lastReceivedBlockInfoOption = lastReceivedBatch.map(_.streamIdToNumRecords)
+ lastReceivedBlockInfoOption.map { lastReceivedBlockInfo =>
+ streamIds.map { streamId =>
+ (streamId, lastReceivedBlockInfo.getOrElse(streamId, 0L))
}.toMap
}.getOrElse {
- streamIds.map(id => (id, 0L)).toMap
+ streamIds.map(streamId => (streamId, 0L)).toMap
}
}
@@ -215,10 +219,6 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
receiverInfos.get(receiverId)
}
- def receiverIds(): Iterable[Int] = synchronized {
- receiverInfos.keys
- }
-
def lastCompletedBatch: Option[BatchUIData] = synchronized {
completedBatchUIData.sortBy(_.batchTime)(Time.ordering).lastOption
}
@@ -227,15 +227,11 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
retainedBatches.lastOption
}
- private def retainedBatches: Seq[BatchUIData] = {
+ def retainedBatches: Seq[BatchUIData] = synchronized {
(waitingBatchUIData.values.toSeq ++
runningBatchUIData.values.toSeq ++ completedBatchUIData).sortBy(_.batchTime)(Time.ordering)
}
- private def extractDistribution(getMetric: BatchUIData => Option[Long]): Option[Distribution] = {
- Distribution(completedBatchUIData.flatMap(getMetric(_)).map(_.toDouble))
- }
-
def getBatchUIData(batchTime: Time): Option[BatchUIData] = synchronized {
val batchUIData = waitingBatchUIData.get(batchTime).orElse {
runningBatchUIData.get(batchTime).orElse {
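A minimal sketch (editorial, not part of the patch) of the data shape the new `receivedEventRateWithBatchTime` returns; the stream ids, batch times, and rates below are hypothetical, assuming a 1000 ms batch duration:

// An event rate is numRecords * 1000.0 / batchDuration, i.e. events per second.
val rates: Map[Int, Seq[(Long, Double)]] = Map(
  0 -> Seq((1000L, 300.0), (2000L, 450.0)), // stream 0: (batchTime, events/sec)
  1 -> Seq((1000L, 120.0), (2000L, 0.0))    // stream 1: no records in the second batch
)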
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
index db37ae815b..ecbebe5c6c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
@@ -17,181 +17,454 @@
package org.apache.spark.streaming.ui
-import java.util.Calendar
+import java.text.SimpleDateFormat
+import java.util.Date
+import java.util.concurrent.TimeUnit
import javax.servlet.http.HttpServletRequest
-import scala.xml.Node
+import scala.collection.mutable.ArrayBuffer
+import scala.xml.{Node, Unparsed}
import org.apache.spark.Logging
import org.apache.spark.ui._
-import org.apache.spark.ui.UIUtils._
-import org.apache.spark.util.Distribution
+import org.apache.spark.ui.{UIUtils => SparkUIUtils}
+
+/**
+ * A helper class to generate JavaScript and HTML for both timeline and histogram graphs.
+ *
+ * @param timelineDivId the id of the html `div` tag for the timeline graph
+ * @param histogramDivId the id of the html `div` tag for the histogram graph
+ * @param data the data for the graph
+ * @param minX the minimum value of the X axis
+ * @param maxX the maximum value of the X axis
+ * @param minY the minimum value of the Y axis
+ * @param maxY the maximum value of the Y axis
+ * @param unitY the unit of the Y axis
+ * @param batchInterval if `batchInterval` is not None, a line will be drawn for `batchInterval`
+ * in the graph
+ */
+private[ui] class GraphUIData(
+ timelineDivId: String,
+ histogramDivId: String,
+ data: Seq[(Long, Double)],
+ minX: Long,
+ maxX: Long,
+ minY: Double,
+ maxY: Double,
+ unitY: String,
+ batchInterval: Option[Double] = None) {
+
+ private var dataJavaScriptName: String = _
+
+ def generateDataJs(jsCollector: JsCollector): Unit = {
+ val jsForData = data.map { case (x, y) =>
+ s"""{"x": $x, "y": $y}"""
+ }.mkString("[", ",", "]")
+ dataJavaScriptName = jsCollector.nextVariableName
+ jsCollector.addPreparedStatement(s"var $dataJavaScriptName = $jsForData;")
+ }
+
+ def generateTimelineHtml(jsCollector: JsCollector): Seq[Node] = {
+ jsCollector.addPreparedStatement(s"registerTimeline($minY, $maxY);")
+ if (batchInterval.isDefined) {
+ jsCollector.addStatement(
+ "drawTimeline(" +
+ s"'#$timelineDivId', $dataJavaScriptName, $minX, $maxX, $minY, $maxY, '$unitY'," +
+ s" ${batchInterval.get}" +
+ ");")
+ } else {
+ jsCollector.addStatement(
+ s"drawTimeline('#$timelineDivId', $dataJavaScriptName, $minX, $maxX, $minY, $maxY," +
+ s" '$unitY');")
+ }
+ <div id={timelineDivId}></div>
+ }
+
+ def generateHistogramHtml(jsCollector: JsCollector): Seq[Node] = {
+ val histogramData = s"$dataJavaScriptName.map(function(d) { return d.y; })"
+ jsCollector.addPreparedStatement(s"registerHistogram($histogramData, $minY, $maxY);")
+ if (batchInterval.isDefined) {
+ jsCollector.addStatement(
+ "drawHistogram(" +
+ s"'#$histogramDivId', $histogramData, $minY, $maxY, '$unitY', ${batchInterval.get}" +
+ ");")
+ } else {
+ jsCollector.addStatement(
+ s"drawHistogram('#$histogramDivId', $histogramData, $minY, $maxY, '$unitY');")
+ }
+ <div id={histogramDivId}></div>
+ }
+}
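// Editorial sketch (not part of the patch): how a GraphUIData is typically wired
// up, mirroring the calls in StreamingPage.generateStatTable below. The div ids
// and the (batchTime, value) data here are hypothetical.
val jsCollector = new JsCollector
val graph = new GraphUIData(
  "my-timeline", "my-histogram",
  Seq((1000L, 12.0), (2000L, 15.0)),
  minX = 1000L, maxX = 2000L, minY = 0.0, maxY = 20.0, unitY = "events/sec")
graph.generateDataJs(jsCollector)   // must run before generating the HTML
val timeline = graph.generateTimelineHtml(jsCollector)
val histogram = graph.generateHistogramHtml(jsCollector)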
+
+/**
+ * A helper class for "scheduling delay", "processing time" and "total delay" to generate data that
+ * will be used in the timeline and histogram graphs.
+ *
+ * @param data (batchTime, milliseconds) pairs, where "milliseconds" is a duration metric such
+ * as the processing time of a batch.
+ */
+private[ui] class MillisecondsStatUIData(data: Seq[(Long, Long)]) {
+
+ /**
+ * Convert the original data to the given `unit` for the timeline graph.
+ */
+ def timelineData(unit: TimeUnit): Seq[(Long, Double)] =
+ data.map(x => x._1 -> UIUtils.convertToTimeUnit(x._2, unit))
+
+ /**
+ * Convert the original data to the given `unit` for the histogram graph.
+ */
+ def histogramData(unit: TimeUnit): Seq[Double] =
+ data.map(x => UIUtils.convertToTimeUnit(x._2, unit))
+
+ val avg: Option[Long] = if (data.isEmpty) None else Some(data.map(_._2).sum / data.size)
+
+ val formattedAvg: String = StreamingPage.formatDurationOption(avg)
+
+ val max: Option[Long] = if (data.isEmpty) None else Some(data.map(_._2).max)
+}
+
+/**
+ * A helper class for "input rate" to generate data that will be used in the timeline and histogram
+ * graphs.
+ *
+ * @param data (batchTime, event-rate).
+ */
+private[ui] class EventRateUIData(val data: Seq[(Long, Double)]) {
+
+ val avg: Option[Double] = if (data.isEmpty) None else Some(data.map(_._2).sum / data.size)
+
+ val formattedAvg: String = avg.map(_.formatted("%.2f")).getOrElse("-")
+
+ val max: Option[Double] = if (data.isEmpty) None else Some(data.map(_._2).max)
+}
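// Editorial sketch (not part of the patch): how the two stat helpers above
// summarize per-batch data; the sample (batchTime, value) pairs are hypothetical.
import java.util.concurrent.TimeUnit
val delays = new MillisecondsStatUIData(Seq((1000L, 1500L), (2000L, 2500L)))
delays.avg                             // Some(2000L), in milliseconds
delays.timelineData(TimeUnit.SECONDS)  // Seq((1000L, 1.5), (2000L, 2.5))
delays.histogramData(TimeUnit.SECONDS) // Seq(1.5, 2.5)
val rates = new EventRateUIData(Seq((1000L, 300.0), (2000L, 450.0)))
rates.formattedAvg                     // "375.00"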
/** Page for Spark Web UI that shows statistics of a streaming job */
private[ui] class StreamingPage(parent: StreamingTab)
extends WebUIPage("") with Logging {
+ import StreamingPage._
+
private val listener = parent.listener
private val startTime = System.currentTimeMillis()
- private val emptyCell = "-"
/** Render the page */
def render(request: HttpServletRequest): Seq[Node] = {
- val content = listener.synchronized {
- generateBasicStats() ++ <br></br> ++
- <h4>Statistics over last {listener.retainedCompletedBatches.size} processed batches</h4> ++
- generateReceiverStats() ++
- generateBatchStatsTable() ++
- generateBatchListTables()
- }
- UIUtils.headerSparkPage("Streaming", content, parent, Some(5000))
+ val resources = generateLoadResources()
+ val basicInfo = generateBasicInfo()
+ val content = resources ++
+ basicInfo ++
+ listener.synchronized {
+ generateStatTable() ++
+ generateBatchListTables()
+ }
+ SparkUIUtils.headerSparkPage("Streaming Statistics", content, parent, Some(5000))
}
- /** Generate basic stats of the streaming program */
- private def generateBasicStats(): Seq[Node] = {
- val timeSinceStart = System.currentTimeMillis() - startTime
+ /**
+ * Generate the html that loads the css/js files for StreamingPage.
+ */
+ private def generateLoadResources(): Seq[Node] = {
// scalastyle:off
- <ul class ="unstyled">
- <li>
- <strong>Started at: </strong> {UIUtils.formatDate(startTime)}
- </li>
- <li>
- <strong>Time since start: </strong>{formatDurationVerbose(timeSinceStart)}
- </li>
- <li>
- <strong>Network receivers: </strong>{listener.numReceivers}
- </li>
- <li>
- <strong>Batch interval: </strong>{formatDurationVerbose(listener.batchDuration)}
- </li>
- <li>
- <a href="#completed"><strong>Completed batches: </strong></a>{listener.numTotalCompletedBatches}
- </li>
- <li>
- <a href="#active"><strong>Active batches: </strong></a>{listener.numUnprocessedBatches}
- </li>
- <li>
- <strong>Received events: </strong>{listener.numTotalReceivedRecords}
- </li>
- <li>
- <strong>Processed events: </strong>{listener.numTotalProcessedRecords}
- </li>
- </ul>
+ <script src={SparkUIUtils.prependBaseUri("/static/d3.min.js")}></script>
+ <link rel="stylesheet" href={SparkUIUtils.prependBaseUri("/static/streaming-page.css")} type="text/css"/>
+ <script src={SparkUIUtils.prependBaseUri("/static/streaming-page.js")}></script>
// scalastyle:on
}
- /** Generate stats of data received by the receivers in the streaming program */
- private def generateReceiverStats(): Seq[Node] = {
- val receivedRecordDistributions = listener.receivedRecordsDistributions
- val lastBatchReceivedRecord = listener.lastReceivedBatchRecords
- val table = if (receivedRecordDistributions.size > 0) {
- val headerRow = Seq(
- "Receiver",
- "Status",
- "Location",
- "Events in last batch\n[" + formatDate(Calendar.getInstance().getTime()) + "]",
- "Minimum rate\n[events/sec]",
- "Median rate\n[events/sec]",
- "Maximum rate\n[events/sec]",
- "Last Error"
- )
- val dataRows = listener.receiverIds().map { receiverId =>
- val receiverInfo = listener.receiverInfo(receiverId)
- val receiverName = receiverInfo.map(_.name).getOrElse(s"Receiver-$receiverId")
- val receiverActive = receiverInfo.map { info =>
- if (info.active) "ACTIVE" else "INACTIVE"
- }.getOrElse(emptyCell)
- val receiverLocation = receiverInfo.map(_.location).getOrElse(emptyCell)
- val receiverLastBatchRecords = formatNumber(lastBatchReceivedRecord(receiverId))
- val receivedRecordStats = receivedRecordDistributions(receiverId).map { d =>
- d.getQuantiles(Seq(0.0, 0.5, 1.0)).map(r => formatNumber(r.toLong))
- }.getOrElse {
- Seq(emptyCell, emptyCell, emptyCell, emptyCell, emptyCell)
- }
- val receiverLastError = listener.receiverInfo(receiverId).map { info =>
- val msg = s"${info.lastErrorMessage} - ${info.lastError}"
- if (msg.size > 100) msg.take(97) + "..." else msg
- }.getOrElse(emptyCell)
- Seq(receiverName, receiverActive, receiverLocation, receiverLastBatchRecords) ++
- receivedRecordStats ++ Seq(receiverLastError)
- }.toSeq
- Some(listingTable(headerRow, dataRows))
- } else {
- None
- }
+ /** Generate basic information of the streaming program */
+ private def generateBasicInfo(): Seq[Node] = {
+ val timeSinceStart = System.currentTimeMillis() - startTime
+ <div>Running batches of
+ <strong>
+ {SparkUIUtils.formatDurationVerbose(listener.batchDuration)}
+ </strong>
+ for
+ <strong>
+ {SparkUIUtils.formatDurationVerbose(timeSinceStart)}
+ </strong>
+ since
+ <strong>
+ {SparkUIUtils.formatDate(startTime)}
+ </strong>
+ </div>
+ <br />
+ }
- val content =
- <h5>Receiver Statistics</h5> ++
- <div>{table.getOrElse("No receivers")}</div>
+ /**
+ * Generate a global "timeFormat" dictionary in the JavaScript to store the time and its formatted
+ * string. Because we cannot specify a timezone in JavaScript, to make sure the server and client
+ * use the same timezone, we use the "timeFormat" dictionary to format all time values used in the
+ * graphs.
+ *
+ * @param times all time values that will be used in the graphs.
+ */
+ private def generateTimeMap(times: Seq[Long]): Seq[Node] = {
+ val dateFormat = new SimpleDateFormat("HH:mm:ss")
+ val js = "var timeFormat = {};\n" + times.map { time =>
+ val formattedTime = dateFormat.format(new Date(time))
+ s"timeFormat[$time] = '$formattedTime';"
+ }.mkString("\n")
- content
+ <script>{Unparsed(js)}</script>
}
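// Editorial note (not part of the patch): the script emitted by generateTimeMap
// has roughly this shape; the epoch values and "HH:mm:ss" strings below are
// illustrative and depend on the server timezone.
//
//   <script>
//   var timeFormat = {};
//   timeFormat[1430000000000] = '12:13:20';
//   timeFormat[1430000005000] = '12:13:25';
//   </script>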
- /** Generate stats of batch jobs of the streaming program */
- private def generateBatchStatsTable(): Seq[Node] = {
- val numBatches = listener.retainedCompletedBatches.size
- val lastCompletedBatch = listener.lastCompletedBatch
- val table = if (numBatches > 0) {
- val processingDelayQuantilesRow = {
- Seq(
- "Processing Time",
- formatDurationOption(lastCompletedBatch.flatMap(_.processingDelay))
- ) ++ getQuantiles(listener.processingDelayDistribution)
- }
- val schedulingDelayQuantilesRow = {
- Seq(
- "Scheduling Delay",
- formatDurationOption(lastCompletedBatch.flatMap(_.schedulingDelay))
- ) ++ getQuantiles(listener.schedulingDelayDistribution)
- }
- val totalDelayQuantilesRow = {
- Seq(
- "Total Delay",
- formatDurationOption(lastCompletedBatch.flatMap(_.totalDelay))
- ) ++ getQuantiles(listener.totalDelayDistribution)
- }
- val headerRow = Seq("Metric", "Last batch", "Minimum", "25th percentile",
- "Median", "75th percentile", "Maximum")
- val dataRows: Seq[Seq[String]] = Seq(
- processingDelayQuantilesRow,
- schedulingDelayQuantilesRow,
- totalDelayQuantilesRow
- )
- Some(listingTable(headerRow, dataRows))
- } else {
- None
- }
+ private def generateStatTable(): Seq[Node] = {
+ val batches = listener.retainedBatches
- val content =
- <h5>Batch Processing Statistics</h5> ++
- <div>
- <ul class="unstyled">
- {table.getOrElse("No statistics have been generated yet.")}
- </ul>
- </div>
+ val batchTimes = batches.map(_.batchTime.milliseconds)
+ val minBatchTime = if (batchTimes.isEmpty) startTime else batchTimes.min
+ val maxBatchTime = if (batchTimes.isEmpty) startTime else batchTimes.max
- content
- }
+ val eventRateForAllStreams = new EventRateUIData(batches.map { batchInfo =>
+ (batchInfo.batchTime.milliseconds, batchInfo.numRecords * 1000.0 / listener.batchDuration)
+ })
+ val schedulingDelay = new MillisecondsStatUIData(batches.flatMap { batchInfo =>
+ batchInfo.schedulingDelay.map(batchInfo.batchTime.milliseconds -> _)
+ })
+ val processingTime = new MillisecondsStatUIData(batches.flatMap { batchInfo =>
+ batchInfo.processingDelay.map(batchInfo.batchTime.milliseconds -> _)
+ })
+ val totalDelay = new MillisecondsStatUIData(batches.flatMap { batchInfo =>
+ batchInfo.totalDelay.map(batchInfo.batchTime.milliseconds -> _)
+ })
- /**
- * Returns a human-readable string representing a duration such as "5 second 35 ms"
- */
- private def formatDurationOption(msOption: Option[Long]): String = {
- msOption.map(formatDurationVerbose).getOrElse(emptyCell)
+ // Use the max value of "schedulingDelay", "processingTime", and "totalDelay" so that the
+ // three graphs share the same Y axis range.
+ val _maxTime =
+ (for (m1 <- schedulingDelay.max; m2 <- processingTime.max; m3 <- totalDelay.max) yield
+ m1 max m2 max m3).getOrElse(0L)
+ // Should start at 0
+ val minTime = 0L
+ val (maxTime, normalizedUnit) = UIUtils.normalizeDuration(_maxTime)
+ val formattedUnit = UIUtils.shortTimeUnitString(normalizedUnit)
+
+ // Use the max input rate for all InputDStreams' graphs so that they share the same Y axis
+ // range. If it is not an integer, round it up to the next integer.
+ val maxEventRate = eventRateForAllStreams.max.map(_.ceil.toLong).getOrElse(0L)
+ val minEventRate = 0L
+
+ // JavaScript to show/hide the InputDStreams sub table.
+ val triangleJs =
+ s"""$$('#inputs-table').toggle('collapsed');
+ |var status = false;
+ |if ($$(this).html() == '$BLACK_RIGHT_TRIANGLE_HTML') {
+ |$$(this).html('$BLACK_DOWN_TRIANGLE_HTML');status = true;}
+ |else {$$(this).html('$BLACK_RIGHT_TRIANGLE_HTML');status = false;}
+ |window.history.pushState('',
+ | document.title, window.location.pathname + '?show-streams-detail=' + status);"""
.stripMargin.replaceAll("\\n", "") // it must be a single line
+
+ val batchInterval = UIUtils.convertToTimeUnit(listener.batchDuration, normalizedUnit)
+
+ val jsCollector = new JsCollector
+
+ val graphUIDataForEventRateOfAllStreams =
+ new GraphUIData(
+ "all-stream-events-timeline",
+ "all-stream-events-histogram",
+ eventRateForAllStreams.data,
+ minBatchTime,
+ maxBatchTime,
+ minEventRate,
+ maxEventRate,
+ "events/sec")
+ graphUIDataForEventRateOfAllStreams.generateDataJs(jsCollector)
+
+ val graphUIDataForSchedulingDelay =
+ new GraphUIData(
+ "scheduling-delay-timeline",
+ "scheduling-delay-histogram",
+ schedulingDelay.timelineData(normalizedUnit),
+ minBatchTime,
+ maxBatchTime,
+ minTime,
+ maxTime,
+ formattedUnit)
+ graphUIDataForSchedulingDelay.generateDataJs(jsCollector)
+
+ val graphUIDataForProcessingTime =
+ new GraphUIData(
+ "processing-time-timeline",
+ "processing-time-histogram",
+ processingTime.timelineData(normalizedUnit),
+ minBatchTime,
+ maxBatchTime,
+ minTime,
+ maxTime,
+ formattedUnit, Some(batchInterval))
+ graphUIDataForProcessingTime.generateDataJs(jsCollector)
+
+ val graphUIDataForTotalDelay =
+ new GraphUIData(
+ "total-delay-timeline",
+ "total-delay-histogram",
+ totalDelay.timelineData(normalizedUnit),
+ minBatchTime,
+ maxBatchTime,
+ minTime,
+ maxTime,
+ formattedUnit)
+ graphUIDataForTotalDelay.generateDataJs(jsCollector)
+
+ // "hasStream" is false until the user registers the first InputDStream
+ val hasStream = listener.streamIds.nonEmpty
+
+ val numCompletedBatches = listener.retainedCompletedBatches.size
+ val numActiveBatches = batchTimes.length - numCompletedBatches
+ val table =
+ // scalastyle:off
+ <table id="stat-table" class="table table-bordered" style="width: auto">
+ <thead>
+ <tr>
+ <th style="width: 160px;"></th>
+ <th style="width: 492px;">Timelines (Last {batchTimes.length} batches, {numActiveBatches} active, {numCompletedBatches} completed)</th>
+ <th style="width: 300px;">Histograms</th></tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td style="vertical-align: middle;">
+ <div style="width: 160px;">
+ <div>
+ {if (hasStream) {
+ <span id="triangle" onclick={Unparsed(triangleJs)}>{Unparsed(BLACK_RIGHT_TRIANGLE_HTML)}</span>
+ }}
+ <strong>Input Rate</strong>
+ </div>
+ <div>Avg: {eventRateForAllStreams.formattedAvg} events/sec</div>
+ </div>
+ </td>
+ <td class="timeline">{graphUIDataForEventRateOfAllStreams.generateTimelineHtml(jsCollector)}</td>
+ <td class="histogram">{graphUIDataForEventRateOfAllStreams.generateHistogramHtml(jsCollector)}</td>
+ </tr>
+ {if (hasStream) {
+ <tr id="inputs-table" style="display: none;" >
+ <td colspan="3">
+ {generateInputDStreamsTable(jsCollector, minBatchTime, maxBatchTime, minEventRate, maxEventRate)}
+ </td>
+ </tr>
+ }}
+ <tr>
+ <td style="vertical-align: middle;">
+ <div style="width: 160px;">
+ <div><strong>Scheduling Delay</strong></div>
+ <div>Avg: {schedulingDelay.formattedAvg}</div>
+ </div>
+ </td>
+ <td class="timeline">{graphUIDataForSchedulingDelay.generateTimelineHtml(jsCollector)}</td>
+ <td class="histogram">{graphUIDataForSchedulingDelay.generateHistogramHtml(jsCollector)}</td>
+ </tr>
+ <tr>
+ <td style="vertical-align: middle;">
+ <div style="width: 160px;">
+ <div><strong>Processing Time</strong></div>
+ <div>Avg: {processingTime.formattedAvg}</div>
+ </div>
+ </td>
+ <td class="timeline">{graphUIDataForProcessingTime.generateTimelineHtml(jsCollector)}</td>
+ <td class="histogram">{graphUIDataForProcessingTime.generateHistogramHtml(jsCollector)}</td>
+ </tr>
+ <tr>
+ <td style="vertical-align: middle;">
+ <div style="width: 160px;">
+ <div><strong>Total Delay</strong></div>
+ <div>Avg: {totalDelay.formattedAvg}</div>
+ </div>
+ </td>
+ <td class="timeline">{graphUIDataForTotalDelay.generateTimelineHtml(jsCollector)}</td>
+ <td class="histogram">{graphUIDataForTotalDelay.generateHistogramHtml(jsCollector)}</td>
+ </tr>
+ </tbody>
+ </table>
+ // scalastyle:on
+
+ generateTimeMap(batchTimes) ++ table ++ jsCollector.toHtml
}
- /** Get quantiles for any time distribution */
- private def getQuantiles(timeDistributionOption: Option[Distribution]) = {
- timeDistributionOption.get.getQuantiles().map { ms => formatDurationVerbose(ms.toLong) }
+ private def generateInputDStreamsTable(
+ jsCollector: JsCollector,
+ minX: Long,
+ maxX: Long,
+ minY: Double,
+ maxY: Double): Seq[Node] = {
+ val content = listener.receivedEventRateWithBatchTime.map { case (streamId, eventRates) =>
+ generateInputDStreamRow(jsCollector, streamId, eventRates, minX, maxX, minY, maxY)
+ }.foldLeft[Seq[Node]](Nil)(_ ++ _)
+
+ // scalastyle:off
+ <table class="table table-bordered" style="width: auto">
+ <thead>
+ <tr>
+ <th style="width: 151px;"></th>
+ <th style="width: 167px; padding: 8px 0 8px 0"><div style="margin: 0 8px 0 8px">Status</div></th>
+ <th style="width: 167px; padding: 8px 0 8px 0"><div style="margin: 0 8px 0 8px">Location</div></th>
+ <th style="width: 166px; padding: 8px 0 8px 0"><div style="margin: 0 8px 0 8px">Last Error Time</div></th>
+ <th>Last Error Message</th>
+ </tr>
+ </thead>
+ <tbody>
+ {content}
+ </tbody>
+ </table>
+ // scalastyle:on
}
- /** Generate HTML table from string data */
- private def listingTable(headers: Seq[String], data: Seq[Seq[String]]) = {
- def generateDataRow(data: Seq[String]): Seq[Node] = {
- <tr> {data.map(d => <td>{d}</td>)} </tr>
- }
- UIUtils.listingTable(headers, generateDataRow, data, fixedWidth = true)
+ private def generateInputDStreamRow(
+ jsCollector: JsCollector,
+ streamId: Int,
+ eventRates: Seq[(Long, Double)],
+ minX: Long,
+ maxX: Long,
+ minY: Double,
+ maxY: Double): Seq[Node] = {
+ // If this is a ReceiverInputDStream, the receiver info needs to be shown. Otherwise, only
+ // the InputDStream name is needed.
+ val receiverInfo = listener.receiverInfo(streamId)
+ val receiverName = receiverInfo.map(_.name).
+ orElse(listener.streamName(streamId)).getOrElse(s"Stream-$streamId")
+ val receiverActive = receiverInfo.map { info =>
+ if (info.active) "ACTIVE" else "INACTIVE"
+ }.getOrElse(emptyCell)
+ val receiverLocation = receiverInfo.map(_.location).getOrElse(emptyCell)
+ val receiverLastError = receiverInfo.map { info =>
+ val msg = s"${info.lastErrorMessage} - ${info.lastError}"
+ if (msg.size > 100) msg.take(97) + "..." else msg
+ }.getOrElse(emptyCell)
+ val receiverLastErrorTime = receiverInfo.map {
+ r => if (r.lastErrorTime < 0) "-" else SparkUIUtils.formatDate(r.lastErrorTime)
+ }.getOrElse(emptyCell)
+ val receivedRecords = new EventRateUIData(eventRates)
+
+ val graphUIDataForEventRate =
+ new GraphUIData(
+ s"stream-$streamId-events-timeline",
+ s"stream-$streamId-events-histogram",
+ receivedRecords.data,
+ minX,
+ maxX,
+ minY,
+ maxY,
+ "events/sec")
+ graphUIDataForEventRate.generateDataJs(jsCollector)
+
+ <tr>
+ <td rowspan="2" style="vertical-align: middle; width: 151px;">
+ <div style="width: 151px;">
+ <div><strong>{receiverName}</strong></div>
+ <div>Avg: {receivedRecords.formattedAvg} events/sec</div>
+ </div>
+ </td>
+ <td>{receiverActive}</td>
+ <td>{receiverLocation}</td>
+ <td>{receiverLastErrorTime}</td>
+ <td><div style="width: 292px;">{receiverLastError}</div></td>
+ </tr>
+ <tr>
+ <td colspan="3" class="timeline">
+ {graphUIDataForEventRate.generateTimelineHtml(jsCollector)}
+ </td>
+ <td class="histogram">{graphUIDataForEventRate.generateHistogramHtml(jsCollector)}</td>
+ </tr>
}
private def generateBatchListTables(): Seq[Node] = {
@@ -216,3 +489,67 @@ private[ui] class StreamingPage(parent: StreamingTab)
}
}
+private[ui] object StreamingPage {
+ val BLACK_RIGHT_TRIANGLE_HTML = "&#9654;"
+ val BLACK_DOWN_TRIANGLE_HTML = "&#9660;"
+
+ val emptyCell = "-"
+
+ /**
+ * Returns a human-readable string representing a duration, such as "5 seconds 35 ms".
+ */
+ def formatDurationOption(msOption: Option[Long]): String = {
+ msOption.map(SparkUIUtils.formatDurationVerbose).getOrElse(emptyCell)
+ }
+
+}
+
+/**
+ * A helper class that allows the user to add JavaScript statements which will be executed when the
+ * DOM has finished loading.
+ */
+private[ui] class JsCollector {
+
+ private var variableId = 0
+
+ /**
+ * Return the next unused JavaScript variable name
+ */
+ def nextVariableName: String = {
+ variableId += 1
+ "v" + variableId
+ }
+
+ /**
+ * JavaScript statements that will execute before `statements`
+ */
+ private val preparedStatements = ArrayBuffer[String]()
+
+ /**
+ * JavaScript statements that will execute after `preparedStatements`
+ */
+ private val statements = ArrayBuffer[String]()
+
+ def addPreparedStatement(js: String): Unit = {
+ preparedStatements += js
+ }
+
+ def addStatement(js: String): Unit = {
+ statements += js
+ }
+
+ /**
+ * Generate an html snippet that executes all collected scripts when the DOM has finished loading.
+ */
+ def toHtml: Seq[Node] = {
+ val js =
+ s"""
+ |$$(document).ready(function(){
+ | ${preparedStatements.mkString("\n")}
+ | ${statements.mkString("\n")}
+ |});""".stripMargin
+
+ <script>{Unparsed(js)}</script>
+ }
+}
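// Editorial sketch (not part of the patch): JsCollector usage. Prepared
// statements run before plain statements inside a single $(document).ready
// handler; the div id and data below are hypothetical.
val collector = new JsCollector
val dataVar = collector.nextVariableName  // "v1"
collector.addPreparedStatement(s"var $dataVar = [{x: 1000, y: 2.0}];")
collector.addStatement(s"drawTimeline('#my-div', $dataVar, 1000, 2000, 0, 3, 'events/sec');")
val scriptNode = collector.toHtml         // a single <script> element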
+
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala
new file mode 100644
index 0000000000..c206f973b2
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.ui
+
+import java.util.concurrent.TimeUnit
+
+object UIUtils {
+
+ /**
+ * Return the short string for a `TimeUnit`.
+ */
+ def shortTimeUnitString(unit: TimeUnit): String = unit match {
+ case TimeUnit.NANOSECONDS => "ns"
+ case TimeUnit.MICROSECONDS => "us"
+ case TimeUnit.MILLISECONDS => "ms"
+ case TimeUnit.SECONDS => "sec"
+ case TimeUnit.MINUTES => "min"
+ case TimeUnit.HOURS => "hrs"
+ case TimeUnit.DAYS => "days"
+ }
+
+ /**
+ * Find the best `TimeUnit` for displaying a duration given in milliseconds. Return the
+ * converted value together with the chosen `TimeUnit`.
+ */
+ def normalizeDuration(milliseconds: Long): (Double, TimeUnit) = {
+ if (milliseconds < 1000) {
+ return (milliseconds, TimeUnit.MILLISECONDS)
+ }
+ val seconds = milliseconds.toDouble / 1000
+ if (seconds < 60) {
+ return (seconds, TimeUnit.SECONDS)
+ }
+ val minutes = seconds / 60
+ if (minutes < 60) {
+ return (minutes, TimeUnit.MINUTES)
+ }
+ val hours = minutes / 60
+ if (hours < 24) {
+ return (hours, TimeUnit.HOURS)
+ }
+ val days = hours / 24
+ (days, TimeUnit.DAYS)
+ }
+
+ /**
+ * Convert `milliseconds` to the specified `unit`. We cannot use `TimeUnit.convert` because it
+ * will discard the fractional part.
+ */
+ def convertToTimeUnit(milliseconds: Long, unit: TimeUnit): Double = unit match {
+ case TimeUnit.NANOSECONDS => milliseconds * 1000 * 1000
+ case TimeUnit.MICROSECONDS => milliseconds * 1000
+ case TimeUnit.MILLISECONDS => milliseconds
+ case TimeUnit.SECONDS => milliseconds / 1000.0
+ case TimeUnit.MINUTES => milliseconds / 1000.0 / 60.0
+ case TimeUnit.HOURS => milliseconds / 1000.0 / 60.0 / 60.0
+ case TimeUnit.DAYS => milliseconds / 1000.0 / 60.0 / 60.0 / 24.0
+ }
+}
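A small usage sketch (editorial, not part of the patch) for the time helpers above; the 90000 ms input is arbitrary:

import java.util.concurrent.TimeUnit
val (value, unit) = UIUtils.normalizeDuration(90000L)             // (1.5, TimeUnit.MINUTES)
val label = UIUtils.shortTimeUnitString(unit)                     // "min"
val seconds = UIUtils.convertToTimeUnit(90000L, TimeUnit.SECONDS) // 90.0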
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
index 8de43baabc..2211f62383 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
@@ -94,19 +94,34 @@ class UISeleniumSuite
eventually(timeout(10 seconds), interval(50 milliseconds)) {
// check whether streaming page exists
go to (sparkUI.appUIAddress.stripSuffix("/") + "/streaming")
- val statisticText = findAll(cssSelector("li strong")).map(_.text).toSeq
- statisticText should contain("Network receivers:")
- statisticText should contain("Batch interval:")
-
+ val h3Text = findAll(cssSelector("h3")).map(_.text).toSeq
+ h3Text should contain("Streaming Statistics")
+
+ // Check stat table
+ val statTableHeaders = findAll(cssSelector("#stat-table th")).map(_.text).toSeq
+ statTableHeaders.exists(
+ _.matches("Timelines \\(Last \\d+ batches, \\d+ active, \\d+ completed\\)")) should be
+ (true)
+ statTableHeaders should contain ("Histograms")
+
+ val statTableCells = findAll(cssSelector("#stat-table td")).map(_.text).toSeq
+ statTableCells.exists(_.contains("Input Rate")) should be (true)
+ statTableCells.exists(_.contains("Scheduling Delay")) should be (true)
+ statTableCells.exists(_.contains("Processing Time")) should be (true)
+ statTableCells.exists(_.contains("Total Delay")) should be (true)
+
+ // Check batch tables
val h4Text = findAll(cssSelector("h4")).map(_.text).toSeq
h4Text.exists(_.matches("Active Batches \\(\\d+\\)")) should be (true)
h4Text.exists(_.matches("Completed Batches \\(last \\d+ out of \\d+\\)")) should be (true)
findAll(cssSelector("""#active-batches-table th""")).map(_.text).toSeq should be {
- List("Batch Time", "Input Size", "Scheduling Delay", "Processing Time", "Status")
+ List("Batch Time", "Input Size", "Scheduling Delay", "Processing Time",
+ "Status")
}
findAll(cssSelector("""#completed-batches-table th""")).map(_.text).toSeq should be {
- List("Batch Time", "Input Size", "Scheduling Delay", "Processing Time", "Total Delay")
+ List("Batch Time", "Input Size", "Scheduling Delay", "Processing Time",
+ "Total Delay")
}
val batchLinks =
@@ -176,9 +191,8 @@ class UISeleniumSuite
eventually(timeout(10 seconds), interval(50 milliseconds)) {
go to (sparkUI.appUIAddress.stripSuffix("/") + "/streaming")
- val statisticText = findAll(cssSelector("li strong")).map(_.text).toSeq
- statisticText should not contain ("Network receivers:")
- statisticText should not contain ("Batch interval:")
+ val h3Text = findAll(cssSelector("h3")).map(_.text).toSeq
+ h3Text should not contain("Streaming Statistics")
}
}
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
index e874536e63..2a0f45830e 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
@@ -94,7 +94,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
batchUIData.get.schedulingDelay should be (batchInfoStarted.schedulingDelay)
batchUIData.get.processingDelay should be (batchInfoStarted.processingDelay)
batchUIData.get.totalDelay should be (batchInfoStarted.totalDelay)
- batchUIData.get.receiverNumRecords should be (Map(0 -> 300L, 1 -> 300L))
+ batchUIData.get.streamIdToNumRecords should be (Map(0 -> 300L, 1 -> 300L))
batchUIData.get.numRecords should be(600)
batchUIData.get.outputOpIdSparkJobIdPairs should be
Seq(OutputOpIdAndSparkJobId(0, 0),
@@ -138,7 +138,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
test("Remove the old completed batches when exceeding the limit") {
val ssc = setupStreams(input, operation)
- val limit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 100)
+ val limit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 1000)
val listener = new StreamingJobProgressListener(ssc)
val streamIdToNumRecords = Map(0 -> 300L, 1 -> 300L)
@@ -155,7 +155,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
test("out-of-order onJobStart and onBatchXXX") {
val ssc = setupStreams(input, operation)
- val limit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 100)
+ val limit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 1000)
val listener = new StreamingJobProgressListener(ssc)
// fulfill completedBatchInfos
@@ -182,7 +182,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
batchUIData.get.schedulingDelay should be (batchInfoSubmitted.schedulingDelay)
batchUIData.get.processingDelay should be (batchInfoSubmitted.processingDelay)
batchUIData.get.totalDelay should be (batchInfoSubmitted.totalDelay)
- batchUIData.get.receiverNumRecords should be (Map.empty)
+ batchUIData.get.streamIdToNumRecords should be (Map.empty)
batchUIData.get.numRecords should be (0)
batchUIData.get.outputOpIdSparkJobIdPairs should be (Seq(OutputOpIdAndSparkJobId(0, 0)))
@@ -203,4 +203,48 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
(listener.waitingBatches.size + listener.runningBatches.size +
listener.retainedCompletedBatches.size + 10)
}
+
+ test("detect memory leak") {
+ val ssc = setupStreams(input, operation)
+ val listener = new StreamingJobProgressListener(ssc)
+
+ val limit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 1000)
+
+ for (_ <- 0 until 2 * limit) {
+ val streamIdToNumRecords = Map(0 -> 300L, 1 -> 300L)
+
+ // onBatchSubmitted
+ val batchInfoSubmitted = BatchInfo(Time(1000), streamIdToNumRecords, 1000, None, None)
+ listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted))
+
+ // onBatchStarted
+ val batchInfoStarted = BatchInfo(Time(1000), streamIdToNumRecords, 1000, Some(2000), None)
+ listener.onBatchStarted(StreamingListenerBatchStarted(batchInfoStarted))
+
+ // onJobStart
+ val jobStart1 = createJobStart(Time(1000), outputOpId = 0, jobId = 0)
+ listener.onJobStart(jobStart1)
+
+ val jobStart2 = createJobStart(Time(1000), outputOpId = 0, jobId = 1)
+ listener.onJobStart(jobStart2)
+
+ val jobStart3 = createJobStart(Time(1000), outputOpId = 1, jobId = 0)
+ listener.onJobStart(jobStart3)
+
+ val jobStart4 = createJobStart(Time(1000), outputOpId = 1, jobId = 1)
+ listener.onJobStart(jobStart4)
+
+ // onBatchCompleted
+ val batchInfoCompleted = BatchInfo(Time(1000), streamIdToNumRecords, 1000, Some(2000), None)
+ listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
+ }
+
+ listener.waitingBatches.size should be (0)
+ listener.runningBatches.size should be (0)
+ listener.retainedCompletedBatches.size should be (limit)
+ listener.batchTimeToOutputOpIdSparkJobIdPair.size() should be <=
+ (listener.waitingBatches.size + listener.runningBatches.size +
+ listener.retainedCompletedBatches.size + 10)
+ }
+
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ui/UIUtilsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ui/UIUtilsSuite.scala
new file mode 100644
index 0000000000..6df1a63ab2
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ui/UIUtilsSuite.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.ui
+
+import java.util.concurrent.TimeUnit
+
+import org.scalatest.FunSuite
+import org.scalatest.Matchers
+
+class UIUtilsSuite extends FunSuite with Matchers {
+
+ test("shortTimeUnitString") {
+ assert("ns" === UIUtils.shortTimeUnitString(TimeUnit.NANOSECONDS))
+ assert("us" === UIUtils.shortTimeUnitString(TimeUnit.MICROSECONDS))
+ assert("ms" === UIUtils.shortTimeUnitString(TimeUnit.MILLISECONDS))
+ assert("sec" === UIUtils.shortTimeUnitString(TimeUnit.SECONDS))
+ assert("min" === UIUtils.shortTimeUnitString(TimeUnit.MINUTES))
+ assert("hrs" === UIUtils.shortTimeUnitString(TimeUnit.HOURS))
+ assert("days" === UIUtils.shortTimeUnitString(TimeUnit.DAYS))
+ }
+
+ test("normalizeDuration") {
+ verifyNormalizedTime(900, TimeUnit.MILLISECONDS, 900)
+ verifyNormalizedTime(1.0, TimeUnit.SECONDS, 1000)
+ verifyNormalizedTime(1.0, TimeUnit.MINUTES, 60 * 1000)
+ verifyNormalizedTime(1.0, TimeUnit.HOURS, 60 * 60 * 1000)
+ verifyNormalizedTime(1.0, TimeUnit.DAYS, 24 * 60 * 60 * 1000)
+ }
+
+ private def verifyNormalizedTime(
+ expectedTime: Double, expectedUnit: TimeUnit, input: Long): Unit = {
+ val (time, unit) = UIUtils.normalizeDuration(input)
+ time should be (expectedTime +- 1E-6)
+ unit should be (expectedUnit)
+ }
+
+ test("convertToTimeUnit") {
+ verifyConvertToTimeUnit(60.0 * 1000 * 1000 * 1000, 60 * 1000, TimeUnit.NANOSECONDS)
+ verifyConvertToTimeUnit(60.0 * 1000 * 1000, 60 * 1000, TimeUnit.MICROSECONDS)
+ verifyConvertToTimeUnit(60 * 1000, 60 * 1000, TimeUnit.MILLISECONDS)
+ verifyConvertToTimeUnit(60, 60 * 1000, TimeUnit.SECONDS)
+ verifyConvertToTimeUnit(1, 60 * 1000, TimeUnit.MINUTES)
+ verifyConvertToTimeUnit(1.0 / 60, 60 * 1000, TimeUnit.HOURS)
+ verifyConvertToTimeUnit(1.0 / 60 / 24, 60 * 1000, TimeUnit.DAYS)
+ }
+
+ private def verifyConvertToTimeUnit(
+ expectedTime: Double, milliseconds: Long, unit: TimeUnit): Unit = {
+ val convertedTime = UIUtils.convertToTimeUnit(milliseconds, unit)
+ convertedTime should be (expectedTime +- 1E-6)
+ }
+}