path: root/streaming
author     zsxwing <zsxwing@gmail.com>                  2015-10-16 13:53:06 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>  2015-10-16 13:53:06 -0700
commit     369d786f58580e7df73e7e23f27390d37269d0de (patch)
tree       58a261eeebdef8a9b69eb7b5d8e362db8aabf04e /streaming
parent     3d683a139b333456a6bd8801ac5f113d1ac3fd18 (diff)
[SPARK-10974] [STREAMING] Add progress bar for output operation column and use red dots for failed batches
Screenshot: https://cloud.githubusercontent.com/assets/1000778/10342571/385d9340-6d4c-11e5-8e79-1fa4c3c98f81.png

Also fixed the description and duration for output operations that don't have Spark jobs.

Screenshot: https://cloud.githubusercontent.com/assets/1000778/10342775/4bd52a0e-6d4d-11e5-99bc-26265a9fc792.png

Author: zsxwing <zsxwing@gmail.com>

Closes #9010 from zsxwing/output-op-progress-bar.
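For context on the listener surface this patch wires up: output operation start/completion events now carry an OutputOperationInfo with a failureReason, so a user-defined StreamingListener can observe per-operation failures, much as the FailureReasonsCollector in the test changes below does. The following is a minimal illustrative sketch, not code from this patch; the class name and log format are invented:

    import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerOutputOperationCompleted}

    // Illustrative sketch (not part of this patch): log the failure reason of
    // each output operation, using the failureReason field this change adds
    // to OutputOperationInfo.
    class OutputOpFailureLogger extends StreamingListener {
      override def onOutputOperationCompleted(
          event: StreamingListenerOutputOperationCompleted): Unit = {
        val info = event.outputOperationInfo
        info.failureReason.foreach { reason =>
          println(s"Output op ${info.id} of batch ${info.batchTime} failed:\n$reason")
        }
      }
    }

A driver program would register such a listener with ssc.addStreamingListener(new OutputOpFailureLogger) before starting the StreamingContext.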
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/resources/org/apache/spark/streaming/ui/static/streaming-page.js       |  26
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala                    |   2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala             |  23
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala                   |  30
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala          |  12
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala                |  17
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/OutputOperationInfo.scala   |   6
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala              |  40
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala                    | 174
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala                  |  67
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala |  14
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala          |  16
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala                 |   2
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala | 30
14 files changed, 258 insertions, 201 deletions
diff --git a/streaming/src/main/resources/org/apache/spark/streaming/ui/static/streaming-page.js b/streaming/src/main/resources/org/apache/spark/streaming/ui/static/streaming-page.js
index 4886b68eea..f82323a1cd 100644
--- a/streaming/src/main/resources/org/apache/spark/streaming/ui/static/streaming-page.js
+++ b/streaming/src/main/resources/org/apache/spark/streaming/ui/static/streaming-page.js
@@ -154,34 +154,40 @@ function drawTimeline(id, data, minX, maxX, minY, maxY, unitY, batchInterval) {
var lastClickedBatch = null;
var lastTimeout = null;
+ function isFailedBatch(batchTime) {
+ return $("#batch-" + batchTime).attr("isFailed") == "true";
+ }
+
// Add points to the line. They are invisible at first, but when the user moves the mouse
// over a point, it is displayed with its details.
svg.selectAll(".point")
.data(data)
.enter().append("circle")
- .attr("stroke", "white") // white and opacity = 0 make it invisible
- .attr("fill", "white")
- .attr("opacity", "0")
+ .attr("stroke", function(d) { return isFailedBatch(d.x) ? "red" : "white";}) // white and opacity = 0 make it invisible
+ .attr("fill", function(d) { return isFailedBatch(d.x) ? "red" : "white";})
+ .attr("opacity", function(d) { return isFailedBatch(d.x) ? "1" : "0";})
.style("cursor", "pointer")
.attr("cx", function(d) { return x(d.x); })
.attr("cy", function(d) { return y(d.y); })
- .attr("r", function(d) { return 3; })
+ .attr("r", function(d) { return isFailedBatch(d.x) ? "2" : "0";})
.on('mouseover', function(d) {
var tip = formatYValue(d.y) + " " + unitY + " at " + timeFormat[d.x];
showBootstrapTooltip(d3.select(this).node(), tip);
// show the point
d3.select(this)
- .attr("stroke", "steelblue")
- .attr("fill", "steelblue")
- .attr("opacity", "1");
+ .attr("stroke", function(d) { return isFailedBatch(d.x) ? "red" : "steelblue";})
+ .attr("fill", function(d) { return isFailedBatch(d.x) ? "red" : "steelblue";})
+ .attr("opacity", "1")
+ .attr("r", "3");
})
.on('mouseout', function() {
hideBootstrapTooltip(d3.select(this).node());
// hide the point
d3.select(this)
- .attr("stroke", "white")
- .attr("fill", "white")
- .attr("opacity", "0");
+ .attr("stroke", function(d) { return isFailedBatch(d.x) ? "red" : "white";})
+ .attr("fill", function(d) { return isFailedBatch(d.x) ? "red" : "white";})
+ .attr("opacity", function(d) { return isFailedBatch(d.x) ? "1" : "0";})
+ .attr("r", function(d) { return isFailedBatch(d.x) ? "2" : "0";});
})
.on("click", function(d) {
if (lastTimeout != null) {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index de79c9ef1a..1b0b7890b3 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -113,7 +113,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
val jobs = this.synchronized {
outputStreams.flatMap { outputStream =>
val jobOption = outputStream.generateJob(time)
- jobOption.foreach(_.setCallSite(outputStream.creationSite.longForm))
+ jobOption.foreach(_.setCallSite(outputStream.creationSite))
jobOption
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
index 463f899dc2..436eb0a566 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
@@ -29,6 +29,7 @@ import org.apache.spark.streaming.Time
* the streaming scheduler queue
* @param processingStartTime Clock time of when the first job of this batch started processing
* @param processingEndTime Clock time of when the last job of this batch finished processing
+ * @param outputOperationInfos The output operations in this batch
*/
@DeveloperApi
case class BatchInfo(
@@ -36,13 +37,10 @@ case class BatchInfo(
streamIdToInputInfo: Map[Int, StreamInputInfo],
submissionTime: Long,
processingStartTime: Option[Long],
- processingEndTime: Option[Long]
+ processingEndTime: Option[Long],
+ outputOperationInfos: Map[Int, OutputOperationInfo]
) {
- private var _failureReasons: Map[Int, String] = Map.empty
-
- private var _numOutputOp: Int = 0
-
@deprecated("Use streamIdToInputInfo instead", "1.5.0")
def streamIdToNumRecords: Map[Int, Long] = streamIdToInputInfo.mapValues(_.numRecords)
@@ -72,19 +70,4 @@ case class BatchInfo(
*/
def numRecords: Long = streamIdToInputInfo.values.map(_.numRecords).sum
- /** Set the failure reasons corresponding to every output ops in the batch */
- private[streaming] def setFailureReason(reasons: Map[Int, String]): Unit = {
- _failureReasons = reasons
- }
-
- /** Failure reasons corresponding to every output ops in the batch */
- private[streaming] def failureReasons = _failureReasons
-
- /** Set the number of output operations in this batch */
- private[streaming] def setNumOutputOp(numOutputOp: Int): Unit = {
- _numOutputOp = numOutputOp
- }
-
- /** Return the number of output operations in this batch */
- private[streaming] def numOutputOp: Int = _numOutputOp
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
index 1373053f06..ab1b3565fc 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
@@ -17,8 +17,10 @@
package org.apache.spark.streaming.scheduler
+import scala.util.{Failure, Try}
+
import org.apache.spark.streaming.Time
-import scala.util.Try
+import org.apache.spark.util.{Utils, CallSite}
/**
* Class representing a Spark computation. It may contain multiple Spark jobs.
@@ -29,7 +31,9 @@ class Job(val time: Time, func: () => _) {
private var _outputOpId: Int = _
private var isSet = false
private var _result: Try[_] = null
- private var _callSite: String = "Unknown"
+ private var _callSite: CallSite = null
+ private var _startTime: Option[Long] = None
+ private var _endTime: Option[Long] = None
def run() {
_result = Try(func())
@@ -71,11 +75,29 @@ class Job(val time: Time, func: () => _) {
_outputOpId = outputOpId
}
- def setCallSite(callSite: String): Unit = {
+ def setCallSite(callSite: CallSite): Unit = {
_callSite = callSite
}
- def callSite: String = _callSite
+ def callSite: CallSite = _callSite
+
+ def setStartTime(startTime: Long): Unit = {
+ _startTime = Some(startTime)
+ }
+
+ def setEndTime(endTime: Long): Unit = {
+ _endTime = Some(endTime)
+ }
+
+ def toOutputOperationInfo: OutputOperationInfo = {
+ val failureReason = if (_result != null && _result.isFailure) {
+ Some(Utils.exceptionString(_result.asInstanceOf[Failure[_]].exception))
+ } else {
+ None
+ }
+ OutputOperationInfo(
+ time, outputOpId, callSite.shortForm, callSite.longForm, _startTime, _endTime, failureReason)
+ }
override def toString: String = id
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 0a4a396a0f..2480b4ec09 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -20,13 +20,13 @@ package org.apache.spark.streaming.scheduler
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
import scala.collection.JavaConverters._
-import scala.util.{Failure, Success}
+import scala.util.Failure
import org.apache.spark.Logging
import org.apache.spark.rdd.PairRDDFunctions
import org.apache.spark.streaming._
import org.apache.spark.streaming.ui.UIUtils
-import org.apache.spark.util.{EventLoop, ThreadUtils}
+import org.apache.spark.util.{EventLoop, ThreadUtils, Utils}
private[scheduler] sealed trait JobSchedulerEvent
@@ -162,16 +162,16 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
// correct "jobSet.processingStartTime".
listenerBus.post(StreamingListenerBatchStarted(jobSet.toBatchInfo))
}
- listenerBus.post(StreamingListenerOutputOperationStarted(
- OutputOperationInfo(job.time, job.outputOpId, job.callSite, Some(startTime), None)))
+ job.setStartTime(startTime)
+ listenerBus.post(StreamingListenerOutputOperationStarted(job.toOutputOperationInfo))
logInfo("Starting job " + job.id + " from job set of time " + jobSet.time)
}
private def handleJobCompletion(job: Job, completedTime: Long) {
val jobSet = jobSets.get(job.time)
jobSet.handleJobCompletion(job)
- listenerBus.post(StreamingListenerOutputOperationCompleted(
- OutputOperationInfo(job.time, job.outputOpId, job.callSite, None, Some(completedTime))))
+ job.setEndTime(completedTime)
+ listenerBus.post(StreamingListenerOutputOperationCompleted(job.toOutputOperationInfo))
logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
if (jobSet.hasCompleted) {
jobSets.remove(jobSet.time)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
index 08f63cc992..f76300351e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
@@ -64,24 +64,13 @@ case class JobSet(
}
def toBatchInfo: BatchInfo = {
- val failureReasons: Map[Int, String] = {
- if (hasCompleted) {
- jobs.filter(_.result.isFailure).map { job =>
- (job.outputOpId, Utils.exceptionString(job.result.asInstanceOf[Failure[_]].exception))
- }.toMap
- } else {
- Map.empty
- }
- }
- val binfo = new BatchInfo(
+ BatchInfo(
time,
streamIdToInputInfo,
submissionTime,
if (processingStartTime >= 0) Some(processingStartTime) else None,
- if (processingEndTime >= 0) Some(processingEndTime) else None
+ if (processingEndTime >= 0) Some(processingEndTime) else None,
+ jobs.map { job => (job.outputOpId, job.toOutputOperationInfo) }.toMap
)
- binfo.setFailureReason(failureReasons)
- binfo.setNumOutputOp(jobs.size)
- binfo
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/OutputOperationInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/OutputOperationInfo.scala
index d5614b3439..137e512a67 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/OutputOperationInfo.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/OutputOperationInfo.scala
@@ -25,17 +25,21 @@ import org.apache.spark.streaming.Time
* Class having information on output operations.
* @param batchTime Time of the batch
* @param id Id of this output operation. Different output operations have different ids in a batch.
+ * @param name The name of this output operation.
* @param description The description of this output operation.
* @param startTime Clock time of when the output operation started processing
* @param endTime Clock time of when the output operation finished processing
+ * @param failureReason Failure reason if this output operation fails
*/
@DeveloperApi
case class OutputOperationInfo(
batchTime: Time,
id: Int,
+ name: String,
description: String,
startTime: Option[Long],
- endTime: Option[Long]) {
+ endTime: Option[Long],
+ failureReason: Option[String]) {
/**
* Return the duration of this output operation.
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
index 3e6590d66f..125cafd41b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
@@ -17,9 +17,6 @@
package org.apache.spark.streaming.ui
-import java.text.SimpleDateFormat
-import java.util.Date
-
import scala.xml.Node
import org.apache.spark.ui.{UIUtils => SparkUIUtils}
@@ -46,7 +43,8 @@ private[ui] abstract class BatchTableBase(tableId: String, batchInterval: Long)
val formattedProcessingTime = processingTime.map(SparkUIUtils.formatDuration).getOrElse("-")
val batchTimeId = s"batch-$batchTime"
- <td id={batchTimeId} sorttable_customkey={batchTime.toString}>
+ <td id={batchTimeId} sorttable_customkey={batchTime.toString}
+ isFailed={batch.isFailed.toString}>
<a href={s"batch?id=$batchTime"}>
{formattedBatchTime}
</a>
@@ -75,6 +73,19 @@ private[ui] abstract class BatchTableBase(tableId: String, batchInterval: Long)
batchTable
}
+ protected def createOutputOperationProgressBar(batch: BatchUIData): Seq[Node] = {
+ <td class="progress-cell">
+ {
+ SparkUIUtils.makeProgressBar(
+ started = batch.numActiveOutputOp,
+ completed = batch.numCompletedOutputOp,
+ failed = batch.numFailedOutputOp,
+ skipped = 0,
+ total = batch.outputOperations.size)
+ }
+ </td>
+ }
+
/**
* Return HTML for all rows of this table.
*/
@@ -86,7 +97,10 @@ private[ui] class ActiveBatchTable(
waitingBatches: Seq[BatchUIData],
batchInterval: Long) extends BatchTableBase("active-batches-table", batchInterval) {
- override protected def columns: Seq[Node] = super.columns ++ <th>Status</th>
+ override protected def columns: Seq[Node] = super.columns ++ {
+ <th>Output Ops: Succeeded/Total</th>
+ <th>Status</th>
+ }
override protected def renderRows: Seq[Node] = {
// The "batchTime"s of "waitingBatches" must be greater than "runningBatches"'s, so display
@@ -96,11 +110,11 @@ private[ui] class ActiveBatchTable(
}
private def runningBatchRow(batch: BatchUIData): Seq[Node] = {
- baseRow(batch) ++ <td>processing</td>
+ baseRow(batch) ++ createOutputOperationProgressBar(batch) ++ <td>processing</td>
}
private def waitingBatchRow(batch: BatchUIData): Seq[Node] = {
- baseRow(batch) ++ <td>queued</td>
+ baseRow(batch) ++ createOutputOperationProgressBar(batch) ++ <td>queued</td>
}
}
@@ -119,17 +133,11 @@ private[ui] class CompletedBatchTable(batches: Seq[BatchUIData], batchInterval:
private def completedBatchRow(batch: BatchUIData): Seq[Node] = {
val totalDelay = batch.totalDelay
val formattedTotalDelay = totalDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
- val numFailedOutputOp = batch.failureReason.size
- val outputOpColumn = if (numFailedOutputOp > 0) {
- s"${batch.numOutputOp - numFailedOutputOp}/${batch.numOutputOp}" +
- s" (${numFailedOutputOp} failed)"
- } else {
- s"${batch.numOutputOp}/${batch.numOutputOp}"
- }
- baseRow(batch) ++
+
+ baseRow(batch) ++ {
<td sorttable_customkey={totalDelay.getOrElse(Long.MaxValue).toString}>
{formattedTotalDelay}
</td>
- <td>{outputOpColumn}</td>
+ } ++ createOutputOperationProgressBar(batch)
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
index a19b85a51d..2ed9255728 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
@@ -47,32 +47,30 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
}
private def generateJobRow(
- outputOpId: OutputOpId,
+ outputOpData: OutputOperationUIData,
outputOpDescription: Seq[Node],
formattedOutputOpDuration: String,
- outputOpStatus: String,
numSparkJobRowsInOutputOp: Int,
isFirstRow: Boolean,
sparkJob: SparkJobIdWithUIData): Seq[Node] = {
if (sparkJob.jobUIData.isDefined) {
- generateNormalJobRow(outputOpId, outputOpDescription, formattedOutputOpDuration,
- outputOpStatus, numSparkJobRowsInOutputOp, isFirstRow, sparkJob.jobUIData.get)
+ generateNormalJobRow(outputOpData, outputOpDescription, formattedOutputOpDuration,
+ numSparkJobRowsInOutputOp, isFirstRow, sparkJob.jobUIData.get)
} else {
- generateDroppedJobRow(outputOpId, outputOpDescription, formattedOutputOpDuration,
- outputOpStatus, numSparkJobRowsInOutputOp, isFirstRow, sparkJob.sparkJobId)
+ generateDroppedJobRow(outputOpData, outputOpDescription, formattedOutputOpDuration,
+ numSparkJobRowsInOutputOp, isFirstRow, sparkJob.sparkJobId)
}
}
private def generateOutputOpRowWithoutSparkJobs(
- outputOpId: OutputOpId,
+ outputOpData: OutputOperationUIData,
outputOpDescription: Seq[Node],
- formattedOutputOpDuration: String,
- outputOpStatus: String): Seq[Node] = {
+ formattedOutputOpDuration: String): Seq[Node] = {
<tr>
- <td class="output-op-id-cell" >{outputOpId.toString}</td>
+ <td class="output-op-id-cell" >{outputOpData.id.toString}</td>
<td>{outputOpDescription}</td>
<td>{formattedOutputOpDuration}</td>
- {outputOpStatusCell(outputOpStatus, rowspan = 1)}
+ {outputOpStatusCell(outputOpData, rowspan = 1)}
<!-- Job Id -->
<td>-</td>
<!-- Duration -->
@@ -91,10 +89,9 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
* one cell, we use "rowspan" for the first row of an output op.
*/
private def generateNormalJobRow(
- outputOpId: OutputOpId,
+ outputOpData: OutputOperationUIData,
outputOpDescription: Seq[Node],
formattedOutputOpDuration: String,
- outputOpStatus: String,
numSparkJobRowsInOutputOp: Int,
isFirstRow: Boolean,
sparkJob: JobUIData): Seq[Node] = {
@@ -116,12 +113,12 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
// scalastyle:off
val prefixCells =
if (isFirstRow) {
- <td class="output-op-id-cell" rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpId.toString}</td>
+ <td class="output-op-id-cell" rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpData.id.toString}</td>
<td rowspan={numSparkJobRowsInOutputOp.toString}>
{outputOpDescription}
</td>
<td rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td> ++
- {outputOpStatusCell(outputOpStatus, numSparkJobRowsInOutputOp)}
+ {outputOpStatusCell(outputOpData, numSparkJobRowsInOutputOp)}
} else {
Nil
}
@@ -161,10 +158,9 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
* with "-" cells.
*/
private def generateDroppedJobRow(
- outputOpId: OutputOpId,
+ outputOpData: OutputOperationUIData,
outputOpDescription: Seq[Node],
formattedOutputOpDuration: String,
- outputOpStatus: String,
numSparkJobRowsInOutputOp: Int,
isFirstRow: Boolean,
jobId: Int): Seq[Node] = {
@@ -173,10 +169,10 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
// scalastyle:off
val prefixCells =
if (isFirstRow) {
- <td class="output-op-id-cell" rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpId.toString}</td>
+ <td class="output-op-id-cell" rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpData.id.toString}</td>
<td rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpDescription}</td>
<td rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td> ++
- {outputOpStatusCell(outputOpStatus, numSparkJobRowsInOutputOp)}
+ {outputOpStatusCell(outputOpData, numSparkJobRowsInOutputOp)}
} else {
Nil
}
@@ -199,47 +195,34 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
}
private def generateOutputOpIdRow(
- outputOpId: OutputOpId,
- outputOpStatus: String,
+ outputOpData: OutputOperationUIData,
sparkJobs: Seq[SparkJobIdWithUIData]): Seq[Node] = {
- // We don't count the durations of dropped jobs
- val sparkJobDurations = sparkJobs.filter(_.jobUIData.nonEmpty).map(_.jobUIData.get).
- map(sparkJob => {
- sparkJob.submissionTime.map { start =>
- val end = sparkJob.completionTime.getOrElse(System.currentTimeMillis())
- end - start
- }
- })
val formattedOutputOpDuration =
- if (sparkJobDurations.isEmpty || sparkJobDurations.exists(_ == None)) {
- // If no job or any job does not finish, set "formattedOutputOpDuration" to "-"
+ if (outputOpData.duration.isEmpty) {
"-"
} else {
- SparkUIUtils.formatDuration(sparkJobDurations.flatMap(x => x).sum)
+ SparkUIUtils.formatDuration(outputOpData.duration.get)
}
- val description = generateOutputOpDescription(sparkJobs)
+ val description = generateOutputOpDescription(outputOpData)
if (sparkJobs.isEmpty) {
- generateOutputOpRowWithoutSparkJobs(
- outputOpId, description, formattedOutputOpDuration, outputOpStatus)
+ generateOutputOpRowWithoutSparkJobs(outputOpData, description, formattedOutputOpDuration)
} else {
val firstRow =
generateJobRow(
- outputOpId,
+ outputOpData,
description,
formattedOutputOpDuration,
- outputOpStatus,
sparkJobs.size,
true,
sparkJobs.head)
val tailRows =
sparkJobs.tail.map { sparkJob =>
generateJobRow(
- outputOpId,
+ outputOpData,
description,
formattedOutputOpDuration,
- outputOpStatus,
sparkJobs.size,
false,
sparkJob)
@@ -248,35 +231,18 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
}
}
- private def generateOutputOpDescription(sparkJobs: Seq[SparkJobIdWithUIData]): Seq[Node] = {
- val lastStageInfo =
- sparkJobs.flatMap(_.jobUIData).headOption. // Get the first JobUIData
- flatMap { sparkJob => // For the first job, get the latest Stage info
- if (sparkJob.stageIds.isEmpty) {
- None
- } else {
- sparkListener.stageIdToInfo.get(sparkJob.stageIds.max)
- }
- }
- lastStageInfo match {
- case Some(stageInfo) =>
- val details = if (stageInfo.details.nonEmpty) {
- <span
- onclick="this.parentNode.querySelector('.stage-details').classList.toggle('collapsed')"
- class="expand-details">
- +details
- </span> ++
- <div class="stage-details collapsed">
- <pre>{stageInfo.details}</pre>
- </div>
- } else {
- NodeSeq.Empty
- }
-
- <div> {stageInfo.name} {details} </div>
- case None =>
- Text("(Unknown)")
- }
+ private def generateOutputOpDescription(outputOp: OutputOperationUIData): Seq[Node] = {
+ <div>
+ {outputOp.name}
+ <span
+ onclick="this.parentNode.querySelector('.stage-details').classList.toggle('collapsed')"
+ class="expand-details">
+ +details
+ </span>
+ <div class="stage-details collapsed">
+ <pre>{outputOp.description}</pre>
+ </div>
+ </div>
}
private def failureReasonCell(
@@ -329,6 +295,19 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
}
}
+ private def generateOutputOperationStatusForUI(failure: String): String = {
+ if (failure.startsWith("org.apache.spark.SparkException")) {
+ "Failed due to Spark job error\n" + failure
+ } else {
+ var nextLineIndex = failure.indexOf("\n")
+ if (nextLineIndex < 0) {
+ nextLineIndex = failure.size
+ }
+ val firstLine = failure.substring(0, nextLineIndex)
+ s"Failed due to error: $firstLine\n$failure"
+ }
+ }
+
/**
* Generate the job table for the batch.
*/
@@ -338,26 +317,15 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
// sort SparkJobIds for each OutputOpId
(outputOpId, outputOpIdAndSparkJobIds.map(_.sparkJobId).sorted)
}
- val outputOps = (0 until batchUIData.numOutputOp).map { outputOpId =>
- val status = batchUIData.failureReason.get(outputOpId).map { failure =>
- if (failure.startsWith("org.apache.spark.SparkException")) {
- "Failed due to Spark job error\n" + failure
- } else {
- var nextLineIndex = failure.indexOf("\n")
- if (nextLineIndex < 0) {
- nextLineIndex = failure.size
- }
- val firstLine = failure.substring(0, nextLineIndex)
- s"Failed due to error: $firstLine\n$failure"
- }
- }.getOrElse("Succeeded")
- val sparkJobIds = outputOpIdToSparkJobIds.getOrElse(outputOpId, Seq.empty)
- (outputOpId, status, sparkJobIds)
- }
+
+ val outputOps: Seq[(OutputOperationUIData, Seq[SparkJobId])] =
+ batchUIData.outputOperations.map { case (outputOpId, outputOperation) =>
+ val sparkJobIds = outputOpIdToSparkJobIds.getOrElse(outputOpId, Seq.empty)
+ (outputOperation, sparkJobIds)
+ }.toSeq.sortBy(_._1.id)
sparkListener.synchronized {
- val outputOpIdWithJobs: Seq[(OutputOpId, String, Seq[SparkJobIdWithUIData])] =
- outputOps.map { case (outputOpId, status, sparkJobIds) =>
- (outputOpId, status,
+ val outputOpWithJobs = outputOps.map { case (outputOpData, sparkJobIds) =>
+ (outputOpData,
sparkJobIds.map(sparkJobId => SparkJobIdWithUIData(sparkJobId, getJobData(sparkJobId))))
}
@@ -367,9 +335,8 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
</thead>
<tbody>
{
- outputOpIdWithJobs.map {
- case (outputOpId, status, sparkJobIds) =>
- generateOutputOpIdRow(outputOpId, status, sparkJobIds)
+ outputOpWithJobs.map { case (outputOpData, sparkJobIds) =>
+ generateOutputOpIdRow(outputOpData, sparkJobIds)
}
}
</tbody>
@@ -377,7 +344,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
}
}
- def render(request: HttpServletRequest): Seq[Node] = {
+ def render(request: HttpServletRequest): Seq[Node] = streamingListener.synchronized {
val batchTime = Option(request.getParameter("id")).map(id => Time(id.toLong)).getOrElse {
throw new IllegalArgumentException(s"Missing id parameter")
}
@@ -430,14 +397,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
</ul>
</div>
- val jobTable =
- if (batchUIData.outputOpIdSparkJobIdPairs.isEmpty) {
- <div>Cannot find any job for Batch {formattedBatchTime}.</div>
- } else {
- generateJobTable(batchUIData)
- }
-
- val content = summary ++ jobTable
+ val content = summary ++ generateJobTable(batchUIData)
SparkUIUtils.headerSparkPage(s"Details of batch at $formattedBatchTime", content, parent)
}
@@ -471,11 +431,17 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
replaceAllLiterally("\t", "&nbsp;&nbsp;&nbsp;&nbsp;").replaceAllLiterally("\n", "<br/>"))
}
- private def outputOpStatusCell(status: String, rowspan: Int): Seq[Node] = {
- if (status == "Succeeded") {
- <td rowspan={rowspan.toString}>Succeeded</td>
- } else {
- failureReasonCell(status, rowspan, includeFirstLineInExpandDetails = false)
+ private def outputOpStatusCell(outputOp: OutputOperationUIData, rowspan: Int): Seq[Node] = {
+ outputOp.failureReason match {
+ case Some(failureReason) =>
+ val failureReasonForUI = generateOutputOperationStatusForUI(failureReason)
+ failureReasonCell(failureReasonForUI, rowspan, includeFirstLineInExpandDetails = false)
+ case None =>
+ if (outputOp.endTime.isEmpty) {
+ <td rowspan={rowspan.toString}>-</td>
+ } else {
+ <td rowspan={rowspan.toString}>Succeeded</td>
+ }
}
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
index e6c2e2140c..3ef3689de1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
@@ -18,8 +18,10 @@
package org.apache.spark.streaming.ui
+import scala.collection.mutable
+
import org.apache.spark.streaming.Time
-import org.apache.spark.streaming.scheduler.{BatchInfo, StreamInputInfo}
+import org.apache.spark.streaming.scheduler.{BatchInfo, OutputOperationInfo, StreamInputInfo}
import org.apache.spark.streaming.ui.StreamingJobProgressListener._
private[ui] case class OutputOpIdAndSparkJobId(outputOpId: OutputOpId, sparkJobId: SparkJobId)
@@ -30,8 +32,7 @@ private[ui] case class BatchUIData(
val submissionTime: Long,
val processingStartTime: Option[Long],
val processingEndTime: Option[Long],
- val numOutputOp: Int,
- val failureReason: Map[Int, String],
+ val outputOperations: mutable.HashMap[OutputOpId, OutputOperationUIData] = mutable.HashMap(),
var outputOpIdSparkJobIdPairs: Seq[OutputOpIdAndSparkJobId] = Seq.empty) {
/**
@@ -61,19 +62,75 @@ private[ui] case class BatchUIData(
* The number of records received by the receivers in this batch.
*/
def numRecords: Long = streamIdToInputInfo.values.map(_.numRecords).sum
+
+ /**
+ * Update an output operation information of this batch.
+ */
+ def updateOutputOperationInfo(outputOperationInfo: OutputOperationInfo): Unit = {
+ assert(batchTime == outputOperationInfo.batchTime)
+ outputOperations(outputOperationInfo.id) = OutputOperationUIData(outputOperationInfo)
+ }
+
+ /**
+ * Return the number of failed output operations.
+ */
+ def numFailedOutputOp: Int = outputOperations.values.count(_.failureReason.nonEmpty)
+
+ /**
+ * Return the number of running output operations.
+ */
+ def numActiveOutputOp: Int = outputOperations.values.count(_.endTime.isEmpty)
+
+ /**
+ * Return the number of completed output operations.
+ */
+ def numCompletedOutputOp: Int = outputOperations.values.count {
+ op => op.failureReason.isEmpty && op.endTime.nonEmpty
+ }
+
+ /**
+ * Return whether any output operation in this batch failed.
+ */
+ def isFailed: Boolean = numFailedOutputOp != 0
}
private[ui] object BatchUIData {
def apply(batchInfo: BatchInfo): BatchUIData = {
+ val outputOperations = mutable.HashMap[OutputOpId, OutputOperationUIData]()
+ outputOperations ++= batchInfo.outputOperationInfos.mapValues(OutputOperationUIData.apply)
new BatchUIData(
batchInfo.batchTime,
batchInfo.streamIdToInputInfo,
batchInfo.submissionTime,
batchInfo.processingStartTime,
batchInfo.processingEndTime,
- batchInfo.numOutputOp,
- batchInfo.failureReasons
+ outputOperations
+ )
+ }
+}
+
+private[ui] case class OutputOperationUIData(
+ id: OutputOpId,
+ name: String,
+ description: String,
+ startTime: Option[Long],
+ endTime: Option[Long],
+ failureReason: Option[String]) {
+
+ def duration: Option[Long] = for (s <- startTime; e <- endTime) yield e - s
+}
+
+private[ui] object OutputOperationUIData {
+
+ def apply(outputOperationInfo: OutputOperationInfo): OutputOperationUIData = {
+ OutputOperationUIData(
+ outputOperationInfo.id,
+ outputOperationInfo.name,
+ outputOperationInfo.description,
+ outputOperationInfo.startTime,
+ outputOperationInfo.endTime,
+ outputOperationInfo.failureReason
)
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
index 78aeb004e1..f6cc6edf25 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -119,6 +119,20 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
}
}
+ override def onOutputOperationStarted(
+ outputOperationStarted: StreamingListenerOutputOperationStarted): Unit = synchronized {
+ // This method is called after onBatchStarted
+ runningBatchUIData(outputOperationStarted.outputOperationInfo.batchTime).
+ updateOutputOperationInfo(outputOperationStarted.outputOperationInfo)
+ }
+
+ override def onOutputOperationCompleted(
+ outputOperationCompleted: StreamingListenerOutputOperationCompleted): Unit = synchronized {
+ // This method is called before onBatchCompleted
+ runningBatchUIData(outputOperationCompleted.outputOperationInfo.batchTime).
+ updateOutputOperationInfo(outputOperationCompleted.outputOperationInfo)
+ }
+
override def onJobStart(jobStart: SparkListenerJobStart): Unit = synchronized {
getBatchTimeAndOutputOpId(jobStart.properties).foreach { case (batchTime, outputOpId) =>
var outputOpIdToSparkJobIds = batchTimeToOutputOpIdSparkJobIdPair.get(batchTime)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index 2b43b74670..5dc0472c77 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -17,7 +17,7 @@
package org.apache.spark.streaming
-import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
+import scala.collection.mutable.{ArrayBuffer, HashMap, SynchronizedBuffer, SynchronizedMap}
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
@@ -221,7 +221,7 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
}
}
_ssc.stop()
- failureReasonsCollector.failureReasons
+ failureReasonsCollector.failureReasons.toMap
}
/** Check if a sequence of numbers is in increasing order */
@@ -307,14 +307,16 @@ class StreamingListenerSuiteReceiver extends Receiver[Any](StorageLevel.MEMORY_O
}
/**
- * A StreamingListener that saves the latest `failureReasons` in `BatchInfo` to the `failureReasons`
- * field.
+ * A StreamingListener that saves the latest `failureReason` of each output operation in a batch.
*/
class FailureReasonsCollector extends StreamingListener {
- @volatile var failureReasons: Map[Int, String] = null
+ val failureReasons = new HashMap[Int, String] with SynchronizedMap[Int, String]
- override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
- failureReasons = batchCompleted.batchInfo.failureReasons
+ override def onOutputOperationCompleted(
+ outputOperationCompleted: StreamingListenerOutputOperationCompleted): Unit = {
+ outputOperationCompleted.outputOperationInfo.failureReason.foreach { f =>
+ failureReasons(outputOperationCompleted.outputOperationInfo.id) = f
+ }
}
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
index d1df78871d..a5744a9009 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
@@ -117,7 +117,7 @@ class UISeleniumSuite
findAll(cssSelector("""#active-batches-table th""")).map(_.text).toSeq should be {
List("Batch Time", "Input Size", "Scheduling Delay (?)", "Processing Time (?)",
- "Status")
+ "Output Ops: Succeeded/Total", "Status")
}
findAll(cssSelector("""#completed-batches-table th""")).map(_.text).toSeq should be {
List("Batch Time", "Input Size", "Scheduling Delay (?)", "Processing Time (?)",
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
index 995f1197cc..af4718b4eb 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
@@ -63,7 +63,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
1 -> StreamInputInfo(1, 300L, Map(StreamInputInfo.METADATA_KEY_DESCRIPTION -> "test")))
// onBatchSubmitted
- val batchInfoSubmitted = BatchInfo(Time(1000), streamIdToInputInfo, 1000, None, None)
+ val batchInfoSubmitted = BatchInfo(Time(1000), streamIdToInputInfo, 1000, None, None, Map.empty)
listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted))
listener.waitingBatches should be (List(BatchUIData(batchInfoSubmitted)))
listener.runningBatches should be (Nil)
@@ -75,7 +75,8 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
listener.numTotalReceivedRecords should be (0)
// onBatchStarted
- val batchInfoStarted = BatchInfo(Time(1000), streamIdToInputInfo, 1000, Some(2000), None)
+ val batchInfoStarted =
+ BatchInfo(Time(1000), streamIdToInputInfo, 1000, Some(2000), None, Map.empty)
listener.onBatchStarted(StreamingListenerBatchStarted(batchInfoStarted))
listener.waitingBatches should be (Nil)
listener.runningBatches should be (List(BatchUIData(batchInfoStarted)))
@@ -116,7 +117,8 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
OutputOpIdAndSparkJobId(1, 1))
// onBatchCompleted
- val batchInfoCompleted = BatchInfo(Time(1000), streamIdToInputInfo, 1000, Some(2000), None)
+ val batchInfoCompleted =
+ BatchInfo(Time(1000), streamIdToInputInfo, 1000, Some(2000), None, Map.empty)
listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
listener.waitingBatches should be (Nil)
listener.runningBatches should be (Nil)
@@ -156,7 +158,8 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
val streamIdToInputInfo = Map(0 -> StreamInputInfo(0, 300L), 1 -> StreamInputInfo(1, 300L))
- val batchInfoCompleted = BatchInfo(Time(1000), streamIdToInputInfo, 1000, Some(2000), None)
+ val batchInfoCompleted =
+ BatchInfo(Time(1000), streamIdToInputInfo, 1000, Some(2000), None, Map.empty)
for(_ <- 0 until (limit + 10)) {
listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
@@ -173,8 +176,8 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
// fulfill completedBatchInfos
for(i <- 0 until limit) {
- val batchInfoCompleted =
- BatchInfo(Time(1000 + i * 100), Map.empty, 1000 + i * 100, Some(2000 + i * 100), None)
+ val batchInfoCompleted = BatchInfo(
+ Time(1000 + i * 100), Map.empty, 1000 + i * 100, Some(2000 + i * 100), None, Map.empty)
listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
val jobStart = createJobStart(Time(1000 + i * 100), outputOpId = 0, jobId = 1)
listener.onJobStart(jobStart)
@@ -185,7 +188,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
listener.onJobStart(jobStart)
val batchInfoSubmitted =
- BatchInfo(Time(1000 + limit * 100), Map.empty, (1000 + limit * 100), None, None)
+ BatchInfo(Time(1000 + limit * 100), Map.empty, (1000 + limit * 100), None, None, Map.empty)
listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted))
// We still can see the info retrieved from onJobStart
@@ -201,8 +204,8 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
// A lot of "onBatchCompleted"s happen before "onJobStart"
for(i <- limit + 1 to limit * 2) {
- val batchInfoCompleted =
- BatchInfo(Time(1000 + i * 100), Map.empty, 1000 + i * 100, Some(2000 + i * 100), None)
+ val batchInfoCompleted = BatchInfo(
+ Time(1000 + i * 100), Map.empty, 1000 + i * 100, Some(2000 + i * 100), None, Map.empty)
listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
}
@@ -227,11 +230,13 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
val streamIdToInputInfo = Map(0 -> StreamInputInfo(0, 300L), 1 -> StreamInputInfo(1, 300L))
// onBatchSubmitted
- val batchInfoSubmitted = BatchInfo(Time(1000), streamIdToInputInfo, 1000, None, None)
+ val batchInfoSubmitted =
+ BatchInfo(Time(1000), streamIdToInputInfo, 1000, None, None, Map.empty)
listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted))
// onBatchStarted
- val batchInfoStarted = BatchInfo(Time(1000), streamIdToInputInfo, 1000, Some(2000), None)
+ val batchInfoStarted =
+ BatchInfo(Time(1000), streamIdToInputInfo, 1000, Some(2000), None, Map.empty)
listener.onBatchStarted(StreamingListenerBatchStarted(batchInfoStarted))
// onJobStart
@@ -248,7 +253,8 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
listener.onJobStart(jobStart4)
// onBatchCompleted
- val batchInfoCompleted = BatchInfo(Time(1000), streamIdToInputInfo, 1000, Some(2000), None)
+ val batchInfoCompleted =
+ BatchInfo(Time(1000), streamIdToInputInfo, 1000, Some(2000), None, Map.empty)
listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
}