about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala10
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala1
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala15
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala134
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala6
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala4
6 files changed, 143 insertions(+), 27 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
index 3c869561ef..463f899dc2 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
@@ -41,6 +41,8 @@ case class BatchInfo(
private var _failureReasons: Map[Int, String] = Map.empty
+ private var _numOutputOp: Int = 0
+
@deprecated("Use streamIdToInputInfo instead", "1.5.0")
def streamIdToNumRecords: Map[Int, Long] = streamIdToInputInfo.mapValues(_.numRecords)
@@ -77,4 +79,12 @@ case class BatchInfo(
/** Failure reasons corresponding to every output ops in the batch */
private[streaming] def failureReasons = _failureReasons
+
+ /** Set the number of output operations in this batch */
+ private[streaming] def setNumOutputOp(numOutputOp: Int): Unit = {
+ _numOutputOp = numOutputOp
+ }
+
+ /** Return the number of output operations in this batch */
+ private[streaming] def numOutputOp: Int = _numOutputOp
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
index 255ccf0536..08f63cc992 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
@@ -81,6 +81,7 @@ case class JobSet(
if (processingEndTime >= 0) Some(processingEndTime) else None
)
binfo.setFailureReason(failureReasons)
+ binfo.setNumOutputOp(jobs.size)
binfo
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
index f702bd5bc9..3e6590d66f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
@@ -107,9 +107,10 @@ private[ui] class ActiveBatchTable(
private[ui] class CompletedBatchTable(batches: Seq[BatchUIData], batchInterval: Long)
extends BatchTableBase("completed-batches-table", batchInterval) {
- override protected def columns: Seq[Node] = super.columns ++
- <th>Total Delay
- {SparkUIUtils.tooltip("Total time taken to handle a batch", "top")}</th>
+ override protected def columns: Seq[Node] = super.columns ++ {
+ <th>Total Delay {SparkUIUtils.tooltip("Total time taken to handle a batch", "top")}</th>
+ <th>Output Ops: Succeeded/Total</th>
+ }
override protected def renderRows: Seq[Node] = {
batches.flatMap(batch => <tr>{completedBatchRow(batch)}</tr>)
@@ -118,9 +119,17 @@ private[ui] class CompletedBatchTable(batches: Seq[BatchUIData], batchInterval:
private def completedBatchRow(batch: BatchUIData): Seq[Node] = {
val totalDelay = batch.totalDelay
val formattedTotalDelay = totalDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
+ val numFailedOutputOp = batch.failureReason.size
+ val outputOpColumn = if (numFailedOutputOp > 0) {
+ s"${batch.numOutputOp - numFailedOutputOp}/${batch.numOutputOp}" +
+ s" (${numFailedOutputOp} failed)"
+ } else {
+ s"${batch.numOutputOp}/${batch.numOutputOp}"
+ }
baseRow(batch) ++
<td sorttable_customkey={totalDelay.getOrElse(Long.MaxValue).toString}>
{formattedTotalDelay}
</td>
+ <td>{outputOpColumn}</td>
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
index 9129c1f26a..1b717b6454 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
@@ -38,6 +38,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
<th>Output Op Id</th>
<th>Description</th>
<th>Duration</th>
+ <th>Status</th>
<th>Job Id</th>
<th>Duration</th>
<th class="sorttable_nosort">Stages: Succeeded/Total</th>
@@ -49,18 +50,42 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
outputOpId: OutputOpId,
outputOpDescription: Seq[Node],
formattedOutputOpDuration: String,
+ outputOpStatus: String,
numSparkJobRowsInOutputOp: Int,
isFirstRow: Boolean,
sparkJob: SparkJobIdWithUIData): Seq[Node] = {
if (sparkJob.jobUIData.isDefined) {
generateNormalJobRow(outputOpId, outputOpDescription, formattedOutputOpDuration,
- numSparkJobRowsInOutputOp, isFirstRow, sparkJob.jobUIData.get)
+ outputOpStatus, numSparkJobRowsInOutputOp, isFirstRow, sparkJob.jobUIData.get)
} else {
generateDroppedJobRow(outputOpId, outputOpDescription, formattedOutputOpDuration,
- numSparkJobRowsInOutputOp, isFirstRow, sparkJob.sparkJobId)
+ outputOpStatus, numSparkJobRowsInOutputOp, isFirstRow, sparkJob.sparkJobId)
}
}
+ private def generateOutputOpRowWithoutSparkJobs(
+ outputOpId: OutputOpId,
+ outputOpDescription: Seq[Node],
+ formattedOutputOpDuration: String,
+ outputOpStatus: String): Seq[Node] = {
+ <tr>
+ <td class="output-op-id-cell" >{outputOpId.toString}</td>
+ <td>{outputOpDescription}</td>
+ <td>{formattedOutputOpDuration}</td>
+ {outputOpStatusCell(outputOpStatus, rowspan = 1)}
+ <!-- Job Id -->
+ <td>-</td>
+ <!-- Duration -->
+ <td>-</td>
+ <!-- Stages: Succeeded/Total -->
+ <td>-</td>
+ <!-- Tasks (for all stages): Succeeded/Total -->
+ <td>-</td>
+ <!-- Error -->
+ <td>-</td>
+ </tr>
+ }
+
/**
* Generate a row for a Spark Job. Because duplicated output op infos needs to be collapsed into
* one cell, we use "rowspan" for the first row of a output op.
@@ -69,6 +94,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
outputOpId: OutputOpId,
outputOpDescription: Seq[Node],
formattedOutputOpDuration: String,
+ outputOpStatus: String,
numSparkJobRowsInOutputOp: Int,
isFirstRow: Boolean,
sparkJob: JobUIData): Seq[Node] = {
@@ -94,7 +120,8 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
<td rowspan={numSparkJobRowsInOutputOp.toString}>
{outputOpDescription}
</td>
- <td rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td>
+ <td rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td> ++
+ {outputOpStatusCell(outputOpStatus, numSparkJobRowsInOutputOp)}
} else {
Nil
}
@@ -125,7 +152,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
total = sparkJob.numTasks - sparkJob.numSkippedTasks)
}
</td>
- {failureReasonCell(lastFailureReason)}
+ {failureReasonCell(lastFailureReason, rowspan = 1)}
</tr>
}
@@ -137,6 +164,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
outputOpId: OutputOpId,
outputOpDescription: Seq[Node],
formattedOutputOpDuration: String,
+ outputOpStatus: String,
numSparkJobRowsInOutputOp: Int,
isFirstRow: Boolean,
jobId: Int): Seq[Node] = {
@@ -147,7 +175,8 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
if (isFirstRow) {
<td class="output-op-id-cell" rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpId.toString}</td>
<td rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpDescription}</td>
- <td rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td>
+ <td rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td> ++
+ {outputOpStatusCell(outputOpStatus, numSparkJobRowsInOutputOp)}
} else {
Nil
}
@@ -156,7 +185,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
<tr>
{prefixCells}
<td sorttable_customkey={jobId.toString}>
- {jobId.toString}
+ {if (jobId >= 0) jobId.toString else "-"}
</td>
<!-- Duration -->
<td>-</td>
@@ -170,7 +199,9 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
}
private def generateOutputOpIdRow(
- outputOpId: OutputOpId, sparkJobs: Seq[SparkJobIdWithUIData]): Seq[Node] = {
+ outputOpId: OutputOpId,
+ outputOpStatus: String,
+ sparkJobs: Seq[SparkJobIdWithUIData]): Seq[Node] = {
// We don't count the durations of dropped jobs
val sparkJobDurations = sparkJobs.filter(_.jobUIData.nonEmpty).map(_.jobUIData.get).
map(sparkJob => {
@@ -189,12 +220,32 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
val description = generateOutputOpDescription(sparkJobs)
- generateJobRow(
- outputOpId, description, formattedOutputOpDuration, sparkJobs.size, true, sparkJobs.head) ++
- sparkJobs.tail.map { sparkJob =>
+ if (sparkJobs.isEmpty) {
+ generateOutputOpRowWithoutSparkJobs(
+ outputOpId, description, formattedOutputOpDuration, outputOpStatus)
+ } else {
+ val firstRow =
generateJobRow(
- outputOpId, description, formattedOutputOpDuration, sparkJobs.size, false, sparkJob)
- }.flatMap(x => x)
+ outputOpId,
+ description,
+ formattedOutputOpDuration,
+ outputOpStatus,
+ sparkJobs.size,
+ true,
+ sparkJobs.head)
+ val tailRows =
+ sparkJobs.tail.map { sparkJob =>
+ generateJobRow(
+ outputOpId,
+ description,
+ formattedOutputOpDuration,
+ outputOpStatus,
+ sparkJobs.size,
+ false,
+ sparkJob)
+ }
+ (firstRow ++ tailRows).flatten
+ }
}
private def generateOutputOpDescription(sparkJobs: Seq[SparkJobIdWithUIData]): Seq[Node] = {
@@ -228,7 +279,10 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
}
}
- private def failureReasonCell(failureReason: String): Seq[Node] = {
+ private def failureReasonCell(
+ failureReason: String,
+ rowspan: Int,
+ includeFirstLineInExpandDetails: Boolean = true): Seq[Node] = {
val isMultiline = failureReason.indexOf('\n') >= 0
// Display the first line by default
val failureReasonSummary = StringEscapeUtils.escapeHtml4(
@@ -237,6 +291,13 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
} else {
failureReason
})
+ val failureDetails =
+ if (isMultiline && !includeFirstLineInExpandDetails) {
+ // Skip the first line
+ failureReason.substring(failureReason.indexOf('\n') + 1)
+ } else {
+ failureReason
+ }
val details = if (isMultiline) {
// scalastyle:off
<span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')"
@@ -244,13 +305,20 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
+details
</span> ++
<div class="stacktrace-details collapsed">
- <pre>{failureReason}</pre>
+ <pre>{failureDetails}</pre>
</div>
// scalastyle:on
} else {
""
}
- <td valign="middle" style="max-width: 300px">{failureReasonSummary}{details}</td>
+
+ if (rowspan == 1) {
+ <td valign="middle" style="max-width: 300px">{failureReasonSummary}{details}</td>
+ } else {
+ <td valign="middle" style="max-width: 300px" rowspan={rowspan.toString}>
+ {failureReasonSummary}{details}
+ </td>
+ }
}
private def getJobData(sparkJobId: SparkJobId): Option[JobUIData] = {
@@ -265,16 +333,31 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
* Generate the job table for the batch.
*/
private def generateJobTable(batchUIData: BatchUIData): Seq[Node] = {
- val outputOpIdToSparkJobIds = batchUIData.outputOpIdSparkJobIdPairs.groupBy(_.outputOpId).toSeq.
- sortBy(_._1). // sorted by OutputOpId
+ val outputOpIdToSparkJobIds = batchUIData.outputOpIdSparkJobIdPairs.groupBy(_.outputOpId).
map { case (outputOpId, outputOpIdAndSparkJobIds) =>
// sort SparkJobIds for each OutputOpId
(outputOpId, outputOpIdAndSparkJobIds.map(_.sparkJobId).sorted)
}
+ val outputOps = (0 until batchUIData.numOutputOp).map { outputOpId =>
+ val status = batchUIData.failureReason.get(outputOpId).map { failure =>
+ if (failure.startsWith("org.apache.spark.SparkException")) {
+ "Failed due to Spark job error\n" + failure
+ } else {
+ var nextLineIndex = failure.indexOf("\n")
+ if (nextLineIndex < 0) {
+ nextLineIndex = failure.size
+ }
+ val firstLine = failure.substring(0, nextLineIndex)
+ s"Failed due to error: $firstLine\n$failure"
+ }
+ }.getOrElse("Succeeded")
+ val sparkJobIds = outputOpIdToSparkJobIds.getOrElse(outputOpId, Seq.empty)
+ (outputOpId, status, sparkJobIds)
+ }
sparkListener.synchronized {
- val outputOpIdWithJobs: Seq[(OutputOpId, Seq[SparkJobIdWithUIData])] =
- outputOpIdToSparkJobIds.map { case (outputOpId, sparkJobIds) =>
- (outputOpId,
+ val outputOpIdWithJobs: Seq[(OutputOpId, String, Seq[SparkJobIdWithUIData])] =
+ outputOps.map { case (outputOpId, status, sparkJobIds) =>
+ (outputOpId, status,
sparkJobIds.map(sparkJobId => SparkJobIdWithUIData(sparkJobId, getJobData(sparkJobId))))
}
@@ -285,7 +368,8 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
<tbody>
{
outputOpIdWithJobs.map {
- case (outputOpId, sparkJobIds) => generateOutputOpIdRow(outputOpId, sparkJobIds)
+ case (outputOpId, status, sparkJobIds) =>
+ generateOutputOpIdRow(outputOpId, status, sparkJobIds)
}
}
</tbody>
@@ -386,4 +470,12 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
Unparsed(StringEscapeUtils.escapeHtml4(metadataDescription).
replaceAllLiterally("\t", "&nbsp;&nbsp;&nbsp;&nbsp;").replaceAllLiterally("\n", "<br/>"))
}
+
+ private def outputOpStatusCell(status: String, rowspan: Int): Seq[Node] = {
+ if (status == "Succeeded") {
+ <td rowspan={rowspan.toString}>Succeeded</td>
+ } else {
+ failureReasonCell(status, rowspan, includeFirstLineInExpandDetails = false)
+ }
+ }
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
index ae508c0e95..e6c2e2140c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
@@ -30,6 +30,8 @@ private[ui] case class BatchUIData(
val submissionTime: Long,
val processingStartTime: Option[Long],
val processingEndTime: Option[Long],
+ val numOutputOp: Int,
+ val failureReason: Map[Int, String],
var outputOpIdSparkJobIdPairs: Seq[OutputOpIdAndSparkJobId] = Seq.empty) {
/**
@@ -69,7 +71,9 @@ private[ui] object BatchUIData {
batchInfo.streamIdToInputInfo,
batchInfo.submissionTime,
batchInfo.processingStartTime,
- batchInfo.processingEndTime
+ batchInfo.processingEndTime,
+ batchInfo.numOutputOp,
+ batchInfo.failureReasons
)
}
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
index 068a6cb0e8..d1df78871d 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
@@ -121,7 +121,7 @@ class UISeleniumSuite
}
findAll(cssSelector("""#completed-batches-table th""")).map(_.text).toSeq should be {
List("Batch Time", "Input Size", "Scheduling Delay (?)", "Processing Time (?)",
- "Total Delay (?)")
+ "Total Delay (?)", "Output Ops: Succeeded/Total")
}
val batchLinks =
@@ -138,7 +138,7 @@ class UISeleniumSuite
summaryText should contain ("Total delay:")
findAll(cssSelector("""#batch-job-table th""")).map(_.text).toSeq should be {
- List("Output Op Id", "Description", "Duration", "Job Id", "Duration",
+ List("Output Op Id", "Description", "Duration", "Status", "Job Id", "Duration",
"Stages: Succeeded/Total", "Tasks (for all stages): Succeeded/Total", "Error")
}