diff options
author | Shixiong Zhu <shixiong@databricks.com> | 2015-11-16 15:06:06 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2015-11-16 15:06:06 -0800 |
commit | bcea0bfda66a30ee86790b048de5cb47b4d0b32f (patch) | |
tree | 04049be40070588255b8e47b150d465b2d6b23a8 /streaming | |
parent | 3c025087b58f475a9bcb5c8f4b2b2df804915b2b (diff) | |
download | spark-bcea0bfda66a30ee86790b048de5cb47b4d0b32f.tar.gz spark-bcea0bfda66a30ee86790b048de5cb47b4d0b32f.tar.bz2 spark-bcea0bfda66a30ee86790b048de5cb47b4d0b32f.zip |
[SPARK-11742][STREAMING] Add the failure info to the batch lists
<img width="1365" alt="screen shot 2015-11-13 at 9 57 43 pm" src="https://cloud.githubusercontent.com/assets/1000778/11162322/9b88e204-8a51-11e5-8c57-a44889cab713.png">
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #9711 from zsxwing/failure-info.
Diffstat (limited to 'streaming')
3 files changed, 120 insertions, 50 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala index 125cafd41b..d339723427 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala @@ -33,6 +33,22 @@ private[ui] abstract class BatchTableBase(tableId: String, batchInterval: Long) {SparkUIUtils.tooltip("Time taken to process all jobs of a batch", "top")}</th> } + /** + * Return the first failure reason if finding in the batches. + */ + protected def getFirstFailureReason(batches: Seq[BatchUIData]): Option[String] = { + batches.flatMap(_.outputOperations.flatMap(_._2.failureReason)).headOption + } + + protected def getFirstFailureTableCell(batch: BatchUIData): Seq[Node] = { + val firstFailureReason = batch.outputOperations.flatMap(_._2.failureReason).headOption + firstFailureReason.map { failureReason => + val failureReasonForUI = UIUtils.createOutputOperationFailureForUI(failureReason) + UIUtils.failureReasonCell( + failureReasonForUI, rowspan = 1, includeFirstLineInExpandDetails = false) + }.getOrElse(<td>-</td>) + } + protected def baseRow(batch: BatchUIData): Seq[Node] = { val batchTime = batch.batchTime.milliseconds val formattedBatchTime = UIUtils.formatBatchTime(batchTime, batchInterval) @@ -97,9 +113,17 @@ private[ui] class ActiveBatchTable( waitingBatches: Seq[BatchUIData], batchInterval: Long) extends BatchTableBase("active-batches-table", batchInterval) { + private val firstFailureReason = getFirstFailureReason(runningBatches) + override protected def columns: Seq[Node] = super.columns ++ { <th>Output Ops: Succeeded/Total</th> - <th>Status</th> + <th>Status</th> ++ { + if (firstFailureReason.nonEmpty) { + <th>Error</th> + } else { + Nil + } + } } override protected def renderRows: Seq[Node] = { @@ -110,20 +134,41 @@ private[ui] class ActiveBatchTable( } private def runningBatchRow(batch: BatchUIData): Seq[Node] = { - baseRow(batch) ++ createOutputOperationProgressBar(batch) ++ <td>processing</td> + baseRow(batch) ++ createOutputOperationProgressBar(batch) ++ <td>processing</td> ++ { + if (firstFailureReason.nonEmpty) { + getFirstFailureTableCell(batch) + } else { + Nil + } + } } private def waitingBatchRow(batch: BatchUIData): Seq[Node] = { - baseRow(batch) ++ createOutputOperationProgressBar(batch) ++ <td>queued</td> + baseRow(batch) ++ createOutputOperationProgressBar(batch) ++ <td>queued</td>++ { + if (firstFailureReason.nonEmpty) { + // Waiting batches have not run yet, so must have no failure reasons. + <td>-</td> + } else { + Nil + } + } } } private[ui] class CompletedBatchTable(batches: Seq[BatchUIData], batchInterval: Long) extends BatchTableBase("completed-batches-table", batchInterval) { + private val firstFailureReason = getFirstFailureReason(batches) + override protected def columns: Seq[Node] = super.columns ++ { <th>Total Delay {SparkUIUtils.tooltip("Total time taken to handle a batch", "top")}</th> - <th>Output Ops: Succeeded/Total</th> + <th>Output Ops: Succeeded/Total</th> ++ { + if (firstFailureReason.nonEmpty) { + <th>Error</th> + } else { + Nil + } + } } override protected def renderRows: Seq[Node] = { @@ -138,6 +183,12 @@ private[ui] class CompletedBatchTable(batches: Seq[BatchUIData], batchInterval: <td sorttable_customkey={totalDelay.getOrElse(Long.MaxValue).toString}> {formattedTotalDelay} </td> - } ++ createOutputOperationProgressBar(batch) + } ++ createOutputOperationProgressBar(batch)++ { + if (firstFailureReason.nonEmpty) { + getFirstFailureTableCell(batch) + } else { + Nil + } + } } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala index 2ed9255728..bc1711930d 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala @@ -149,7 +149,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") { total = sparkJob.numTasks - sparkJob.numSkippedTasks) } </td> - {failureReasonCell(lastFailureReason, rowspan = 1)} + {UIUtils.failureReasonCell(lastFailureReason)} </tr> } @@ -245,48 +245,6 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") { </div> } - private def failureReasonCell( - failureReason: String, - rowspan: Int, - includeFirstLineInExpandDetails: Boolean = true): Seq[Node] = { - val isMultiline = failureReason.indexOf('\n') >= 0 - // Display the first line by default - val failureReasonSummary = StringEscapeUtils.escapeHtml4( - if (isMultiline) { - failureReason.substring(0, failureReason.indexOf('\n')) - } else { - failureReason - }) - val failureDetails = - if (isMultiline && !includeFirstLineInExpandDetails) { - // Skip the first line - failureReason.substring(failureReason.indexOf('\n') + 1) - } else { - failureReason - } - val details = if (isMultiline) { - // scalastyle:off - <span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')" - class="expand-details"> - +details - </span> ++ - <div class="stacktrace-details collapsed"> - <pre>{failureDetails}</pre> - </div> - // scalastyle:on - } else { - "" - } - - if (rowspan == 1) { - <td valign="middle" style="max-width: 300px">{failureReasonSummary}{details}</td> - } else { - <td valign="middle" style="max-width: 300px" rowspan={rowspan.toString}> - {failureReasonSummary}{details} - </td> - } - } - private def getJobData(sparkJobId: SparkJobId): Option[JobUIData] = { sparkListener.activeJobs.get(sparkJobId).orElse { sparkListener.completedJobs.find(_.jobId == sparkJobId).orElse { @@ -434,8 +392,9 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") { private def outputOpStatusCell(outputOp: OutputOperationUIData, rowspan: Int): Seq[Node] = { outputOp.failureReason match { case Some(failureReason) => - val failureReasonForUI = generateOutputOperationStatusForUI(failureReason) - failureReasonCell(failureReasonForUI, rowspan, includeFirstLineInExpandDetails = false) + val failureReasonForUI = UIUtils.createOutputOperationFailureForUI(failureReason) + UIUtils.failureReasonCell( + failureReasonForUI, rowspan, includeFirstLineInExpandDetails = false) case None => if (outputOp.endTime.isEmpty) { <td rowspan={rowspan.toString}>-</td> diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala index 86cfb1fa47..d89f7ad3e1 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala @@ -17,6 +17,10 @@ package org.apache.spark.streaming.ui +import scala.xml.Node + +import org.apache.commons.lang3.StringEscapeUtils + import java.text.SimpleDateFormat import java.util.TimeZone import java.util.concurrent.TimeUnit @@ -124,4 +128,60 @@ private[streaming] object UIUtils { } } } + + def createOutputOperationFailureForUI(failure: String): String = { + if (failure.startsWith("org.apache.spark.Spark")) { + // SparkException or SparkDriverExecutionException + "Failed due to Spark job error\n" + failure + } else { + var nextLineIndex = failure.indexOf("\n") + if (nextLineIndex < 0) { + nextLineIndex = failure.size + } + val firstLine = failure.substring(0, nextLineIndex) + s"Failed due to error: $firstLine\n$failure" + } + } + + def failureReasonCell( + failureReason: String, + rowspan: Int = 1, + includeFirstLineInExpandDetails: Boolean = true): Seq[Node] = { + val isMultiline = failureReason.indexOf('\n') >= 0 + // Display the first line by default + val failureReasonSummary = StringEscapeUtils.escapeHtml4( + if (isMultiline) { + failureReason.substring(0, failureReason.indexOf('\n')) + } else { + failureReason + }) + val failureDetails = + if (isMultiline && !includeFirstLineInExpandDetails) { + // Skip the first line + failureReason.substring(failureReason.indexOf('\n') + 1) + } else { + failureReason + } + val details = if (isMultiline) { + // scalastyle:off + <span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')" + class="expand-details"> + +details + </span> ++ + <div class="stacktrace-details collapsed"> + <pre>{failureDetails}</pre> + </div> + // scalastyle:on + } else { + "" + } + + if (rowspan == 1) { + <td valign="middle" style="max-width: 300px">{failureReasonSummary}{details}</td> + } else { + <td valign="middle" style="max-width: 300px" rowspan={rowspan.toString}> + {failureReasonSummary}{details} + </td> + } + } } |