author     zsxwing <zsxwing@gmail.com>                  2015-10-06 16:51:03 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>  2015-10-06 16:51:03 -0700
commit     ffe6831e49e28eb855f857fdfa5dd99341e80c9d (patch)
tree       ee64cd7d62505fbe44905d9b3e4791e845e50925
parent     5e035403d41527f092705c68f0dd1b4b946014d6 (diff)
[SPARK-10885] [STREAMING] Display the failed output op in Streaming UI
This PR implements the following features for both `master` and `branch-1.5`:

1. Display the failed output op count in the batch list.
2. Display the failure reason of each output op in the batch detail page.

Screenshots:
https://cloud.githubusercontent.com/assets/1000778/10198387/5b2b97ec-67ce-11e5-81c2-f818b9d2f3ad.png
https://cloud.githubusercontent.com/assets/1000778/10198388/5b76ac14-67ce-11e5-8c8b-de2683c5b485.png

Two problems remain in the UI:

1. If an output operation doesn't run any Spark job, we cannot get its duration, since an output op's duration is currently computed as the sum of its Spark jobs' durations.
2. If an output operation doesn't run any Spark job, we cannot get its description, since the description is taken from the call site of the op's latest job.

Fixing these requires adding a new `StreamingListenerEvent` for output operations, so I'd like to fix them only for `master` in another PR.

Author: zsxwing <zsxwing@gmail.com>

Closes #8950 from zsxwing/batch-failure.
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala10
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala1
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala15
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala134
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala6
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala4
6 files changed, 143 insertions(+), 27 deletions(-)
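The first remaining problem above comes from how the batch page derives an output op's duration: it is the sum of the durations of the Spark jobs that op ran, so an op that ran no job has nothing to sum. A minimal sketch of that derivation (the JobDuration type and opDurations helper are illustrative stand-ins, not part of this patch):

// Each Spark job reports which output op it belongs to and how long it took.
case class JobDuration(outputOpId: Int, durationMs: Long)

// Summing per output op mirrors how BatchPage computes formattedOutputOpDuration;
// an op that ran no job produces no entry, hence no duration to display.
def opDurations(jobs: Seq[JobDuration]): Map[Int, Long] =
  jobs.groupBy(_.outputOpId).map { case (op, ds) => op -> ds.map(_.durationMs).sum }

// opDurations(Seq(JobDuration(0, 100), JobDuration(0, 50))) == Map(0 -> 150L)
// opDurations(Seq.empty).get(0) == None  -- the "no jobs" case noted above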
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
index 3c869561ef..463f899dc2 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
@@ -41,6 +41,8 @@ case class BatchInfo(
private var _failureReasons: Map[Int, String] = Map.empty
+ private var _numOutputOp: Int = 0
+
@deprecated("Use streamIdToInputInfo instead", "1.5.0")
def streamIdToNumRecords: Map[Int, Long] = streamIdToInputInfo.mapValues(_.numRecords)
@@ -77,4 +79,12 @@ case class BatchInfo(
/** Failure reasons corresponding to every output op in the batch */
private[streaming] def failureReasons = _failureReasons
+
+ /** Set the number of output operations in this batch */
+ private[streaming] def setNumOutputOp(numOutputOp: Int): Unit = {
+ _numOutputOp = numOutputOp
+ }
+
+ /** Return the number of output operations in this batch */
+ private[streaming] def numOutputOp: Int = _numOutputOp
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
index 255ccf0536..08f63cc992 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
@@ -81,6 +81,7 @@ case class JobSet(
if (processingEndTime >= 0) Some(processingEndTime) else None
)
binfo.setFailureReason(failureReasons)
+ binfo.setNumOutputOp(jobs.size)
binfo
}
}
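Since a JobSet holds one streaming Job per output operation, jobs.size is exactly the output op count the UI needs. A self-contained sketch of this wiring, with the BatchInfo surface reduced to the two members added above (SimpleJob and SimpleBatchInfo are hypothetical stand-ins for the real types):

// Stand-in for a streaming Job: one per output operation in the batch.
case class SimpleJob(outputOpId: Int)

// Stand-in for BatchInfo, keeping only the new setter/getter pair.
class SimpleBatchInfo {
  private var _numOutputOp: Int = 0
  def setNumOutputOp(n: Int): Unit = { _numOutputOp = n }
  def numOutputOp: Int = _numOutputOp
}

// Mirrors JobSet.toBatchInfo: the job count becomes the output op count.
def toBatchInfo(jobs: Seq[SimpleJob]): SimpleBatchInfo = {
  val info = new SimpleBatchInfo
  info.setNumOutputOp(jobs.size)
  info
}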
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
index f702bd5bc9..3e6590d66f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
@@ -107,9 +107,10 @@ private[ui] class ActiveBatchTable(
private[ui] class CompletedBatchTable(batches: Seq[BatchUIData], batchInterval: Long)
extends BatchTableBase("completed-batches-table", batchInterval) {
- override protected def columns: Seq[Node] = super.columns ++
- <th>Total Delay
- {SparkUIUtils.tooltip("Total time taken to handle a batch", "top")}</th>
+ override protected def columns: Seq[Node] = super.columns ++ {
+ <th>Total Delay {SparkUIUtils.tooltip("Total time taken to handle a batch", "top")}</th>
+ <th>Output Ops: Succeeded/Total</th>
+ }
override protected def renderRows: Seq[Node] = {
batches.flatMap(batch => <tr>{completedBatchRow(batch)}</tr>)
@@ -118,9 +119,17 @@ private[ui] class CompletedBatchTable(batches: Seq[BatchUIData], batchInterval:
private def completedBatchRow(batch: BatchUIData): Seq[Node] = {
val totalDelay = batch.totalDelay
val formattedTotalDelay = totalDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
+ val numFailedOutputOp = batch.failureReason.size
+ val outputOpColumn = if (numFailedOutputOp > 0) {
+ s"${batch.numOutputOp - numFailedOutputOp}/${batch.numOutputOp}" +
+ s" (${numFailedOutputOp} failed)"
+ } else {
+ s"${batch.numOutputOp}/${batch.numOutputOp}"
+ }
baseRow(batch) ++
<td sorttable_customkey={totalDelay.getOrElse(Long.MaxValue).toString}>
{formattedTotalDelay}
</td>
+ <td>{outputOpColumn}</td>
}
}
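The new cell is plain string formatting over two counts. Extracted as a standalone function for illustration (the real code inlines this in completedBatchRow):

// "Output Ops: Succeeded/Total" cell text, as built in completedBatchRow:
// failures are subtracted from the total and called out explicitly.
def outputOpColumn(numOutputOp: Int, numFailed: Int): String =
  if (numFailed > 0) {
    s"${numOutputOp - numFailed}/$numOutputOp ($numFailed failed)"
  } else {
    s"$numOutputOp/$numOutputOp"
  }

// outputOpColumn(4, 1) == "3/4 (1 failed)"
// outputOpColumn(4, 0) == "4/4"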
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
index 9129c1f26a..1b717b6454 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
@@ -38,6 +38,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
<th>Output Op Id</th>
<th>Description</th>
<th>Duration</th>
+ <th>Status</th>
<th>Job Id</th>
<th>Duration</th>
<th class="sorttable_nosort">Stages: Succeeded/Total</th>
@@ -49,18 +50,42 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
outputOpId: OutputOpId,
outputOpDescription: Seq[Node],
formattedOutputOpDuration: String,
+ outputOpStatus: String,
numSparkJobRowsInOutputOp: Int,
isFirstRow: Boolean,
sparkJob: SparkJobIdWithUIData): Seq[Node] = {
if (sparkJob.jobUIData.isDefined) {
generateNormalJobRow(outputOpId, outputOpDescription, formattedOutputOpDuration,
- numSparkJobRowsInOutputOp, isFirstRow, sparkJob.jobUIData.get)
+ outputOpStatus, numSparkJobRowsInOutputOp, isFirstRow, sparkJob.jobUIData.get)
} else {
generateDroppedJobRow(outputOpId, outputOpDescription, formattedOutputOpDuration,
- numSparkJobRowsInOutputOp, isFirstRow, sparkJob.sparkJobId)
+ outputOpStatus, numSparkJobRowsInOutputOp, isFirstRow, sparkJob.sparkJobId)
}
}
+ private def generateOutputOpRowWithoutSparkJobs(
+ outputOpId: OutputOpId,
+ outputOpDescription: Seq[Node],
+ formattedOutputOpDuration: String,
+ outputOpStatus: String): Seq[Node] = {
+ <tr>
+ <td class="output-op-id-cell" >{outputOpId.toString}</td>
+ <td>{outputOpDescription}</td>
+ <td>{formattedOutputOpDuration}</td>
+ {outputOpStatusCell(outputOpStatus, rowspan = 1)}
+ <!-- Job Id -->
+ <td>-</td>
+ <!-- Duration -->
+ <td>-</td>
+ <!-- Stages: Succeeded/Total -->
+ <td>-</td>
+ <!-- Tasks (for all stages): Succeeded/Total -->
+ <td>-</td>
+ <!-- Error -->
+ <td>-</td>
+ </tr>
+ }
+
/**
* Generate a row for a Spark job. Because duplicated output op info needs to be collapsed into
* one cell, we use "rowspan" for the first row of an output op.
@@ -69,6 +94,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
outputOpId: OutputOpId,
outputOpDescription: Seq[Node],
formattedOutputOpDuration: String,
+ outputOpStatus: String,
numSparkJobRowsInOutputOp: Int,
isFirstRow: Boolean,
sparkJob: JobUIData): Seq[Node] = {
@@ -94,7 +120,8 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
<td rowspan={numSparkJobRowsInOutputOp.toString}>
{outputOpDescription}
</td>
- <td rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td>
+ <td rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td> ++
+ {outputOpStatusCell(outputOpStatus, numSparkJobRowsInOutputOp)}
} else {
Nil
}
@@ -125,7 +152,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
total = sparkJob.numTasks - sparkJob.numSkippedTasks)
}
</td>
- {failureReasonCell(lastFailureReason)}
+ {failureReasonCell(lastFailureReason, rowspan = 1)}
</tr>
}
@@ -137,6 +164,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
outputOpId: OutputOpId,
outputOpDescription: Seq[Node],
formattedOutputOpDuration: String,
+ outputOpStatus: String,
numSparkJobRowsInOutputOp: Int,
isFirstRow: Boolean,
jobId: Int): Seq[Node] = {
@@ -147,7 +175,8 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
if (isFirstRow) {
<td class="output-op-id-cell" rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpId.toString}</td>
<td rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpDescription}</td>
- <td rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td>
+ <td rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td> ++
+ {outputOpStatusCell(outputOpStatus, numSparkJobRowsInOutputOp)}
} else {
Nil
}
@@ -156,7 +185,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
<tr>
{prefixCells}
<td sorttable_customkey={jobId.toString}>
- {jobId.toString}
+ {if (jobId >= 0) jobId.toString else "-"}
</td>
<!-- Duration -->
<td>-</td>
@@ -170,7 +199,9 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
}
private def generateOutputOpIdRow(
- outputOpId: OutputOpId, sparkJobs: Seq[SparkJobIdWithUIData]): Seq[Node] = {
+ outputOpId: OutputOpId,
+ outputOpStatus: String,
+ sparkJobs: Seq[SparkJobIdWithUIData]): Seq[Node] = {
// We don't count the durations of dropped jobs
val sparkJobDurations = sparkJobs.filter(_.jobUIData.nonEmpty).map(_.jobUIData.get).
map(sparkJob => {
@@ -189,12 +220,32 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
val description = generateOutputOpDescription(sparkJobs)
- generateJobRow(
- outputOpId, description, formattedOutputOpDuration, sparkJobs.size, true, sparkJobs.head) ++
- sparkJobs.tail.map { sparkJob =>
+ if (sparkJobs.isEmpty) {
+ generateOutputOpRowWithoutSparkJobs(
+ outputOpId, description, formattedOutputOpDuration, outputOpStatus)
+ } else {
+ val firstRow =
generateJobRow(
- outputOpId, description, formattedOutputOpDuration, sparkJobs.size, false, sparkJob)
- }.flatMap(x => x)
+ outputOpId,
+ description,
+ formattedOutputOpDuration,
+ outputOpStatus,
+ sparkJobs.size,
+ true,
+ sparkJobs.head)
+ val tailRows =
+ sparkJobs.tail.map { sparkJob =>
+ generateJobRow(
+ outputOpId,
+ description,
+ formattedOutputOpDuration,
+ outputOpStatus,
+ sparkJobs.size,
+ false,
+ sparkJob)
+ }
+ (firstRow ++ tailRows).flatten
+ }
}
private def generateOutputOpDescription(sparkJobs: Seq[SparkJobIdWithUIData]): Seq[Node] = {
@@ -228,7 +279,10 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
}
}
- private def failureReasonCell(failureReason: String): Seq[Node] = {
+ private def failureReasonCell(
+ failureReason: String,
+ rowspan: Int,
+ includeFirstLineInExpandDetails: Boolean = true): Seq[Node] = {
val isMultiline = failureReason.indexOf('\n') >= 0
// Display the first line by default
val failureReasonSummary = StringEscapeUtils.escapeHtml4(
@@ -237,6 +291,13 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
} else {
failureReason
})
+ val failureDetails =
+ if (isMultiline && !includeFirstLineInExpandDetails) {
+ // Skip the first line
+ failureReason.substring(failureReason.indexOf('\n') + 1)
+ } else {
+ failureReason
+ }
val details = if (isMultiline) {
// scalastyle:off
<span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')"
@@ -244,13 +305,20 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
+details
</span> ++
<div class="stacktrace-details collapsed">
- <pre>{failureReason}</pre>
+ <pre>{failureDetails}</pre>
</div>
// scalastyle:on
} else {
""
}
- <td valign="middle" style="max-width: 300px">{failureReasonSummary}{details}</td>
+
+ if (rowspan == 1) {
+ <td valign="middle" style="max-width: 300px">{failureReasonSummary}{details}</td>
+ } else {
+ <td valign="middle" style="max-width: 300px" rowspan={rowspan.toString}>
+ {failureReasonSummary}{details}
+ </td>
+ }
}
private def getJobData(sparkJobId: SparkJobId): Option[JobUIData] = {
@@ -265,16 +333,31 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
* Generate the job table for the batch.
*/
private def generateJobTable(batchUIData: BatchUIData): Seq[Node] = {
- val outputOpIdToSparkJobIds = batchUIData.outputOpIdSparkJobIdPairs.groupBy(_.outputOpId).toSeq.
- sortBy(_._1). // sorted by OutputOpId
+ val outputOpIdToSparkJobIds = batchUIData.outputOpIdSparkJobIdPairs.groupBy(_.outputOpId).
map { case (outputOpId, outputOpIdAndSparkJobIds) =>
// sort SparkJobIds for each OutputOpId
(outputOpId, outputOpIdAndSparkJobIds.map(_.sparkJobId).sorted)
}
+ val outputOps = (0 until batchUIData.numOutputOp).map { outputOpId =>
+ val status = batchUIData.failureReason.get(outputOpId).map { failure =>
+ if (failure.startsWith("org.apache.spark.SparkException")) {
+ "Failed due to Spark job error\n" + failure
+ } else {
+ var nextLineIndex = failure.indexOf("\n")
+ if (nextLineIndex < 0) {
+ nextLineIndex = failure.size
+ }
+ val firstLine = failure.substring(0, nextLineIndex)
+ s"Failed due to error: $firstLine\n$failure"
+ }
+ }.getOrElse("Succeeded")
+ val sparkJobIds = outputOpIdToSparkJobIds.getOrElse(outputOpId, Seq.empty)
+ (outputOpId, status, sparkJobIds)
+ }
sparkListener.synchronized {
- val outputOpIdWithJobs: Seq[(OutputOpId, Seq[SparkJobIdWithUIData])] =
- outputOpIdToSparkJobIds.map { case (outputOpId, sparkJobIds) =>
- (outputOpId,
+ val outputOpIdWithJobs: Seq[(OutputOpId, String, Seq[SparkJobIdWithUIData])] =
+ outputOps.map { case (outputOpId, status, sparkJobIds) =>
+ (outputOpId, status,
sparkJobIds.map(sparkJobId => SparkJobIdWithUIData(sparkJobId, getJobData(sparkJobId))))
}
@@ -285,7 +368,8 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
<tbody>
{
outputOpIdWithJobs.map {
- case (outputOpId, sparkJobIds) => generateOutputOpIdRow(outputOpId, sparkJobIds)
+ case (outputOpId, status, sparkJobIds) =>
+ generateOutputOpIdRow(outputOpId, status, sparkJobIds)
}
}
</tbody>
@@ -386,4 +470,12 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
Unparsed(StringEscapeUtils.escapeHtml4(metadataDescription).
replaceAllLiterally("\t", "&nbsp;&nbsp;&nbsp;&nbsp;").replaceAllLiterally("\n", "<br/>"))
}
+
+ private def outputOpStatusCell(status: String, rowspan: Int): Seq[Node] = {
+ if (status == "Succeeded") {
+ <td rowspan={rowspan.toString}>Succeeded</td>
+ } else {
+ failureReasonCell(status, rowspan, includeFirstLineInExpandDetails = false)
+ }
+ }
}
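The status string assembled in generateJobTable doubles as both the cell label and the expandable details: its first line is the summary shown in the cell, and outputOpStatusCell later strips that line from the expanded stack trace via includeFirstLineInExpandDetails = false. The same computation as a standalone helper (a sketch; the real code inlines it over batchUIData.failureReason):

// Builds the per-output-op status: "Succeeded", or a one-line summary
// followed by the full failure reason. SparkException failures get a
// generic summary because their own first line is just the exception name.
def outputOpStatus(failureReason: Option[String]): String =
  failureReason.map { failure =>
    if (failure.startsWith("org.apache.spark.SparkException")) {
      "Failed due to Spark job error\n" + failure
    } else {
      val nextLineIndex = failure.indexOf('\n')
      val firstLine =
        if (nextLineIndex < 0) failure else failure.substring(0, nextLineIndex)
      s"Failed due to error: $firstLine\n$failure"
    }
  }.getOrElse("Succeeded")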
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
index ae508c0e95..e6c2e2140c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
@@ -30,6 +30,8 @@ private[ui] case class BatchUIData(
val submissionTime: Long,
val processingStartTime: Option[Long],
val processingEndTime: Option[Long],
+ val numOutputOp: Int,
+ val failureReason: Map[Int, String],
var outputOpIdSparkJobIdPairs: Seq[OutputOpIdAndSparkJobId] = Seq.empty) {
/**
@@ -69,7 +71,9 @@ private[ui] object BatchUIData {
batchInfo.streamIdToInputInfo,
batchInfo.submissionTime,
batchInfo.processingStartTime,
- batchInfo.processingEndTime
+ batchInfo.processingEndTime,
+ batchInfo.numOutputOp,
+ batchInfo.failureReasons
)
}
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
index 068a6cb0e8..d1df78871d 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
@@ -121,7 +121,7 @@ class UISeleniumSuite
}
findAll(cssSelector("""#completed-batches-table th""")).map(_.text).toSeq should be {
List("Batch Time", "Input Size", "Scheduling Delay (?)", "Processing Time (?)",
- "Total Delay (?)")
+ "Total Delay (?)", "Output Ops: Succeeded/Total")
}
val batchLinks =
@@ -138,7 +138,7 @@ class UISeleniumSuite
summaryText should contain ("Total delay:")
findAll(cssSelector("""#batch-job-table th""")).map(_.text).toSeq should be {
- List("Output Op Id", "Description", "Duration", "Job Id", "Duration",
+ List("Output Op Id", "Description", "Duration", "Status", "Job Id", "Duration",
"Stages: Succeeded/Total", "Tasks (for all stages): Succeeded/Total", "Error")
}