From 369d786f58580e7df73e7e23f27390d37269d0de Mon Sep 17 00:00:00 2001 From: zsxwing Date: Fri, 16 Oct 2015 13:53:06 -0700 Subject: [SPARK-10974] [STREAMING] Add progress bar for output operation column and use red dots for failed batches Screenshot: 1 Also fixed the description and duration for output operations that don't have spark jobs. 2 Author: zsxwing Closes #9010 from zsxwing/output-op-progress-bar. --- .../spark/streaming/ui/static/streaming-page.js | 26 +-- .../org/apache/spark/streaming/DStreamGraph.scala | 2 +- .../spark/streaming/scheduler/BatchInfo.scala | 23 +-- .../org/apache/spark/streaming/scheduler/Job.scala | 30 +++- .../spark/streaming/scheduler/JobScheduler.scala | 12 +- .../apache/spark/streaming/scheduler/JobSet.scala | 17 +- .../streaming/scheduler/OutputOperationInfo.scala | 6 +- .../spark/streaming/ui/AllBatchesTable.scala | 40 +++-- .../org/apache/spark/streaming/ui/BatchPage.scala | 174 +++++++++------------ .../apache/spark/streaming/ui/BatchUIData.scala | 67 +++++++- .../ui/StreamingJobProgressListener.scala | 14 ++ .../spark/streaming/StreamingListenerSuite.scala | 16 +- .../apache/spark/streaming/UISeleniumSuite.scala | 2 +- .../ui/StreamingJobProgressListenerSuite.scala | 30 ++-- 14 files changed, 258 insertions(+), 201 deletions(-) (limited to 'streaming') diff --git a/streaming/src/main/resources/org/apache/spark/streaming/ui/static/streaming-page.js b/streaming/src/main/resources/org/apache/spark/streaming/ui/static/streaming-page.js index 4886b68eea..f82323a1cd 100644 --- a/streaming/src/main/resources/org/apache/spark/streaming/ui/static/streaming-page.js +++ b/streaming/src/main/resources/org/apache/spark/streaming/ui/static/streaming-page.js @@ -154,34 +154,40 @@ function drawTimeline(id, data, minX, maxX, minY, maxY, unitY, batchInterval) { var lastClickedBatch = null; var lastTimeout = null; + function isFailedBatch(batchTime) { + return $("#batch-" + batchTime).attr("isFailed") == "true"; + } + // Add points to the line. However, we make it invisible at first. But when the user moves mouse // over a point, it will be displayed with its detail. svg.selectAll(".point") .data(data) .enter().append("circle") - .attr("stroke", "white") // white and opacity = 0 make it invisible - .attr("fill", "white") - .attr("opacity", "0") + .attr("stroke", function(d) { return isFailedBatch(d.x) ? "red" : "white";}) // white and opacity = 0 make it invisible + .attr("fill", function(d) { return isFailedBatch(d.x) ? "red" : "white";}) + .attr("opacity", function(d) { return isFailedBatch(d.x) ? "1" : "0";}) .style("cursor", "pointer") .attr("cx", function(d) { return x(d.x); }) .attr("cy", function(d) { return y(d.y); }) - .attr("r", function(d) { return 3; }) + .attr("r", function(d) { return isFailedBatch(d.x) ? "2" : "0";}) .on('mouseover', function(d) { var tip = formatYValue(d.y) + " " + unitY + " at " + timeFormat[d.x]; showBootstrapTooltip(d3.select(this).node(), tip); // show the point d3.select(this) - .attr("stroke", "steelblue") - .attr("fill", "steelblue") - .attr("opacity", "1"); + .attr("stroke", function(d) { return isFailedBatch(d.x) ? "red" : "steelblue";}) + .attr("fill", function(d) { return isFailedBatch(d.x) ? "red" : "steelblue";}) + .attr("opacity", "1") + .attr("r", "3"); }) .on('mouseout', function() { hideBootstrapTooltip(d3.select(this).node()); // hide the point d3.select(this) - .attr("stroke", "white") - .attr("fill", "white") - .attr("opacity", "0"); + .attr("stroke", function(d) { return isFailedBatch(d.x) ? 
"red" : "white";}) + .attr("fill", function(d) { return isFailedBatch(d.x) ? "red" : "white";}) + .attr("opacity", function(d) { return isFailedBatch(d.x) ? "1" : "0";}) + .attr("r", function(d) { return isFailedBatch(d.x) ? "2" : "0";}); }) .on("click", function(d) { if (lastTimeout != null) { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala index de79c9ef1a..1b0b7890b3 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala @@ -113,7 +113,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { val jobs = this.synchronized { outputStreams.flatMap { outputStream => val jobOption = outputStream.generateJob(time) - jobOption.foreach(_.setCallSite(outputStream.creationSite.longForm)) + jobOption.foreach(_.setCallSite(outputStream.creationSite)) jobOption } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala index 463f899dc2..436eb0a566 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala @@ -29,6 +29,7 @@ import org.apache.spark.streaming.Time * the streaming scheduler queue * @param processingStartTime Clock time of when the first job of this batch started processing * @param processingEndTime Clock time of when the last job of this batch finished processing + * @param outputOperationInfos The output operations in this batch */ @DeveloperApi case class BatchInfo( @@ -36,13 +37,10 @@ case class BatchInfo( streamIdToInputInfo: Map[Int, StreamInputInfo], submissionTime: Long, processingStartTime: Option[Long], - processingEndTime: Option[Long] + processingEndTime: Option[Long], + outputOperationInfos: Map[Int, OutputOperationInfo] ) { - private var _failureReasons: Map[Int, String] = Map.empty - - private var _numOutputOp: Int = 0 - @deprecated("Use streamIdToInputInfo instead", "1.5.0") def streamIdToNumRecords: Map[Int, Long] = streamIdToInputInfo.mapValues(_.numRecords) @@ -72,19 +70,4 @@ case class BatchInfo( */ def numRecords: Long = streamIdToInputInfo.values.map(_.numRecords).sum - /** Set the failure reasons corresponding to every output ops in the batch */ - private[streaming] def setFailureReason(reasons: Map[Int, String]): Unit = { - _failureReasons = reasons - } - - /** Failure reasons corresponding to every output ops in the batch */ - private[streaming] def failureReasons = _failureReasons - - /** Set the number of output operations in this batch */ - private[streaming] def setNumOutputOp(numOutputOp: Int): Unit = { - _numOutputOp = numOutputOp - } - - /** Return the number of output operations in this batch */ - private[streaming] def numOutputOp: Int = _numOutputOp } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala index 1373053f06..ab1b3565fc 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala @@ -17,8 +17,10 @@ package org.apache.spark.streaming.scheduler +import scala.util.{Failure, Try} + import org.apache.spark.streaming.Time -import scala.util.Try +import 
org.apache.spark.util.{Utils, CallSite} /** * Class representing a Spark computation. It may contain multiple Spark jobs. @@ -29,7 +31,9 @@ class Job(val time: Time, func: () => _) { private var _outputOpId: Int = _ private var isSet = false private var _result: Try[_] = null - private var _callSite: String = "Unknown" + private var _callSite: CallSite = null + private var _startTime: Option[Long] = None + private var _endTime: Option[Long] = None def run() { _result = Try(func()) @@ -71,11 +75,29 @@ class Job(val time: Time, func: () => _) { _outputOpId = outputOpId } - def setCallSite(callSite: String): Unit = { + def setCallSite(callSite: CallSite): Unit = { _callSite = callSite } - def callSite: String = _callSite + def callSite: CallSite = _callSite + + def setStartTime(startTime: Long): Unit = { + _startTime = Some(startTime) + } + + def setEndTime(endTime: Long): Unit = { + _endTime = Some(endTime) + } + + def toOutputOperationInfo: OutputOperationInfo = { + val failureReason = if (_result != null && _result.isFailure) { + Some(Utils.exceptionString(_result.asInstanceOf[Failure[_]].exception)) + } else { + None + } + OutputOperationInfo( + time, outputOpId, callSite.shortForm, callSite.longForm, _startTime, _endTime, failureReason) + } override def toString: String = id } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala index 0a4a396a0f..2480b4ec09 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala @@ -20,13 +20,13 @@ package org.apache.spark.streaming.scheduler import java.util.concurrent.{ConcurrentHashMap, TimeUnit} import scala.collection.JavaConverters._ -import scala.util.{Failure, Success} +import scala.util.Failure import org.apache.spark.Logging import org.apache.spark.rdd.PairRDDFunctions import org.apache.spark.streaming._ import org.apache.spark.streaming.ui.UIUtils -import org.apache.spark.util.{EventLoop, ThreadUtils} +import org.apache.spark.util.{EventLoop, ThreadUtils, Utils} private[scheduler] sealed trait JobSchedulerEvent @@ -162,16 +162,16 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { // correct "jobSet.processingStartTime". 
listenerBus.post(StreamingListenerBatchStarted(jobSet.toBatchInfo))
 }
- listenerBus.post(StreamingListenerOutputOperationStarted(
- OutputOperationInfo(job.time, job.outputOpId, job.callSite, Some(startTime), None)))
+ job.setStartTime(startTime)
+ listenerBus.post(StreamingListenerOutputOperationStarted(job.toOutputOperationInfo))
 logInfo("Starting job " + job.id + " from job set of time " + jobSet.time)
 }

 private def handleJobCompletion(job: Job, completedTime: Long) {
 val jobSet = jobSets.get(job.time)
 jobSet.handleJobCompletion(job)
- listenerBus.post(StreamingListenerOutputOperationCompleted(
- OutputOperationInfo(job.time, job.outputOpId, job.callSite, None, Some(completedTime))))
+ job.setEndTime(completedTime)
+ listenerBus.post(StreamingListenerOutputOperationCompleted(job.toOutputOperationInfo))
 logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
 if (jobSet.hasCompleted) {
 jobSets.remove(jobSet.time)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
index 08f63cc992..f76300351e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
@@ -64,24 +64,13 @@ case class JobSet(
 }

 def toBatchInfo: BatchInfo = {
- val failureReasons: Map[Int, String] = {
- if (hasCompleted) {
- jobs.filter(_.result.isFailure).map { job =>
- (job.outputOpId, Utils.exceptionString(job.result.asInstanceOf[Failure[_]].exception))
- }.toMap
- } else {
- Map.empty
- }
- }
- val binfo = new BatchInfo(
+ BatchInfo(
 time,
 streamIdToInputInfo,
 submissionTime,
 if (processingStartTime >= 0) Some(processingStartTime) else None,
- if (processingEndTime >= 0) Some(processingEndTime) else None
+ if (processingEndTime >= 0) Some(processingEndTime) else None,
+ jobs.map { job => (job.outputOpId, job.toOutputOperationInfo) }.toMap
 )
- binfo.setFailureReason(failureReasons)
- binfo.setNumOutputOp(jobs.size)
- binfo
 }
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/OutputOperationInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/OutputOperationInfo.scala
index d5614b3439..137e512a67 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/OutputOperationInfo.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/OutputOperationInfo.scala
@@ -25,17 +25,21 @@ import org.apache.spark.streaming.Time
 * Class having information on output operations.
 * @param batchTime Time of the batch
 * @param id Id of this output operation. Different output operations have different ids in a batch.
+ * @param name The name of this output operation.
 * @param description The description of this output operation.
 * @param startTime Clock time of when the output operation started processing
 * @param endTime Clock time of when the output operation finished processing
+ * @param failureReason Failure reason if this output operation fails
 */
@DeveloperApi
case class OutputOperationInfo(
 batchTime: Time,
 id: Int,
+ name: String,
 description: String,
 startTime: Option[Long],
- endTime: Option[Long]) {
+ endTime: Option[Long],
+ failureReason: Option[String]) {

 /**
 * Return the duration of this output operation.
 */
 def duration: Option[Long] = for (s <- startTime; e <- endTime) yield e - s
}
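Note (not part of the patch): with the pieces above in place — Job records its call site and start/end times, JobSet.toBatchInfo builds the outputOperationInfos map, and OutputOperationInfo carries name, description and failureReason — any StreamingListener can observe per-output-operation outcomes directly. A minimal sketch of such a consumer; the class name OutputOpLogger and the log format are illustrative assumptions, and it uses only the listener API and fields shown in this patch:

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerOutputOperationCompleted}

// Illustrative listener (hypothetical name): logs each output operation's
// outcome using the fields this patch adds to OutputOperationInfo.
class OutputOpLogger extends StreamingListener {
  override def onOutputOperationCompleted(
      event: StreamingListenerOutputOperationCompleted): Unit = {
    val info = event.outputOperationInfo
    // failureReason is None on success, Some(exception string) on failure;
    // keep only the first line of the failure for a one-line log message.
    val status = info.failureReason
      .map(reason => "failed: " + reason.takeWhile(_ != '\n'))
      .getOrElse("succeeded")
    // duration is defined only once both startTime and endTime have been set.
    val duration = info.duration.map(d => s"$d ms").getOrElse("unknown duration")
    println(s"Output op ${info.id} (${info.name}) of batch ${info.batchTime}: $status in $duration")
  }
}

Such a listener would be registered with ssc.addStreamingListener(new OutputOpLogger), the same hook the Streaming tab's StreamingJobProgressListener uses further down in this patch.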
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala index 3e6590d66f..125cafd41b 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala @@ -17,9 +17,6 @@ package org.apache.spark.streaming.ui -import java.text.SimpleDateFormat -import java.util.Date - import scala.xml.Node import org.apache.spark.ui.{UIUtils => SparkUIUtils} @@ -46,7 +43,8 @@ private[ui] abstract class BatchTableBase(tableId: String, batchInterval: Long) val formattedProcessingTime = processingTime.map(SparkUIUtils.formatDuration).getOrElse("-") val batchTimeId = s"batch-$batchTime" - + {formattedBatchTime} @@ -75,6 +73,19 @@ private[ui] abstract class BatchTableBase(tableId: String, batchInterval: Long) batchTable } + protected def createOutputOperationProgressBar(batch: BatchUIData): Seq[Node] = { + + { + SparkUIUtils.makeProgressBar( + started = batch.numActiveOutputOp, + completed = batch.numCompletedOutputOp, + failed = batch.numFailedOutputOp, + skipped = 0, + total = batch.outputOperations.size) + } + + } + /** * Return HTML for all rows of this table. */ @@ -86,7 +97,10 @@ private[ui] class ActiveBatchTable( waitingBatches: Seq[BatchUIData], batchInterval: Long) extends BatchTableBase("active-batches-table", batchInterval) { - override protected def columns: Seq[Node] = super.columns ++ Status + override protected def columns: Seq[Node] = super.columns ++ { + Output Ops: Succeeded/Total + Status + } override protected def renderRows: Seq[Node] = { // The "batchTime"s of "waitingBatches" must be greater than "runningBatches"'s, so display @@ -96,11 +110,11 @@ private[ui] class ActiveBatchTable( } private def runningBatchRow(batch: BatchUIData): Seq[Node] = { - baseRow(batch) ++ processing + baseRow(batch) ++ createOutputOperationProgressBar(batch) ++ processing } private def waitingBatchRow(batch: BatchUIData): Seq[Node] = { - baseRow(batch) ++ queued + baseRow(batch) ++ createOutputOperationProgressBar(batch) ++ queued } } @@ -119,17 +133,11 @@ private[ui] class CompletedBatchTable(batches: Seq[BatchUIData], batchInterval: private def completedBatchRow(batch: BatchUIData): Seq[Node] = { val totalDelay = batch.totalDelay val formattedTotalDelay = totalDelay.map(SparkUIUtils.formatDuration).getOrElse("-") - val numFailedOutputOp = batch.failureReason.size - val outputOpColumn = if (numFailedOutputOp > 0) { - s"${batch.numOutputOp - numFailedOutputOp}/${batch.numOutputOp}" + - s" (${numFailedOutputOp} failed)" - } else { - s"${batch.numOutputOp}/${batch.numOutputOp}" - } - baseRow(batch) ++ + + baseRow(batch) ++ { {formattedTotalDelay} - {outputOpColumn} + } ++ createOutputOperationProgressBar(batch) } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala index a19b85a51d..2ed9255728 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala @@ -47,32 +47,30 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") { } private def generateJobRow( - outputOpId: OutputOpId, + outputOpData: OutputOperationUIData, outputOpDescription: Seq[Node], formattedOutputOpDuration: String, - outputOpStatus: String, numSparkJobRowsInOutputOp: Int, isFirstRow: Boolean, sparkJob: 
SparkJobIdWithUIData): Seq[Node] = { if (sparkJob.jobUIData.isDefined) { - generateNormalJobRow(outputOpId, outputOpDescription, formattedOutputOpDuration, - outputOpStatus, numSparkJobRowsInOutputOp, isFirstRow, sparkJob.jobUIData.get) + generateNormalJobRow(outputOpData, outputOpDescription, formattedOutputOpDuration, + numSparkJobRowsInOutputOp, isFirstRow, sparkJob.jobUIData.get) } else { - generateDroppedJobRow(outputOpId, outputOpDescription, formattedOutputOpDuration, - outputOpStatus, numSparkJobRowsInOutputOp, isFirstRow, sparkJob.sparkJobId) + generateDroppedJobRow(outputOpData, outputOpDescription, formattedOutputOpDuration, + numSparkJobRowsInOutputOp, isFirstRow, sparkJob.sparkJobId) } } private def generateOutputOpRowWithoutSparkJobs( - outputOpId: OutputOpId, + outputOpData: OutputOperationUIData, outputOpDescription: Seq[Node], - formattedOutputOpDuration: String, - outputOpStatus: String): Seq[Node] = { + formattedOutputOpDuration: String): Seq[Node] = { - {outputOpId.toString} + {outputOpData.id.toString} {outputOpDescription} {formattedOutputOpDuration} - {outputOpStatusCell(outputOpStatus, rowspan = 1)} + {outputOpStatusCell(outputOpData, rowspan = 1)} - @@ -91,10 +89,9 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") { * one cell, we use "rowspan" for the first row of a output op. */ private def generateNormalJobRow( - outputOpId: OutputOpId, + outputOpData: OutputOperationUIData, outputOpDescription: Seq[Node], formattedOutputOpDuration: String, - outputOpStatus: String, numSparkJobRowsInOutputOp: Int, isFirstRow: Boolean, sparkJob: JobUIData): Seq[Node] = { @@ -116,12 +113,12 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") { // scalastyle:off val prefixCells = if (isFirstRow) { - {outputOpId.toString} + {outputOpData.id.toString} {outputOpDescription} {formattedOutputOpDuration} ++ - {outputOpStatusCell(outputOpStatus, numSparkJobRowsInOutputOp)} + {outputOpStatusCell(outputOpData, numSparkJobRowsInOutputOp)} } else { Nil } @@ -161,10 +158,9 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") { * with "-" cells. */ private def generateDroppedJobRow( - outputOpId: OutputOpId, + outputOpData: OutputOperationUIData, outputOpDescription: Seq[Node], formattedOutputOpDuration: String, - outputOpStatus: String, numSparkJobRowsInOutputOp: Int, isFirstRow: Boolean, jobId: Int): Seq[Node] = { @@ -173,10 +169,10 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") { // scalastyle:off val prefixCells = if (isFirstRow) { - {outputOpId.toString} + {outputOpData.id.toString} {outputOpDescription} {formattedOutputOpDuration} ++ - {outputOpStatusCell(outputOpStatus, numSparkJobRowsInOutputOp)} + {outputOpStatusCell(outputOpData, numSparkJobRowsInOutputOp)} } else { Nil } @@ -199,47 +195,34 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") { } private def generateOutputOpIdRow( - outputOpId: OutputOpId, - outputOpStatus: String, + outputOpData: OutputOperationUIData, sparkJobs: Seq[SparkJobIdWithUIData]): Seq[Node] = { - // We don't count the durations of dropped jobs - val sparkJobDurations = sparkJobs.filter(_.jobUIData.nonEmpty).map(_.jobUIData.get). 
- map(sparkJob => { - sparkJob.submissionTime.map { start => - val end = sparkJob.completionTime.getOrElse(System.currentTimeMillis()) - end - start - } - }) val formattedOutputOpDuration = - if (sparkJobDurations.isEmpty || sparkJobDurations.exists(_ == None)) { - // If no job or any job does not finish, set "formattedOutputOpDuration" to "-" + if (outputOpData.duration.isEmpty) { "-" } else { - SparkUIUtils.formatDuration(sparkJobDurations.flatMap(x => x).sum) + SparkUIUtils.formatDuration(outputOpData.duration.get) } - val description = generateOutputOpDescription(sparkJobs) + val description = generateOutputOpDescription(outputOpData) if (sparkJobs.isEmpty) { - generateOutputOpRowWithoutSparkJobs( - outputOpId, description, formattedOutputOpDuration, outputOpStatus) + generateOutputOpRowWithoutSparkJobs(outputOpData, description, formattedOutputOpDuration) } else { val firstRow = generateJobRow( - outputOpId, + outputOpData, description, formattedOutputOpDuration, - outputOpStatus, sparkJobs.size, true, sparkJobs.head) val tailRows = sparkJobs.tail.map { sparkJob => generateJobRow( - outputOpId, + outputOpData, description, formattedOutputOpDuration, - outputOpStatus, sparkJobs.size, false, sparkJob) @@ -248,35 +231,18 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") { } } - private def generateOutputOpDescription(sparkJobs: Seq[SparkJobIdWithUIData]): Seq[Node] = { - val lastStageInfo = - sparkJobs.flatMap(_.jobUIData).headOption. // Get the first JobUIData - flatMap { sparkJob => // For the first job, get the latest Stage info - if (sparkJob.stageIds.isEmpty) { - None - } else { - sparkListener.stageIdToInfo.get(sparkJob.stageIds.max) - } - } - lastStageInfo match { - case Some(stageInfo) => - val details = if (stageInfo.details.nonEmpty) { - - +details - ++ - - } else { - NodeSeq.Empty - } - -
<div>{stageInfo.name} {details}</div>
- case None =>
- Text("(Unknown)")
- }
+ private def generateOutputOpDescription(outputOp: OutputOperationUIData): Seq[Node] = {
+ <div>
+ {outputOp.name}
+ <span onclick="this.parentNode.querySelector('.stage-details').classList.toggle('collapsed')" class="expand-details">
+ +details
+ </span>
+ <div class="stage-details collapsed">
+ <pre>{outputOp.description}</pre>
+ </div>
+ </div>
+
} private def failureReasonCell( @@ -329,6 +295,19 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") { } } + private def generateOutputOperationStatusForUI(failure: String): String = { + if (failure.startsWith("org.apache.spark.SparkException")) { + "Failed due to Spark job error\n" + failure + } else { + var nextLineIndex = failure.indexOf("\n") + if (nextLineIndex < 0) { + nextLineIndex = failure.size + } + val firstLine = failure.substring(0, nextLineIndex) + s"Failed due to error: $firstLine\n$failure" + } + } + /** * Generate the job table for the batch. */ @@ -338,26 +317,15 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") { // sort SparkJobIds for each OutputOpId (outputOpId, outputOpIdAndSparkJobIds.map(_.sparkJobId).sorted) } - val outputOps = (0 until batchUIData.numOutputOp).map { outputOpId => - val status = batchUIData.failureReason.get(outputOpId).map { failure => - if (failure.startsWith("org.apache.spark.SparkException")) { - "Failed due to Spark job error\n" + failure - } else { - var nextLineIndex = failure.indexOf("\n") - if (nextLineIndex < 0) { - nextLineIndex = failure.size - } - val firstLine = failure.substring(0, nextLineIndex) - s"Failed due to error: $firstLine\n$failure" - } - }.getOrElse("Succeeded") - val sparkJobIds = outputOpIdToSparkJobIds.getOrElse(outputOpId, Seq.empty) - (outputOpId, status, sparkJobIds) - } + + val outputOps: Seq[(OutputOperationUIData, Seq[SparkJobId])] = + batchUIData.outputOperations.map { case (outputOpId, outputOperation) => + val sparkJobIds = outputOpIdToSparkJobIds.getOrElse(outputOpId, Seq.empty) + (outputOperation, sparkJobIds) + }.toSeq.sortBy(_._1.id) sparkListener.synchronized { - val outputOpIdWithJobs: Seq[(OutputOpId, String, Seq[SparkJobIdWithUIData])] = - outputOps.map { case (outputOpId, status, sparkJobIds) => - (outputOpId, status, + val outputOpWithJobs = outputOps.map { case (outputOpData, sparkJobIds) => + (outputOpData, sparkJobIds.map(sparkJobId => SparkJobIdWithUIData(sparkJobId, getJobData(sparkJobId)))) } @@ -367,9 +335,8 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") { { - outputOpIdWithJobs.map { - case (outputOpId, status, sparkJobIds) => - generateOutputOpIdRow(outputOpId, status, sparkJobIds) + outputOpWithJobs.map { case (outputOpData, sparkJobIds) => + generateOutputOpIdRow(outputOpData, sparkJobIds) } } @@ -377,7 +344,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") { } } - def render(request: HttpServletRequest): Seq[Node] = { + def render(request: HttpServletRequest): Seq[Node] = streamingListener.synchronized { val batchTime = Option(request.getParameter("id")).map(id => Time(id.toLong)).getOrElse { throw new IllegalArgumentException(s"Missing id parameter") } @@ -430,14 +397,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") { - val jobTable = - if (batchUIData.outputOpIdSparkJobIdPairs.isEmpty) { -
<div>Cannot find any job for Batch {formattedBatchTime}.</div>
- } else { - generateJobTable(batchUIData) - } - - val content = summary ++ jobTable + val content = summary ++ generateJobTable(batchUIData) SparkUIUtils.headerSparkPage(s"Details of batch at $formattedBatchTime", content, parent) } @@ -471,11 +431,17 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") { replaceAllLiterally("\t", "    ").replaceAllLiterally("\n", "
")) } - private def outputOpStatusCell(status: String, rowspan: Int): Seq[Node] = { - if (status == "Succeeded") { - Succeeded - } else { - failureReasonCell(status, rowspan, includeFirstLineInExpandDetails = false) + private def outputOpStatusCell(outputOp: OutputOperationUIData, rowspan: Int): Seq[Node] = { + outputOp.failureReason match { + case Some(failureReason) => + val failureReasonForUI = generateOutputOperationStatusForUI(failureReason) + failureReasonCell(failureReasonForUI, rowspan, includeFirstLineInExpandDetails = false) + case None => + if (outputOp.endTime.isEmpty) { + - + } else { + Succeeded + } } } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala index e6c2e2140c..3ef3689de1 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala @@ -18,8 +18,10 @@ package org.apache.spark.streaming.ui +import scala.collection.mutable + import org.apache.spark.streaming.Time -import org.apache.spark.streaming.scheduler.{BatchInfo, StreamInputInfo} +import org.apache.spark.streaming.scheduler.{BatchInfo, OutputOperationInfo, StreamInputInfo} import org.apache.spark.streaming.ui.StreamingJobProgressListener._ private[ui] case class OutputOpIdAndSparkJobId(outputOpId: OutputOpId, sparkJobId: SparkJobId) @@ -30,8 +32,7 @@ private[ui] case class BatchUIData( val submissionTime: Long, val processingStartTime: Option[Long], val processingEndTime: Option[Long], - val numOutputOp: Int, - val failureReason: Map[Int, String], + val outputOperations: mutable.HashMap[OutputOpId, OutputOperationUIData] = mutable.HashMap(), var outputOpIdSparkJobIdPairs: Seq[OutputOpIdAndSparkJobId] = Seq.empty) { /** @@ -61,19 +62,75 @@ private[ui] case class BatchUIData( * The number of recorders received by the receivers in this batch. */ def numRecords: Long = streamIdToInputInfo.values.map(_.numRecords).sum + + /** + * Update an output operation information of this batch. + */ + def updateOutputOperationInfo(outputOperationInfo: OutputOperationInfo): Unit = { + assert(batchTime == outputOperationInfo.batchTime) + outputOperations(outputOperationInfo.id) = OutputOperationUIData(outputOperationInfo) + } + + /** + * Return the number of failed output operations. + */ + def numFailedOutputOp: Int = outputOperations.values.count(_.failureReason.nonEmpty) + + /** + * Return the number of running output operations. + */ + def numActiveOutputOp: Int = outputOperations.values.count(_.endTime.isEmpty) + + /** + * Return the number of completed output operations. 
+ */ + def numCompletedOutputOp: Int = outputOperations.values.count { + op => op.failureReason.isEmpty && op.endTime.nonEmpty + } + + /** + * Return if this batch has any output operations + */ + def isFailed: Boolean = numFailedOutputOp != 0 } private[ui] object BatchUIData { def apply(batchInfo: BatchInfo): BatchUIData = { + val outputOperations = mutable.HashMap[OutputOpId, OutputOperationUIData]() + outputOperations ++= batchInfo.outputOperationInfos.mapValues(OutputOperationUIData.apply) new BatchUIData( batchInfo.batchTime, batchInfo.streamIdToInputInfo, batchInfo.submissionTime, batchInfo.processingStartTime, batchInfo.processingEndTime, - batchInfo.numOutputOp, - batchInfo.failureReasons + outputOperations + ) + } +} + +private[ui] case class OutputOperationUIData( + id: OutputOpId, + name: String, + description: String, + startTime: Option[Long], + endTime: Option[Long], + failureReason: Option[String]) { + + def duration: Option[Long] = for (s <- startTime; e <- endTime) yield e - s +} + +private[ui] object OutputOperationUIData { + + def apply(outputOperationInfo: OutputOperationInfo): OutputOperationUIData = { + OutputOperationUIData( + outputOperationInfo.id, + outputOperationInfo.name, + outputOperationInfo.description, + outputOperationInfo.startTime, + outputOperationInfo.endTime, + outputOperationInfo.failureReason ) } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala index 78aeb004e1..f6cc6edf25 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala @@ -119,6 +119,20 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext) } } + override def onOutputOperationStarted( + outputOperationStarted: StreamingListenerOutputOperationStarted): Unit = synchronized { + // This method is called after onBatchStarted + runningBatchUIData(outputOperationStarted.outputOperationInfo.batchTime). + updateOutputOperationInfo(outputOperationStarted.outputOperationInfo) + } + + override def onOutputOperationCompleted( + outputOperationCompleted: StreamingListenerOutputOperationCompleted): Unit = synchronized { + // This method is called before onBatchCompleted + runningBatchUIData(outputOperationCompleted.outputOperationInfo.batchTime). 
+ updateOutputOperationInfo(outputOperationCompleted.outputOperationInfo) + } + override def onJobStart(jobStart: SparkListenerJobStart): Unit = synchronized { getBatchTimeAndOutputOpId(jobStart.properties).foreach { case (batchTime, outputOpId) => var outputOpIdToSparkJobIds = batchTimeToOutputOpIdSparkJobIdPair.get(batchTime) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala index 2b43b74670..5dc0472c77 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming -import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer} +import scala.collection.mutable.{ArrayBuffer, HashMap, SynchronizedBuffer, SynchronizedMap} import scala.concurrent.Future import scala.concurrent.ExecutionContext.Implicits.global @@ -221,7 +221,7 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers { } } _ssc.stop() - failureReasonsCollector.failureReasons + failureReasonsCollector.failureReasons.toMap } /** Check if a sequence of numbers is in increasing order */ @@ -307,14 +307,16 @@ class StreamingListenerSuiteReceiver extends Receiver[Any](StorageLevel.MEMORY_O } /** - * A StreamingListener that saves the latest `failureReasons` in `BatchInfo` to the `failureReasons` - * field. + * A StreamingListener that saves all latest `failureReasons` in a batch. */ class FailureReasonsCollector extends StreamingListener { - @volatile var failureReasons: Map[Int, String] = null + val failureReasons = new HashMap[Int, String] with SynchronizedMap[Int, String] - override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = { - failureReasons = batchCompleted.batchInfo.failureReasons + override def onOutputOperationCompleted( + outputOperationCompleted: StreamingListenerOutputOperationCompleted): Unit = { + outputOperationCompleted.outputOperationInfo.failureReason.foreach { f => + failureReasons(outputOperationCompleted.outputOperationInfo.id) = f + } } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala index d1df78871d..a5744a9009 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala @@ -117,7 +117,7 @@ class UISeleniumSuite findAll(cssSelector("""#active-batches-table th""")).map(_.text).toSeq should be { List("Batch Time", "Input Size", "Scheduling Delay (?)", "Processing Time (?)", - "Status") + "Output Ops: Succeeded/Total", "Status") } findAll(cssSelector("""#completed-batches-table th""")).map(_.text).toSeq should be { List("Batch Time", "Input Size", "Scheduling Delay (?)", "Processing Time (?)", diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala index 995f1197cc..af4718b4eb 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala @@ -63,7 +63,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers { 1 -> StreamInputInfo(1, 300L, 
Map(StreamInputInfo.METADATA_KEY_DESCRIPTION -> "test"))) // onBatchSubmitted - val batchInfoSubmitted = BatchInfo(Time(1000), streamIdToInputInfo, 1000, None, None) + val batchInfoSubmitted = BatchInfo(Time(1000), streamIdToInputInfo, 1000, None, None, Map.empty) listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted)) listener.waitingBatches should be (List(BatchUIData(batchInfoSubmitted))) listener.runningBatches should be (Nil) @@ -75,7 +75,8 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers { listener.numTotalReceivedRecords should be (0) // onBatchStarted - val batchInfoStarted = BatchInfo(Time(1000), streamIdToInputInfo, 1000, Some(2000), None) + val batchInfoStarted = + BatchInfo(Time(1000), streamIdToInputInfo, 1000, Some(2000), None, Map.empty) listener.onBatchStarted(StreamingListenerBatchStarted(batchInfoStarted)) listener.waitingBatches should be (Nil) listener.runningBatches should be (List(BatchUIData(batchInfoStarted))) @@ -116,7 +117,8 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers { OutputOpIdAndSparkJobId(1, 1)) // onBatchCompleted - val batchInfoCompleted = BatchInfo(Time(1000), streamIdToInputInfo, 1000, Some(2000), None) + val batchInfoCompleted = + BatchInfo(Time(1000), streamIdToInputInfo, 1000, Some(2000), None, Map.empty) listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted)) listener.waitingBatches should be (Nil) listener.runningBatches should be (Nil) @@ -156,7 +158,8 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers { val streamIdToInputInfo = Map(0 -> StreamInputInfo(0, 300L), 1 -> StreamInputInfo(1, 300L)) - val batchInfoCompleted = BatchInfo(Time(1000), streamIdToInputInfo, 1000, Some(2000), None) + val batchInfoCompleted = + BatchInfo(Time(1000), streamIdToInputInfo, 1000, Some(2000), None, Map.empty) for(_ <- 0 until (limit + 10)) { listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted)) @@ -173,8 +176,8 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers { // fulfill completedBatchInfos for(i <- 0 until limit) { - val batchInfoCompleted = - BatchInfo(Time(1000 + i * 100), Map.empty, 1000 + i * 100, Some(2000 + i * 100), None) + val batchInfoCompleted = BatchInfo( + Time(1000 + i * 100), Map.empty, 1000 + i * 100, Some(2000 + i * 100), None, Map.empty) listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted)) val jobStart = createJobStart(Time(1000 + i * 100), outputOpId = 0, jobId = 1) listener.onJobStart(jobStart) @@ -185,7 +188,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers { listener.onJobStart(jobStart) val batchInfoSubmitted = - BatchInfo(Time(1000 + limit * 100), Map.empty, (1000 + limit * 100), None, None) + BatchInfo(Time(1000 + limit * 100), Map.empty, (1000 + limit * 100), None, None, Map.empty) listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted)) // We still can see the info retrieved from onJobStart @@ -201,8 +204,8 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers { // A lot of "onBatchCompleted"s happen before "onJobStart" for(i <- limit + 1 to limit * 2) { - val batchInfoCompleted = - BatchInfo(Time(1000 + i * 100), Map.empty, 1000 + i * 100, Some(2000 + i * 100), None) + val batchInfoCompleted = BatchInfo( + Time(1000 + i * 100), Map.empty, 1000 + i * 100, Some(2000 + i * 100), None, Map.empty) 
listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted)) } @@ -227,11 +230,13 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers { val streamIdToInputInfo = Map(0 -> StreamInputInfo(0, 300L), 1 -> StreamInputInfo(1, 300L)) // onBatchSubmitted - val batchInfoSubmitted = BatchInfo(Time(1000), streamIdToInputInfo, 1000, None, None) + val batchInfoSubmitted = + BatchInfo(Time(1000), streamIdToInputInfo, 1000, None, None, Map.empty) listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted)) // onBatchStarted - val batchInfoStarted = BatchInfo(Time(1000), streamIdToInputInfo, 1000, Some(2000), None) + val batchInfoStarted = + BatchInfo(Time(1000), streamIdToInputInfo, 1000, Some(2000), None, Map.empty) listener.onBatchStarted(StreamingListenerBatchStarted(batchInfoStarted)) // onJobStart @@ -248,7 +253,8 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers { listener.onJobStart(jobStart4) // onBatchCompleted - val batchInfoCompleted = BatchInfo(Time(1000), streamIdToInputInfo, 1000, Some(2000), None) + val batchInfoCompleted = + BatchInfo(Time(1000), streamIdToInputInfo, 1000, Some(2000), None, Map.empty) listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted)) } -- cgit v1.2.3
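A closing note, not part of the patch itself: the progress bar added to AllBatchesTable and the red dots added to streaming-page.js are both driven by the per-batch counters introduced in BatchUIData above. A minimal self-contained sketch of that counting logic — OpStatus and summarize are hypothetical names, but the count definitions mirror numActiveOutputOp, numCompletedOutputOp, numFailedOutputOp, and isFailed from the patch:

// Hypothetical stand-in for OutputOperationUIData, reduced to the two
// fields the counters actually inspect.
case class OpStatus(endTime: Option[Long], failureReason: Option[String])

// Mirrors BatchUIData's counters: an op is "active" until endTime is set,
// "failed" once failureReason is set, and "completed" only when it ended
// without a failure. A batch is drawn as a red dot if any op failed.
def summarize(ops: Seq[OpStatus]): (Int, Int, Int, Boolean) = {
  val active    = ops.count(_.endTime.isEmpty)
  val failed    = ops.count(_.failureReason.nonEmpty)
  val completed = ops.count(op => op.failureReason.isEmpty && op.endTime.nonEmpty)
  (active, completed, failed, failed != 0)
}

// Example: one succeeded, one still running, one failed
// summarize(Seq(
//   OpStatus(Some(1L), None),
//   OpStatus(None, None),
//   OpStatus(Some(2L), Some("org.apache.spark.SparkException: ..."))))
// == (1, 1, 1, true)

Note that a failed op has its endTime set by handleJobCompletion, so it counts as neither active nor completed, which is what keeps the three progress-bar segments disjoint.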