diff options
6 files changed, 143 insertions, 27 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala index 3c869561ef..463f899dc2 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala @@ -41,6 +41,8 @@ case class BatchInfo( private var _failureReasons: Map[Int, String] = Map.empty + private var _numOutputOp: Int = 0 + @deprecated("Use streamIdToInputInfo instead", "1.5.0") def streamIdToNumRecords: Map[Int, Long] = streamIdToInputInfo.mapValues(_.numRecords) @@ -77,4 +79,12 @@ case class BatchInfo( /** Failure reasons corresponding to every output ops in the batch */ private[streaming] def failureReasons = _failureReasons + + /** Set the number of output operations in this batch */ + private[streaming] def setNumOutputOp(numOutputOp: Int): Unit = { + _numOutputOp = numOutputOp + } + + /** Return the number of output operations in this batch */ + private[streaming] def numOutputOp: Int = _numOutputOp } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala index 255ccf0536..08f63cc992 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala @@ -81,6 +81,7 @@ case class JobSet( if (processingEndTime >= 0) Some(processingEndTime) else None ) binfo.setFailureReason(failureReasons) + binfo.setNumOutputOp(jobs.size) binfo } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala index f702bd5bc9..3e6590d66f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala @@ -107,9 +107,10 @@ private[ui] class ActiveBatchTable( private[ui] class CompletedBatchTable(batches: Seq[BatchUIData], batchInterval: Long) extends BatchTableBase("completed-batches-table", batchInterval) { - override protected def columns: Seq[Node] = super.columns ++ - <th>Total Delay - {SparkUIUtils.tooltip("Total time taken to handle a batch", "top")}</th> + override protected def columns: Seq[Node] = super.columns ++ { + <th>Total Delay {SparkUIUtils.tooltip("Total time taken to handle a batch", "top")}</th> + <th>Output Ops: Succeeded/Total</th> + } override protected def renderRows: Seq[Node] = { batches.flatMap(batch => <tr>{completedBatchRow(batch)}</tr>) @@ -118,9 +119,17 @@ private[ui] class CompletedBatchTable(batches: Seq[BatchUIData], batchInterval: private def completedBatchRow(batch: BatchUIData): Seq[Node] = { val totalDelay = batch.totalDelay val formattedTotalDelay = totalDelay.map(SparkUIUtils.formatDuration).getOrElse("-") + val numFailedOutputOp = batch.failureReason.size + val outputOpColumn = if (numFailedOutputOp > 0) { + s"${batch.numOutputOp - numFailedOutputOp}/${batch.numOutputOp}" + + s" (${numFailedOutputOp} failed)" + } else { + s"${batch.numOutputOp}/${batch.numOutputOp}" + } baseRow(batch) ++ <td sorttable_customkey={totalDelay.getOrElse(Long.MaxValue).toString}> {formattedTotalDelay} </td> + <td>{outputOpColumn}</td> } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala index 9129c1f26a..1b717b6454 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala @@ -38,6 +38,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") { <th>Output Op Id</th> <th>Description</th> <th>Duration</th> + <th>Status</th> <th>Job Id</th> <th>Duration</th> <th class="sorttable_nosort">Stages: Succeeded/Total</th> @@ -49,18 +50,42 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") { outputOpId: OutputOpId, outputOpDescription: Seq[Node], formattedOutputOpDuration: String, + outputOpStatus: String, numSparkJobRowsInOutputOp: Int, isFirstRow: Boolean, sparkJob: SparkJobIdWithUIData): Seq[Node] = { if (sparkJob.jobUIData.isDefined) { generateNormalJobRow(outputOpId, outputOpDescription, formattedOutputOpDuration, - numSparkJobRowsInOutputOp, isFirstRow, sparkJob.jobUIData.get) + outputOpStatus, numSparkJobRowsInOutputOp, isFirstRow, sparkJob.jobUIData.get) } else { generateDroppedJobRow(outputOpId, outputOpDescription, formattedOutputOpDuration, - numSparkJobRowsInOutputOp, isFirstRow, sparkJob.sparkJobId) + outputOpStatus, numSparkJobRowsInOutputOp, isFirstRow, sparkJob.sparkJobId) } } + private def generateOutputOpRowWithoutSparkJobs( + outputOpId: OutputOpId, + outputOpDescription: Seq[Node], + formattedOutputOpDuration: String, + outputOpStatus: String): Seq[Node] = { + <tr> + <td class="output-op-id-cell" >{outputOpId.toString}</td> + <td>{outputOpDescription}</td> + <td>{formattedOutputOpDuration}</td> + {outputOpStatusCell(outputOpStatus, rowspan = 1)} + <!-- Job Id --> + <td>-</td> + <!-- Duration --> + <td>-</td> + <!-- Stages: Succeeded/Total --> + <td>-</td> + <!-- Tasks (for all stages): Succeeded/Total --> + <td>-</td> + <!-- Error --> + <td>-</td> + </tr> + } + /** * Generate a row for a Spark Job. Because duplicated output op infos needs to be collapsed into * one cell, we use "rowspan" for the first row of a output op. @@ -69,6 +94,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") { outputOpId: OutputOpId, outputOpDescription: Seq[Node], formattedOutputOpDuration: String, + outputOpStatus: String, numSparkJobRowsInOutputOp: Int, isFirstRow: Boolean, sparkJob: JobUIData): Seq[Node] = { @@ -94,7 +120,8 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") { <td rowspan={numSparkJobRowsInOutputOp.toString}> {outputOpDescription} </td> - <td rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td> + <td rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td> ++ + {outputOpStatusCell(outputOpStatus, numSparkJobRowsInOutputOp)} } else { Nil } @@ -125,7 +152,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") { total = sparkJob.numTasks - sparkJob.numSkippedTasks) } </td> - {failureReasonCell(lastFailureReason)} + {failureReasonCell(lastFailureReason, rowspan = 1)} </tr> } @@ -137,6 +164,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") { outputOpId: OutputOpId, outputOpDescription: Seq[Node], formattedOutputOpDuration: String, + outputOpStatus: String, numSparkJobRowsInOutputOp: Int, isFirstRow: Boolean, jobId: Int): Seq[Node] = { @@ -147,7 +175,8 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") { if (isFirstRow) { <td class="output-op-id-cell" rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpId.toString}</td> <td rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpDescription}</td> - <td rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td> + <td rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td> ++ + {outputOpStatusCell(outputOpStatus, numSparkJobRowsInOutputOp)} } else { Nil } @@ -156,7 +185,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") { <tr> {prefixCells} <td sorttable_customkey={jobId.toString}> - {jobId.toString} + {if (jobId >= 0) jobId.toString else "-"} </td> <!-- Duration --> <td>-</td> @@ -170,7 +199,9 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") { } private def generateOutputOpIdRow( - outputOpId: OutputOpId, sparkJobs: Seq[SparkJobIdWithUIData]): Seq[Node] = { + outputOpId: OutputOpId, + outputOpStatus: String, + sparkJobs: Seq[SparkJobIdWithUIData]): Seq[Node] = { // We don't count the durations of dropped jobs val sparkJobDurations = sparkJobs.filter(_.jobUIData.nonEmpty).map(_.jobUIData.get). map(sparkJob => { @@ -189,12 +220,32 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") { val description = generateOutputOpDescription(sparkJobs) - generateJobRow( - outputOpId, description, formattedOutputOpDuration, sparkJobs.size, true, sparkJobs.head) ++ - sparkJobs.tail.map { sparkJob => + if (sparkJobs.isEmpty) { + generateOutputOpRowWithoutSparkJobs( + outputOpId, description, formattedOutputOpDuration, outputOpStatus) + } else { + val firstRow = generateJobRow( - outputOpId, description, formattedOutputOpDuration, sparkJobs.size, false, sparkJob) - }.flatMap(x => x) + outputOpId, + description, + formattedOutputOpDuration, + outputOpStatus, + sparkJobs.size, + true, + sparkJobs.head) + val tailRows = + sparkJobs.tail.map { sparkJob => + generateJobRow( + outputOpId, + description, + formattedOutputOpDuration, + outputOpStatus, + sparkJobs.size, + false, + sparkJob) + } + (firstRow ++ tailRows).flatten + } } private def generateOutputOpDescription(sparkJobs: Seq[SparkJobIdWithUIData]): Seq[Node] = { @@ -228,7 +279,10 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") { } } - private def failureReasonCell(failureReason: String): Seq[Node] = { + private def failureReasonCell( + failureReason: String, + rowspan: Int, + includeFirstLineInExpandDetails: Boolean = true): Seq[Node] = { val isMultiline = failureReason.indexOf('\n') >= 0 // Display the first line by default val failureReasonSummary = StringEscapeUtils.escapeHtml4( @@ -237,6 +291,13 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") { } else { failureReason }) + val failureDetails = + if (isMultiline && !includeFirstLineInExpandDetails) { + // Skip the first line + failureReason.substring(failureReason.indexOf('\n') + 1) + } else { + failureReason + } val details = if (isMultiline) { // scalastyle:off <span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')" @@ -244,13 +305,20 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") { +details </span> ++ <div class="stacktrace-details collapsed"> - <pre>{failureReason}</pre> + <pre>{failureDetails}</pre> </div> // scalastyle:on } else { "" } - <td valign="middle" style="max-width: 300px">{failureReasonSummary}{details}</td> + + if (rowspan == 1) { + <td valign="middle" style="max-width: 300px">{failureReasonSummary}{details}</td> + } else { + <td valign="middle" style="max-width: 300px" rowspan={rowspan.toString}> + {failureReasonSummary}{details} + </td> + } } private def getJobData(sparkJobId: SparkJobId): Option[JobUIData] = { @@ -265,16 +333,31 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") { * Generate the job table for the batch. */ private def generateJobTable(batchUIData: BatchUIData): Seq[Node] = { - val outputOpIdToSparkJobIds = batchUIData.outputOpIdSparkJobIdPairs.groupBy(_.outputOpId).toSeq. - sortBy(_._1). // sorted by OutputOpId + val outputOpIdToSparkJobIds = batchUIData.outputOpIdSparkJobIdPairs.groupBy(_.outputOpId). map { case (outputOpId, outputOpIdAndSparkJobIds) => // sort SparkJobIds for each OutputOpId (outputOpId, outputOpIdAndSparkJobIds.map(_.sparkJobId).sorted) } + val outputOps = (0 until batchUIData.numOutputOp).map { outputOpId => + val status = batchUIData.failureReason.get(outputOpId).map { failure => + if (failure.startsWith("org.apache.spark.SparkException")) { + "Failed due to Spark job error\n" + failure + } else { + var nextLineIndex = failure.indexOf("\n") + if (nextLineIndex < 0) { + nextLineIndex = failure.size + } + val firstLine = failure.substring(0, nextLineIndex) + s"Failed due to error: $firstLine\n$failure" + } + }.getOrElse("Succeeded") + val sparkJobIds = outputOpIdToSparkJobIds.getOrElse(outputOpId, Seq.empty) + (outputOpId, status, sparkJobIds) + } sparkListener.synchronized { - val outputOpIdWithJobs: Seq[(OutputOpId, Seq[SparkJobIdWithUIData])] = - outputOpIdToSparkJobIds.map { case (outputOpId, sparkJobIds) => - (outputOpId, + val outputOpIdWithJobs: Seq[(OutputOpId, String, Seq[SparkJobIdWithUIData])] = + outputOps.map { case (outputOpId, status, sparkJobIds) => + (outputOpId, status, sparkJobIds.map(sparkJobId => SparkJobIdWithUIData(sparkJobId, getJobData(sparkJobId)))) } @@ -285,7 +368,8 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") { <tbody> { outputOpIdWithJobs.map { - case (outputOpId, sparkJobIds) => generateOutputOpIdRow(outputOpId, sparkJobIds) + case (outputOpId, status, sparkJobIds) => + generateOutputOpIdRow(outputOpId, status, sparkJobIds) } } </tbody> @@ -386,4 +470,12 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") { Unparsed(StringEscapeUtils.escapeHtml4(metadataDescription). replaceAllLiterally("\t", " ").replaceAllLiterally("\n", "<br/>")) } + + private def outputOpStatusCell(status: String, rowspan: Int): Seq[Node] = { + if (status == "Succeeded") { + <td rowspan={rowspan.toString}>Succeeded</td> + } else { + failureReasonCell(status, rowspan, includeFirstLineInExpandDetails = false) + } + } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala index ae508c0e95..e6c2e2140c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala @@ -30,6 +30,8 @@ private[ui] case class BatchUIData( val submissionTime: Long, val processingStartTime: Option[Long], val processingEndTime: Option[Long], + val numOutputOp: Int, + val failureReason: Map[Int, String], var outputOpIdSparkJobIdPairs: Seq[OutputOpIdAndSparkJobId] = Seq.empty) { /** @@ -69,7 +71,9 @@ private[ui] object BatchUIData { batchInfo.streamIdToInputInfo, batchInfo.submissionTime, batchInfo.processingStartTime, - batchInfo.processingEndTime + batchInfo.processingEndTime, + batchInfo.numOutputOp, + batchInfo.failureReasons ) } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala index 068a6cb0e8..d1df78871d 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala @@ -121,7 +121,7 @@ class UISeleniumSuite } findAll(cssSelector("""#completed-batches-table th""")).map(_.text).toSeq should be { List("Batch Time", "Input Size", "Scheduling Delay (?)", "Processing Time (?)", - "Total Delay (?)") + "Total Delay (?)", "Output Ops: Succeeded/Total") } val batchLinks = @@ -138,7 +138,7 @@ class UISeleniumSuite summaryText should contain ("Total delay:") findAll(cssSelector("""#batch-job-table th""")).map(_.text).toSeq should be { - List("Output Op Id", "Description", "Duration", "Job Id", "Duration", + List("Output Op Id", "Description", "Duration", "Status", "Job Id", "Duration", "Stages: Succeeded/Total", "Tasks (for all stages): Succeeded/Total", "Error") } |