aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorShixiong Zhu <shixiong@databricks.com>2015-11-16 15:06:06 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2015-11-16 15:06:06 -0800
commitbcea0bfda66a30ee86790b048de5cb47b4d0b32f (patch)
tree04049be40070588255b8e47b150d465b2d6b23a8 /streaming
parent3c025087b58f475a9bcb5c8f4b2b2df804915b2b (diff)
downloadspark-bcea0bfda66a30ee86790b048de5cb47b4d0b32f.tar.gz
spark-bcea0bfda66a30ee86790b048de5cb47b4d0b32f.tar.bz2
spark-bcea0bfda66a30ee86790b048de5cb47b4d0b32f.zip
[SPARK-11742][STREAMING] Add the failure info to the batch lists
<img width="1365" alt="screen shot 2015-11-13 at 9 57 43 pm" src="https://cloud.githubusercontent.com/assets/1000778/11162322/9b88e204-8a51-11e5-8c57-a44889cab713.png"> Author: Shixiong Zhu <shixiong@databricks.com> Closes #9711 from zsxwing/failure-info.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala61
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala49
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala60
3 files changed, 120 insertions, 50 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
index 125cafd41b..d339723427 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
@@ -33,6 +33,22 @@ private[ui] abstract class BatchTableBase(tableId: String, batchInterval: Long)
{SparkUIUtils.tooltip("Time taken to process all jobs of a batch", "top")}</th>
}
+ /**
+ * Return the first failure reason if finding in the batches.
+ */
+ protected def getFirstFailureReason(batches: Seq[BatchUIData]): Option[String] = {
+ batches.flatMap(_.outputOperations.flatMap(_._2.failureReason)).headOption
+ }
+
+ protected def getFirstFailureTableCell(batch: BatchUIData): Seq[Node] = {
+ val firstFailureReason = batch.outputOperations.flatMap(_._2.failureReason).headOption
+ firstFailureReason.map { failureReason =>
+ val failureReasonForUI = UIUtils.createOutputOperationFailureForUI(failureReason)
+ UIUtils.failureReasonCell(
+ failureReasonForUI, rowspan = 1, includeFirstLineInExpandDetails = false)
+ }.getOrElse(<td>-</td>)
+ }
+
protected def baseRow(batch: BatchUIData): Seq[Node] = {
val batchTime = batch.batchTime.milliseconds
val formattedBatchTime = UIUtils.formatBatchTime(batchTime, batchInterval)
@@ -97,9 +113,17 @@ private[ui] class ActiveBatchTable(
waitingBatches: Seq[BatchUIData],
batchInterval: Long) extends BatchTableBase("active-batches-table", batchInterval) {
+ private val firstFailureReason = getFirstFailureReason(runningBatches)
+
override protected def columns: Seq[Node] = super.columns ++ {
<th>Output Ops: Succeeded/Total</th>
- <th>Status</th>
+ <th>Status</th> ++ {
+ if (firstFailureReason.nonEmpty) {
+ <th>Error</th>
+ } else {
+ Nil
+ }
+ }
}
override protected def renderRows: Seq[Node] = {
@@ -110,20 +134,41 @@ private[ui] class ActiveBatchTable(
}
private def runningBatchRow(batch: BatchUIData): Seq[Node] = {
- baseRow(batch) ++ createOutputOperationProgressBar(batch) ++ <td>processing</td>
+ baseRow(batch) ++ createOutputOperationProgressBar(batch) ++ <td>processing</td> ++ {
+ if (firstFailureReason.nonEmpty) {
+ getFirstFailureTableCell(batch)
+ } else {
+ Nil
+ }
+ }
}
private def waitingBatchRow(batch: BatchUIData): Seq[Node] = {
- baseRow(batch) ++ createOutputOperationProgressBar(batch) ++ <td>queued</td>
+ baseRow(batch) ++ createOutputOperationProgressBar(batch) ++ <td>queued</td>++ {
+ if (firstFailureReason.nonEmpty) {
+ // Waiting batches have not run yet, so must have no failure reasons.
+ <td>-</td>
+ } else {
+ Nil
+ }
+ }
}
}
private[ui] class CompletedBatchTable(batches: Seq[BatchUIData], batchInterval: Long)
extends BatchTableBase("completed-batches-table", batchInterval) {
+ private val firstFailureReason = getFirstFailureReason(batches)
+
override protected def columns: Seq[Node] = super.columns ++ {
<th>Total Delay {SparkUIUtils.tooltip("Total time taken to handle a batch", "top")}</th>
- <th>Output Ops: Succeeded/Total</th>
+ <th>Output Ops: Succeeded/Total</th> ++ {
+ if (firstFailureReason.nonEmpty) {
+ <th>Error</th>
+ } else {
+ Nil
+ }
+ }
}
override protected def renderRows: Seq[Node] = {
@@ -138,6 +183,12 @@ private[ui] class CompletedBatchTable(batches: Seq[BatchUIData], batchInterval:
<td sorttable_customkey={totalDelay.getOrElse(Long.MaxValue).toString}>
{formattedTotalDelay}
</td>
- } ++ createOutputOperationProgressBar(batch)
+ } ++ createOutputOperationProgressBar(batch)++ {
+ if (firstFailureReason.nonEmpty) {
+ getFirstFailureTableCell(batch)
+ } else {
+ Nil
+ }
+ }
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
index 2ed9255728..bc1711930d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
@@ -149,7 +149,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
total = sparkJob.numTasks - sparkJob.numSkippedTasks)
}
</td>
- {failureReasonCell(lastFailureReason, rowspan = 1)}
+ {UIUtils.failureReasonCell(lastFailureReason)}
</tr>
}
@@ -245,48 +245,6 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
</div>
}
- private def failureReasonCell(
- failureReason: String,
- rowspan: Int,
- includeFirstLineInExpandDetails: Boolean = true): Seq[Node] = {
- val isMultiline = failureReason.indexOf('\n') >= 0
- // Display the first line by default
- val failureReasonSummary = StringEscapeUtils.escapeHtml4(
- if (isMultiline) {
- failureReason.substring(0, failureReason.indexOf('\n'))
- } else {
- failureReason
- })
- val failureDetails =
- if (isMultiline && !includeFirstLineInExpandDetails) {
- // Skip the first line
- failureReason.substring(failureReason.indexOf('\n') + 1)
- } else {
- failureReason
- }
- val details = if (isMultiline) {
- // scalastyle:off
- <span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')"
- class="expand-details">
- +details
- </span> ++
- <div class="stacktrace-details collapsed">
- <pre>{failureDetails}</pre>
- </div>
- // scalastyle:on
- } else {
- ""
- }
-
- if (rowspan == 1) {
- <td valign="middle" style="max-width: 300px">{failureReasonSummary}{details}</td>
- } else {
- <td valign="middle" style="max-width: 300px" rowspan={rowspan.toString}>
- {failureReasonSummary}{details}
- </td>
- }
- }
-
private def getJobData(sparkJobId: SparkJobId): Option[JobUIData] = {
sparkListener.activeJobs.get(sparkJobId).orElse {
sparkListener.completedJobs.find(_.jobId == sparkJobId).orElse {
@@ -434,8 +392,9 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
private def outputOpStatusCell(outputOp: OutputOperationUIData, rowspan: Int): Seq[Node] = {
outputOp.failureReason match {
case Some(failureReason) =>
- val failureReasonForUI = generateOutputOperationStatusForUI(failureReason)
- failureReasonCell(failureReasonForUI, rowspan, includeFirstLineInExpandDetails = false)
+ val failureReasonForUI = UIUtils.createOutputOperationFailureForUI(failureReason)
+ UIUtils.failureReasonCell(
+ failureReasonForUI, rowspan, includeFirstLineInExpandDetails = false)
case None =>
if (outputOp.endTime.isEmpty) {
<td rowspan={rowspan.toString}>-</td>
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala
index 86cfb1fa47..d89f7ad3e1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala
@@ -17,6 +17,10 @@
package org.apache.spark.streaming.ui
+import scala.xml.Node
+
+import org.apache.commons.lang3.StringEscapeUtils
+
import java.text.SimpleDateFormat
import java.util.TimeZone
import java.util.concurrent.TimeUnit
@@ -124,4 +128,60 @@ private[streaming] object UIUtils {
}
}
}
+
+ def createOutputOperationFailureForUI(failure: String): String = {
+ if (failure.startsWith("org.apache.spark.Spark")) {
+ // SparkException or SparkDriverExecutionException
+ "Failed due to Spark job error\n" + failure
+ } else {
+ var nextLineIndex = failure.indexOf("\n")
+ if (nextLineIndex < 0) {
+ nextLineIndex = failure.size
+ }
+ val firstLine = failure.substring(0, nextLineIndex)
+ s"Failed due to error: $firstLine\n$failure"
+ }
+ }
+
+ def failureReasonCell(
+ failureReason: String,
+ rowspan: Int = 1,
+ includeFirstLineInExpandDetails: Boolean = true): Seq[Node] = {
+ val isMultiline = failureReason.indexOf('\n') >= 0
+ // Display the first line by default
+ val failureReasonSummary = StringEscapeUtils.escapeHtml4(
+ if (isMultiline) {
+ failureReason.substring(0, failureReason.indexOf('\n'))
+ } else {
+ failureReason
+ })
+ val failureDetails =
+ if (isMultiline && !includeFirstLineInExpandDetails) {
+ // Skip the first line
+ failureReason.substring(failureReason.indexOf('\n') + 1)
+ } else {
+ failureReason
+ }
+ val details = if (isMultiline) {
+ // scalastyle:off
+ <span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')"
+ class="expand-details">
+ +details
+ </span> ++
+ <div class="stacktrace-details collapsed">
+ <pre>{failureDetails}</pre>
+ </div>
+ // scalastyle:on
+ } else {
+ ""
+ }
+
+ if (rowspan == 1) {
+ <td valign="middle" style="max-width: 300px">{failureReasonSummary}{details}</td>
+ } else {
+ <td valign="middle" style="max-width: 300px" rowspan={rowspan.toString}>
+ {failureReasonSummary}{details}
+ </td>
+ }
+ }
}