aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorzsxwing <zsxwing@gmail.com>2015-05-07 17:34:44 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-05-07 17:34:44 -0700
commit22ab70e06ede65ca865073fe36c859042a920aa3 (patch)
tree95364012260106bd09458fdc4632c0dc64951826 /streaming
parent88063c62689135da56ee6678b2e826b88c382732 (diff)
downloadspark-22ab70e06ede65ca865073fe36c859042a920aa3.tar.gz
spark-22ab70e06ede65ca865073fe36c859042a920aa3.tar.bz2
spark-22ab70e06ede65ca865073fe36c859042a920aa3.zip
[SPARK-7305] [STREAMING] [WEBUI] Make BatchPage show friendly information when jobs are dropped by SparkListener
If jobs are dropped by SparkListener, at least we can show the job ids in BatchPage. Screenshot: ![b1](https://cloud.githubusercontent.com/assets/1000778/7434968/f19aa784-eff3-11e4-8f86-36a073873574.png) Author: zsxwing <zsxwing@gmail.com> Closes #5840 from zsxwing/SPARK-7305 and squashes the following commits: aca0ba6 [zsxwing] Fix the code style 718765e [zsxwing] Make generateNormalJobRow private 8073b03 [zsxwing] Merge branch 'master' into SPARK-7305 83dec11 [zsxwing] Make BatchPage show friendly information when jobs are dropped by SparkListener
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala136
1 file changed, 106 insertions, 30 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
index 3f1cab6906..831f60e870 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
@@ -19,7 +19,7 @@ package org.apache.spark.streaming.ui
import javax.servlet.http.HttpServletRequest
-import scala.xml.{NodeSeq, Node}
+import scala.xml.{NodeSeq, Node, Text}
import org.apache.commons.lang3.StringEscapeUtils
@@ -28,6 +28,7 @@ import org.apache.spark.ui.{UIUtils => SparkUIUtils, WebUIPage}
import org.apache.spark.streaming.ui.StreamingJobProgressListener.{SparkJobId, OutputOpId}
import org.apache.spark.ui.jobs.UIData.JobUIData
+private case class SparkJobIdWithUIData(sparkJobId: SparkJobId, jobUIData: Option[JobUIData])
private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
private val streamingListener = parent.listener
@@ -44,25 +45,33 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
<th>Error</th>
}
+ private def generateJobRow(
+ outputOpId: OutputOpId,
+ outputOpDescription: Seq[Node],
+ formattedOutputOpDuration: String,
+ numSparkJobRowsInOutputOp: Int,
+ isFirstRow: Boolean,
+ sparkJob: SparkJobIdWithUIData): Seq[Node] = {
+ if (sparkJob.jobUIData.isDefined) {
+ generateNormalJobRow(outputOpId, outputOpDescription, formattedOutputOpDuration,
+ numSparkJobRowsInOutputOp, isFirstRow, sparkJob.jobUIData.get)
+ } else {
+ generateDroppedJobRow(outputOpId, outputOpDescription, formattedOutputOpDuration,
+ numSparkJobRowsInOutputOp, isFirstRow, sparkJob.sparkJobId)
+ }
+ }
+
/**
* Generate a row for a Spark Job. Because duplicated output op info needs to be collapsed into
* one cell, we use "rowspan" for the first row of an output op.
*/
- def generateJobRow(
+ private def generateNormalJobRow(
outputOpId: OutputOpId,
+ outputOpDescription: Seq[Node],
formattedOutputOpDuration: String,
numSparkJobRowsInOutputOp: Int,
isFirstRow: Boolean,
sparkJob: JobUIData): Seq[Node] = {
- val lastStageInfo = Option(sparkJob.stageIds)
- .filter(_.nonEmpty)
- .flatMap { ids => sparkListener.stageIdToInfo.get(ids.max) }
- val lastStageData = lastStageInfo.flatMap { s =>
- sparkListener.stageIdToData.get((s.stageId, s.attemptId))
- }
-
- val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
- val lastStageDescription = lastStageData.flatMap(_.description).getOrElse("")
val duration: Option[Long] = {
sparkJob.submissionTime.map { start =>
val end = sparkJob.completionTime.getOrElse(System.currentTimeMillis())
@@ -83,9 +92,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
if (isFirstRow) {
<td class="output-op-id-cell" rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpId.toString}</td>
<td rowspan={numSparkJobRowsInOutputOp.toString}>
- <span class="description-input" title={lastStageDescription}>
- {lastStageDescription}
- </span>{lastStageName}
+ {outputOpDescription}
</td>
<td rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td>
} else {
@@ -122,27 +129,96 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
</tr>
}
- private def generateOutputOpIdRow(
- outputOpId: OutputOpId, sparkJobs: Seq[JobUIData]): Seq[Node] = {
- val sparkjobDurations = sparkJobs.map(sparkJob => {
- sparkJob.submissionTime.map { start =>
- val end = sparkJob.completionTime.getOrElse(System.currentTimeMillis())
- end - start
+ /**
+ * If a job is dropped by sparkListener due to exceeding the limit, we only show the job id
+ * with "-" cells.
+ */
+ private def generateDroppedJobRow(
+ outputOpId: OutputOpId,
+ outputOpDescription: Seq[Node],
+ formattedOutputOpDuration: String,
+ numSparkJobRowsInOutputOp: Int,
+ isFirstRow: Boolean,
+ jobId: Int): Seq[Node] = {
+ // In the first row, output op id and its information needs to be shown. In other rows, these
+ // cells will be taken up due to "rowspan".
+ // scalastyle:off
+ val prefixCells =
+ if (isFirstRow) {
+ <td class="output-op-id-cell" rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpId.toString}</td>
+ <td rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpDescription}</td>
+ <td rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td>
+ } else {
+ Nil
}
- })
+ // scalastyle:on
+
+ <tr>
+ {prefixCells}
+ <td sorttable_customkey={jobId.toString}>
+ {jobId.toString}
+ </td>
+ <!-- Duration -->
+ <td>-</td>
+ <!-- Stages: Succeeded/Total -->
+ <td>-</td>
+ <!-- Tasks (for all stages): Succeeded/Total -->
+ <td>-</td>
+ <!-- Error -->
+ <td>-</td>
+ </tr>
+ }
+
+ private def generateOutputOpIdRow(
+ outputOpId: OutputOpId, sparkJobs: Seq[SparkJobIdWithUIData]): Seq[Node] = {
+ // We don't count the durations of dropped jobs
+ val sparkJobDurations = sparkJobs.filter(_.jobUIData.nonEmpty).map(_.jobUIData.get).
+ map(sparkJob => {
+ sparkJob.submissionTime.map { start =>
+ val end = sparkJob.completionTime.getOrElse(System.currentTimeMillis())
+ end - start
+ }
+ })
val formattedOutputOpDuration =
- if (sparkjobDurations.exists(_ == None)) {
- // If any job does not finish, set "formattedOutputOpDuration" to "-"
+ if (sparkJobDurations.isEmpty || sparkJobDurations.exists(_ == None)) {
+ // If no job or any job does not finish, set "formattedOutputOpDuration" to "-"
"-"
} else {
- SparkUIUtils.formatDuration(sparkjobDurations.flatMap(x => x).sum)
+ SparkUIUtils.formatDuration(sparkJobDurations.flatMap(x => x).sum)
}
- generateJobRow(outputOpId, formattedOutputOpDuration, sparkJobs.size, true, sparkJobs.head) ++
+
+ val description = generateOutputOpDescription(sparkJobs)
+
+ generateJobRow(
+ outputOpId, description, formattedOutputOpDuration, sparkJobs.size, true, sparkJobs.head) ++
sparkJobs.tail.map { sparkJob =>
- generateJobRow(outputOpId, formattedOutputOpDuration, sparkJobs.size, false, sparkJob)
+ generateJobRow(
+ outputOpId, description, formattedOutputOpDuration, sparkJobs.size, false, sparkJob)
}.flatMap(x => x)
}
+ private def generateOutputOpDescription(sparkJobs: Seq[SparkJobIdWithUIData]): Seq[Node] = {
+ val lastStageInfo =
+ sparkJobs.flatMap(_.jobUIData).headOption. // Get the first JobUIData
+ flatMap { sparkJob => // For the first job, get the latest Stage info
+ if (sparkJob.stageIds.isEmpty) {
+ None
+ } else {
+ sparkListener.stageIdToInfo.get(sparkJob.stageIds.max)
+ }
+ }
+ val lastStageData = lastStageInfo.flatMap { s =>
+ sparkListener.stageIdToData.get((s.stageId, s.attemptId))
+ }
+
+ val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
+ val lastStageDescription = lastStageData.flatMap(_.description).getOrElse("")
+
+ <span class="description-input" title={lastStageDescription}>
+ {lastStageDescription}
+ </span> ++ Text(lastStageName)
+ }
+
private def failureReasonCell(failureReason: String): Seq[Node] = {
val isMultiline = failureReason.indexOf('\n') >= 0
// Display the first line by default
@@ -187,10 +263,10 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
(outputOpId, outputOpIdAndSparkJobIds.map(_.sparkJobId).sorted)
}
sparkListener.synchronized {
- val outputOpIdWithJobs: Seq[(OutputOpId, Seq[JobUIData])] =
+ val outputOpIdWithJobs: Seq[(OutputOpId, Seq[SparkJobIdWithUIData])] =
outputOpIdToSparkJobIds.map { case (outputOpId, sparkJobIds) =>
- // Filter out spark Job ids that don't exist in sparkListener
- (outputOpId, sparkJobIds.flatMap(getJobData))
+ (outputOpId,
+ sparkJobIds.map(sparkJobId => SparkJobIdWithUIData(sparkJobId, getJobData(sparkJobId))))
}
<table id="batch-job-table" class="table table-bordered table-striped table-condensed">
@@ -200,7 +276,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
<tbody>
{
outputOpIdWithJobs.map {
- case (outputOpId, jobs) => generateOutputOpIdRow(outputOpId, jobs)
+ case (outputOpId, sparkJobIds) => generateOutputOpIdRow(outputOpId, sparkJobIds)
}
}
</tbody>