aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorzsxwing <zsxwing@gmail.com>2015-04-29 18:22:14 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-04-29 18:22:14 -0700
commit1b7106b867bc0aa4d64b669d79b646f862acaf47 (patch)
treec2a6a4102cac6fa7db1dd8a96cea1c7d4bc02e7f /streaming
parent114bad606e7a17f980ea6c99e31c8ab0179fec2e (diff)
downloadspark-1b7106b867bc0aa4d64b669d79b646f862acaf47.tar.gz
spark-1b7106b867bc0aa4d64b669d79b646f862acaf47.tar.bz2
spark-1b7106b867bc0aa4d64b669d79b646f862acaf47.zip
[SPARK-6862] [STREAMING] [WEBUI] Add BatchPage to display details of a batch
This is an initial commit for SPARK-6862. Once SPARK-6796 is merged, I will add the links to StreamingPage so that the user can jump to BatchPage. Screenshots: ![success](https://cloud.githubusercontent.com/assets/1000778/7102439/bbe75406-e0b3-11e4-84fe-3e6de629a49a.png) ![failure](https://cloud.githubusercontent.com/assets/1000778/7102440/bc124454-e0b3-11e4-921a-c8b39d6b61bc.png) Author: zsxwing <zsxwing@gmail.com> Closes #5473 from zsxwing/SPARK-6862 and squashes the following commits: 0727d35 [zsxwing] Change BatchUIData to a case class b380cfb [zsxwing] Add createJobStart to eliminate duplicate codes 9a3083d [zsxwing] Rename XxxDatas -> XxxData 087ba98 [zsxwing] Refactor BatchInfo to store only necessary fields cb62e4f [zsxwing] Use Seq[(OutputOpId, SparkJobId)] to store the id relations 72f8e7e [zsxwing] Add unit tests for BatchPage 1282b10 [zsxwing] Handle some corner cases and add tests for StreamingJobProgressListener 77a69ae [zsxwing] Refactor codes as per TD's comments 35ffd80 [zsxwing] Merge branch 'master' into SPARK-6862 15bdf9b [zsxwing] Add batch links and unit tests 4bf66b6 [zsxwing] Merge branch 'master' into SPARK-6862 7168807 [zsxwing] Limit the max width of the error message and fix nits in the UI 0b226f9 [zsxwing] Change 'Last Error' to 'Error' fc98a43 [zsxwing] Put clearing local properties to finally and remove redundant private[streaming] 0c7b2eb [zsxwing] Add BatchPage to display details of a batch
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala2
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala44
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala28
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala2
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala26
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala264
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala75
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala161
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala4
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala83
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala100
11 files changed, 709 insertions, 80 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 24f99a2b92..83d41f5762 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -626,7 +626,7 @@ abstract class DStream[T: ClassTag] (
println("Time: " + time)
println("-------------------------------------------")
firstNum.take(num).foreach(println)
- if (firstNum.size > num) println("...")
+ if (firstNum.length > num) println("...")
println()
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
index 30cf87f5b7..3c481bf349 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
@@ -25,15 +25,49 @@ import scala.util.Try
*/
private[streaming]
class Job(val time: Time, func: () => _) {
- var id: String = _
- var result: Try[_] = null
+ private var _id: String = _
+ private var _outputOpId: Int = _
+ private var isSet = false
+ private var _result: Try[_] = null
def run() {
- result = Try(func())
+ _result = Try(func())
}
- def setId(number: Int) {
- id = "streaming job " + time + "." + number
+ def result: Try[_] = {
+ if (_result == null) {
+ throw new IllegalStateException("Cannot access result before job finishes")
+ }
+ _result
+ }
+
+ /**
+ * @return the global unique id of this Job.
+ */
+ def id: String = {
+ if (!isSet) {
+ throw new IllegalStateException("Cannot access id before calling setId")
+ }
+ _id
+ }
+
+ /**
+ * @return the output op id of this Job. Each Job has a unique output op id in the same JobSet.
+ */
+ def outputOpId: Int = {
+ if (!isSet) {
+ throw new IllegalStateException("Cannot access number before calling setId")
+ }
+ _outputOpId
+ }
+
+ def setOutputOpId(outputOpId: Int) {
+ if (isSet) {
+ throw new IllegalStateException("Cannot call setOutputOpId more than once")
+ }
+ isSet = true
+ _id = s"streaming job $time.$outputOpId"
+ _outputOpId = outputOpId
}
override def toString: String = id
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 508b89278d..c7a2c1141a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -172,16 +172,28 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
ssc.waiter.notifyError(e)
}
- private class JobHandler(job: Job) extends Runnable {
+ private class JobHandler(job: Job) extends Runnable with Logging {
def run() {
- eventLoop.post(JobStarted(job))
- // Disable checks for existing output directories in jobs launched by the streaming scheduler,
- // since we may need to write output to an existing directory during checkpoint recovery;
- // see SPARK-4835 for more details.
- PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
- job.run()
+ ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
+ ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)
+ try {
+ eventLoop.post(JobStarted(job))
+ // Disable checks for existing output directories in jobs launched by the streaming
+ // scheduler, since we may need to write output to an existing directory during checkpoint
+ // recovery; see SPARK-4835 for more details.
+ PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
+ job.run()
+ }
+ eventLoop.post(JobCompleted(job))
+ } finally {
+ ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)
+ ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)
}
- eventLoop.post(JobCompleted(job))
}
}
}
+
+private[streaming] object JobScheduler {
+ val BATCH_TIME_PROPERTY_KEY = "spark.streaming.internal.batchTime"
+ val OUTPUT_OP_ID_PROPERTY_KEY = "spark.streaming.internal.outputOpId"
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
index 5b134877d0..24b3794236 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
@@ -35,7 +35,7 @@ case class JobSet(
private var processingStartTime = -1L // when the first job of this jobset started processing
private var processingEndTime = -1L // when the last job of this jobset finished processing
- jobs.zipWithIndex.foreach { case (job, i) => job.setId(i) }
+ jobs.zipWithIndex.foreach { case (job, i) => job.setOutputOpId(i) }
incompleteJobs ++= jobs
def handleJobStart(job: Job) {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
index df1c0a1070..e219e27785 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
@@ -19,7 +19,6 @@ package org.apache.spark.streaming.ui
import scala.xml.Node
-import org.apache.spark.streaming.scheduler.BatchInfo
import org.apache.spark.ui.UIUtils
private[ui] abstract class BatchTableBase(tableId: String) {
@@ -31,18 +30,20 @@ private[ui] abstract class BatchTableBase(tableId: String) {
<th>Processing Time</th>
}
- protected def baseRow(batch: BatchInfo): Seq[Node] = {
+ protected def baseRow(batch: BatchUIData): Seq[Node] = {
val batchTime = batch.batchTime.milliseconds
val formattedBatchTime = UIUtils.formatDate(batch.batchTime.milliseconds)
- val eventCount = batch.receivedBlockInfo.values.map {
- receivers => receivers.map(_.numRecords).sum
- }.sum
+ val eventCount = batch.numRecords
val schedulingDelay = batch.schedulingDelay
val formattedSchedulingDelay = schedulingDelay.map(UIUtils.formatDuration).getOrElse("-")
val processingTime = batch.processingDelay
val formattedProcessingTime = processingTime.map(UIUtils.formatDuration).getOrElse("-")
- <td sorttable_customkey={batchTime.toString}>{formattedBatchTime}</td>
+ <td sorttable_customkey={batchTime.toString}>
+ <a href={s"batch?id=$batchTime"}>
+ {formattedBatchTime}
+ </a>
+ </td>
<td sorttable_customkey={eventCount.toString}>{eventCount.toString} events</td>
<td sorttable_customkey={schedulingDelay.getOrElse(Long.MaxValue).toString}>
{formattedSchedulingDelay}
@@ -73,8 +74,9 @@ private[ui] abstract class BatchTableBase(tableId: String) {
protected def renderRows: Seq[Node]
}
-private[ui] class ActiveBatchTable(runningBatches: Seq[BatchInfo], waitingBatches: Seq[BatchInfo])
- extends BatchTableBase("active-batches-table") {
+private[ui] class ActiveBatchTable(
+ runningBatches: Seq[BatchUIData],
+ waitingBatches: Seq[BatchUIData]) extends BatchTableBase("active-batches-table") {
override protected def columns: Seq[Node] = super.columns ++ <th>Status</th>
@@ -85,16 +87,16 @@ private[ui] class ActiveBatchTable(runningBatches: Seq[BatchInfo], waitingBatche
runningBatches.flatMap(batch => <tr>{runningBatchRow(batch)}</tr>)
}
- private def runningBatchRow(batch: BatchInfo): Seq[Node] = {
+ private def runningBatchRow(batch: BatchUIData): Seq[Node] = {
baseRow(batch) ++ <td>processing</td>
}
- private def waitingBatchRow(batch: BatchInfo): Seq[Node] = {
+ private def waitingBatchRow(batch: BatchUIData): Seq[Node] = {
baseRow(batch) ++ <td>queued</td>
}
}
-private[ui] class CompletedBatchTable(batches: Seq[BatchInfo])
+private[ui] class CompletedBatchTable(batches: Seq[BatchUIData])
extends BatchTableBase("completed-batches-table") {
override protected def columns: Seq[Node] = super.columns ++ <th>Total Delay</th>
@@ -103,7 +105,7 @@ private[ui] class CompletedBatchTable(batches: Seq[BatchInfo])
batches.flatMap(batch => <tr>{completedBatchRow(batch)}</tr>)
}
- private def completedBatchRow(batch: BatchInfo): Seq[Node] = {
+ private def completedBatchRow(batch: BatchUIData): Seq[Node] = {
val totalDelay = batch.totalDelay
val formattedTotalDelay = totalDelay.map(UIUtils.formatDuration).getOrElse("-")
baseRow(batch) ++
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
new file mode 100644
index 0000000000..2da9a29e25
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.ui
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.{NodeSeq, Node}
+
+import org.apache.commons.lang3.StringEscapeUtils
+
+import org.apache.spark.streaming.Time
+import org.apache.spark.ui.{UIUtils, WebUIPage}
+import org.apache.spark.streaming.ui.StreamingJobProgressListener.{SparkJobId, OutputOpId}
+import org.apache.spark.ui.jobs.UIData.JobUIData
+
+
+private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
+ private val streamingListener = parent.listener
+ private val sparkListener = parent.ssc.sc.jobProgressListener
+
+ private def columns: Seq[Node] = {
+ <th>Output Op Id</th>
+ <th>Description</th>
+ <th>Duration</th>
+ <th>Job Id</th>
+ <th>Duration</th>
+ <th class="sorttable_nosort">Stages: Succeeded/Total</th>
+ <th class="sorttable_nosort">Tasks (for all stages): Succeeded/Total</th>
+ <th>Error</th>
+ }
+
+ /**
+ * Generate a row for a Spark Job. Because duplicated output op infos needs to be collapsed into
+ * one cell, we use "rowspan" for the first row of a output op.
+ */
+ def generateJobRow(
+ outputOpId: OutputOpId,
+ formattedOutputOpDuration: String,
+ numSparkJobRowsInOutputOp: Int,
+ isFirstRow: Boolean,
+ sparkJob: JobUIData): Seq[Node] = {
+ val lastStageInfo = Option(sparkJob.stageIds)
+ .filter(_.nonEmpty)
+ .flatMap { ids => sparkListener.stageIdToInfo.get(ids.max) }
+ val lastStageData = lastStageInfo.flatMap { s =>
+ sparkListener.stageIdToData.get((s.stageId, s.attemptId))
+ }
+
+ val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
+ val lastStageDescription = lastStageData.flatMap(_.description).getOrElse("")
+ val duration: Option[Long] = {
+ sparkJob.submissionTime.map { start =>
+ val end = sparkJob.completionTime.getOrElse(System.currentTimeMillis())
+ end - start
+ }
+ }
+ val lastFailureReason =
+ sparkJob.stageIds.sorted.reverse.flatMap(sparkListener.stageIdToInfo.get).
+ dropWhile(_.failureReason == None).take(1). // get the first info that contains failure
+ flatMap(info => info.failureReason).headOption.getOrElse("")
+ val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("-")
+ val detailUrl = s"${UIUtils.prependBaseUri(parent.basePath)}/jobs/job?id=${sparkJob.jobId}"
+
+ // In the first row, output op id and its information needs to be shown. In other rows, these
+ // cells will be taken up due to "rowspan".
+ // scalastyle:off
+ val prefixCells =
+ if (isFirstRow) {
+ <td class="output-op-id-cell" rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpId.toString}</td>
+ <td rowspan={numSparkJobRowsInOutputOp.toString}>
+ <span class="description-input" title={lastStageDescription}>
+ {lastStageDescription}
+ </span>{lastStageName}
+ </td>
+ <td rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td>
+ } else {
+ Nil
+ }
+ // scalastyle:on
+
+ <tr>
+ {prefixCells}
+ <td sorttable_customkey={sparkJob.jobId.toString}>
+ <a href={detailUrl}>
+ {sparkJob.jobId}{sparkJob.jobGroup.map(id => s"($id)").getOrElse("")}
+ </a>
+ </td>
+ <td sorttable_customkey={duration.getOrElse(Long.MaxValue).toString}>
+ {formattedDuration}
+ </td>
+ <td class="stage-progress-cell">
+ {sparkJob.completedStageIndices.size}/{sparkJob.stageIds.size - sparkJob.numSkippedStages}
+ {if (sparkJob.numFailedStages > 0) s"(${sparkJob.numFailedStages} failed)"}
+ {if (sparkJob.numSkippedStages > 0) s"(${sparkJob.numSkippedStages} skipped)"}
+ </td>
+ <td class="progress-cell">
+ {
+ UIUtils.makeProgressBar(
+ started = sparkJob.numActiveTasks,
+ completed = sparkJob.numCompletedTasks,
+ failed = sparkJob.numFailedTasks,
+ skipped = sparkJob.numSkippedTasks,
+ total = sparkJob.numTasks - sparkJob.numSkippedTasks)
+ }
+ </td>
+ {failureReasonCell(lastFailureReason)}
+ </tr>
+ }
+
+ private def generateOutputOpIdRow(
+ outputOpId: OutputOpId, sparkJobs: Seq[JobUIData]): Seq[Node] = {
+ val sparkjobDurations = sparkJobs.map(sparkJob => {
+ sparkJob.submissionTime.map { start =>
+ val end = sparkJob.completionTime.getOrElse(System.currentTimeMillis())
+ end - start
+ }
+ })
+ val formattedOutputOpDuration =
+ if (sparkjobDurations.exists(_ == None)) {
+ // If any job does not finish, set "formattedOutputOpDuration" to "-"
+ "-"
+ } else {
+ UIUtils.formatDuration(sparkjobDurations.flatMap(x => x).sum)
+ }
+ generateJobRow(outputOpId, formattedOutputOpDuration, sparkJobs.size, true, sparkJobs.head) ++
+ sparkJobs.tail.map { sparkJob =>
+ generateJobRow(outputOpId, formattedOutputOpDuration, sparkJobs.size, false, sparkJob)
+ }.flatMap(x => x)
+ }
+
+ private def failureReasonCell(failureReason: String): Seq[Node] = {
+ val isMultiline = failureReason.indexOf('\n') >= 0
+ // Display the first line by default
+ val failureReasonSummary = StringEscapeUtils.escapeHtml4(
+ if (isMultiline) {
+ failureReason.substring(0, failureReason.indexOf('\n'))
+ } else {
+ failureReason
+ })
+ val details = if (isMultiline) {
+ // scalastyle:off
+ <span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')"
+ class="expand-details">
+ +details
+ </span> ++
+ <div class="stacktrace-details collapsed">
+ <pre>{failureReason}</pre>
+ </div>
+ // scalastyle:on
+ } else {
+ ""
+ }
+ <td valign="middle" style="max-width: 300px">{failureReasonSummary}{details}</td>
+ }
+
+ private def getJobData(sparkJobId: SparkJobId): Option[JobUIData] = {
+ sparkListener.activeJobs.get(sparkJobId).orElse {
+ sparkListener.completedJobs.find(_.jobId == sparkJobId).orElse {
+ sparkListener.failedJobs.find(_.jobId == sparkJobId)
+ }
+ }
+ }
+
+ /**
+ * Generate the job table for the batch.
+ */
+ private def generateJobTable(batchUIData: BatchUIData): Seq[Node] = {
+ val outputOpIdToSparkJobIds = batchUIData.outputOpIdSparkJobIdPairs.groupBy(_.outputOpId).toSeq.
+ sortBy(_._1). // sorted by OutputOpId
+ map { case (outputOpId, outputOpIdAndSparkJobIds) =>
+ // sort SparkJobIds for each OutputOpId
+ (outputOpId, outputOpIdAndSparkJobIds.map(_.sparkJobId).sorted)
+ }
+ sparkListener.synchronized {
+ val outputOpIdWithJobs: Seq[(OutputOpId, Seq[JobUIData])] =
+ outputOpIdToSparkJobIds.map { case (outputOpId, sparkJobIds) =>
+ // Filter out spark Job ids that don't exist in sparkListener
+ (outputOpId, sparkJobIds.flatMap(getJobData))
+ }
+
+ <table id="batch-job-table" class="table table-bordered table-striped table-condensed">
+ <thead>
+ {columns}
+ </thead>
+ <tbody>
+ {
+ outputOpIdWithJobs.map {
+ case (outputOpId, jobs) => generateOutputOpIdRow(outputOpId, jobs)
+ }
+ }
+ </tbody>
+ </table>
+ }
+ }
+
+ def render(request: HttpServletRequest): Seq[Node] = {
+ val batchTime = Option(request.getParameter("id")).map(id => Time(id.toLong)).getOrElse {
+ throw new IllegalArgumentException(s"Missing id parameter")
+ }
+ val formattedBatchTime = UIUtils.formatDate(batchTime.milliseconds)
+
+ val batchUIData = streamingListener.getBatchUIData(batchTime).getOrElse {
+ throw new IllegalArgumentException(s"Batch $formattedBatchTime does not exist")
+ }
+
+ val formattedSchedulingDelay =
+ batchUIData.schedulingDelay.map(UIUtils.formatDuration).getOrElse("-")
+ val formattedProcessingTime =
+ batchUIData.processingDelay.map(UIUtils.formatDuration).getOrElse("-")
+ val formattedTotalDelay = batchUIData.totalDelay.map(UIUtils.formatDuration).getOrElse("-")
+
+ val summary: NodeSeq =
+ <div>
+ <ul class="unstyled">
+ <li>
+ <strong>Batch Duration: </strong>
+ {UIUtils.formatDuration(streamingListener.batchDuration)}
+ </li>
+ <li>
+ <strong>Input data size: </strong>
+ {batchUIData.numRecords} records
+ </li>
+ <li>
+ <strong>Scheduling delay: </strong>
+ {formattedSchedulingDelay}
+ </li>
+ <li>
+ <strong>Processing time: </strong>
+ {formattedProcessingTime}
+ </li>
+ <li>
+ <strong>Total delay: </strong>
+ {formattedTotalDelay}
+ </li>
+ </ul>
+ </div>
+
+ val jobTable =
+ if (batchUIData.outputOpIdSparkJobIdPairs.isEmpty) {
+ <div>Cannot find any job for Batch {formattedBatchTime}.</div>
+ } else {
+ generateJobTable(batchUIData)
+ }
+
+ val content = summary ++ jobTable
+
+ UIUtils.headerSparkPage(s"Details of batch at $formattedBatchTime", content, parent)
+ }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
new file mode 100644
index 0000000000..f45c291b7c
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.streaming.ui
+
+import org.apache.spark.streaming.Time
+import org.apache.spark.streaming.scheduler.BatchInfo
+import org.apache.spark.streaming.ui.StreamingJobProgressListener._
+
+private[ui] case class OutputOpIdAndSparkJobId(outputOpId: OutputOpId, sparkJobId: SparkJobId)
+
+private[ui] case class BatchUIData(
+ val batchTime: Time,
+ val receiverNumRecords: Map[Int, Long],
+ val submissionTime: Long,
+ val processingStartTime: Option[Long],
+ val processingEndTime: Option[Long],
+ var outputOpIdSparkJobIdPairs: Seq[OutputOpIdAndSparkJobId] = Seq.empty) {
+
+ /**
+ * Time taken for the first job of this batch to start processing from the time this batch
+ * was submitted to the streaming scheduler. Essentially, it is
+ * `processingStartTime` - `submissionTime`.
+ */
+ def schedulingDelay: Option[Long] = processingStartTime.map(_ - submissionTime)
+
+ /**
+ * Time taken for the all jobs of this batch to finish processing from the time they started
+ * processing. Essentially, it is `processingEndTime` - `processingStartTime`.
+ */
+ def processingDelay: Option[Long] = {
+ for (start <- processingStartTime;
+ end <- processingEndTime)
+ yield end - start
+ }
+
+ /**
+ * Time taken for all the jobs of this batch to finish processing from the time they
+ * were submitted. Essentially, it is `processingDelay` + `schedulingDelay`.
+ */
+ def totalDelay: Option[Long] = processingEndTime.map(_ - submissionTime)
+
+ /**
+ * The number of recorders received by the receivers in this batch.
+ */
+ def numRecords: Long = receiverNumRecords.map(_._2).sum
+}
+
+private[ui] object BatchUIData {
+
+ def apply(batchInfo: BatchInfo): BatchUIData = {
+ new BatchUIData(
+ batchInfo.batchTime,
+ batchInfo.receivedBlockInfo.mapValues(_.map(_.numRecords).sum),
+ batchInfo.submissionTime,
+ batchInfo.processingStartTime,
+ batchInfo.processingEndTime
+ )
+ }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
index be1e8686cf..34b55717a1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -17,29 +17,58 @@
package org.apache.spark.streaming.ui
-import scala.collection.mutable.{Queue, HashMap}
+import java.util.LinkedHashMap
+import java.util.{Map => JMap}
+import java.util.Properties
+import scala.collection.mutable.{ArrayBuffer, Queue, HashMap, SynchronizedBuffer}
+
+import org.apache.spark.scheduler._
import org.apache.spark.streaming.{Time, StreamingContext}
import org.apache.spark.streaming.scheduler._
import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted
import org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted
-import org.apache.spark.streaming.scheduler.BatchInfo
import org.apache.spark.streaming.scheduler.StreamingListenerBatchSubmitted
import org.apache.spark.util.Distribution
private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
- extends StreamingListener {
+ extends StreamingListener with SparkListener {
- private val waitingBatchInfos = new HashMap[Time, BatchInfo]
- private val runningBatchInfos = new HashMap[Time, BatchInfo]
- private val completedBatchInfos = new Queue[BatchInfo]
- private val batchInfoLimit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 100)
+ private val waitingBatchUIData = new HashMap[Time, BatchUIData]
+ private val runningBatchUIData = new HashMap[Time, BatchUIData]
+ private val completedBatchUIData = new Queue[BatchUIData]
+ private val batchUIDataLimit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 100)
private var totalCompletedBatches = 0L
private var totalReceivedRecords = 0L
private var totalProcessedRecords = 0L
private val receiverInfos = new HashMap[Int, ReceiverInfo]
+ // Because onJobStart and onBatchXXX messages are processed in different threads,
+ // we may not be able to get the corresponding BatchUIData when receiving onJobStart. So here we
+ // cannot use a map of (Time, BatchUIData).
+ private[ui] val batchTimeToOutputOpIdSparkJobIdPair =
+ new LinkedHashMap[Time, SynchronizedBuffer[OutputOpIdAndSparkJobId]] {
+ override def removeEldestEntry(
+ p1: JMap.Entry[Time, SynchronizedBuffer[OutputOpIdAndSparkJobId]]): Boolean = {
+ // If a lot of "onBatchCompleted"s happen before "onJobStart" (image if
+ // SparkContext.listenerBus is very slow), "batchTimeToOutputOpIdToSparkJobIds"
+ // may add some information for a removed batch when processing "onJobStart". It will be a
+ // memory leak.
+ //
+ // To avoid the memory leak, we control the size of "batchTimeToOutputOpIdToSparkJobIds" and
+ // evict the eldest one.
+ //
+ // Note: if "onJobStart" happens before "onBatchSubmitted", the size of
+ // "batchTimeToOutputOpIdToSparkJobIds" may be greater than the number of the retained
+ // batches temporarily, so here we use "10" to handle such case. This is not a perfect
+ // solution, but at least it can handle most of cases.
+ size() >
+ waitingBatchUIData.size + runningBatchUIData.size + completedBatchUIData.size + 10
+ }
+ }
+
+
val batchDuration = ssc.graph.batchDuration.milliseconds
override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) {
@@ -62,37 +91,62 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = {
synchronized {
- waitingBatchInfos(batchSubmitted.batchInfo.batchTime) = batchSubmitted.batchInfo
+ waitingBatchUIData(batchSubmitted.batchInfo.batchTime) =
+ BatchUIData(batchSubmitted.batchInfo)
}
}
override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = synchronized {
- runningBatchInfos(batchStarted.batchInfo.batchTime) = batchStarted.batchInfo
- waitingBatchInfos.remove(batchStarted.batchInfo.batchTime)
+ val batchUIData = BatchUIData(batchStarted.batchInfo)
+ runningBatchUIData(batchStarted.batchInfo.batchTime) = BatchUIData(batchStarted.batchInfo)
+ waitingBatchUIData.remove(batchStarted.batchInfo.batchTime)
- batchStarted.batchInfo.receivedBlockInfo.foreach { case (_, infos) =>
- totalReceivedRecords += infos.map(_.numRecords).sum
- }
+ totalReceivedRecords += batchUIData.numRecords
}
override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
synchronized {
- waitingBatchInfos.remove(batchCompleted.batchInfo.batchTime)
- runningBatchInfos.remove(batchCompleted.batchInfo.batchTime)
- completedBatchInfos.enqueue(batchCompleted.batchInfo)
- if (completedBatchInfos.size > batchInfoLimit) completedBatchInfos.dequeue()
+ waitingBatchUIData.remove(batchCompleted.batchInfo.batchTime)
+ runningBatchUIData.remove(batchCompleted.batchInfo.batchTime)
+ val batchUIData = BatchUIData(batchCompleted.batchInfo)
+ completedBatchUIData.enqueue(batchUIData)
+ if (completedBatchUIData.size > batchUIDataLimit) {
+ val removedBatch = completedBatchUIData.dequeue()
+ batchTimeToOutputOpIdSparkJobIdPair.remove(removedBatch.batchTime)
+ }
totalCompletedBatches += 1L
- batchCompleted.batchInfo.receivedBlockInfo.foreach { case (_, infos) =>
- totalProcessedRecords += infos.map(_.numRecords).sum
+ totalProcessedRecords += batchUIData.numRecords
+ }
+ }
+
+ override def onJobStart(jobStart: SparkListenerJobStart): Unit = synchronized {
+ getBatchTimeAndOutputOpId(jobStart.properties).foreach { case (batchTime, outputOpId) =>
+ var outputOpIdToSparkJobIds = batchTimeToOutputOpIdSparkJobIdPair.get(batchTime)
+ if (outputOpIdToSparkJobIds == null) {
+ outputOpIdToSparkJobIds =
+ new ArrayBuffer[OutputOpIdAndSparkJobId]()
+ with SynchronizedBuffer[OutputOpIdAndSparkJobId]
+ batchTimeToOutputOpIdSparkJobIdPair.put(batchTime, outputOpIdToSparkJobIds)
}
+ outputOpIdToSparkJobIds += OutputOpIdAndSparkJobId(outputOpId, jobStart.jobId)
}
}
- def numReceivers: Int = synchronized {
- ssc.graph.getReceiverInputStreams().size
+ private def getBatchTimeAndOutputOpId(properties: Properties): Option[(Time, Int)] = {
+ val batchTime = properties.getProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY)
+ if (batchTime == null) {
+ // Not submitted from JobScheduler
+ None
+ } else {
+ val outputOpId = properties.getProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY)
+ assert(outputOpId != null)
+ Some(Time(batchTime.toLong) -> outputOpId.toInt)
+ }
}
+ def numReceivers: Int = ssc.graph.getReceiverInputStreams().size
+
def numTotalCompletedBatches: Long = synchronized {
totalCompletedBatches
}
@@ -106,19 +160,19 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
}
def numUnprocessedBatches: Long = synchronized {
- waitingBatchInfos.size + runningBatchInfos.size
+ waitingBatchUIData.size + runningBatchUIData.size
}
- def waitingBatches: Seq[BatchInfo] = synchronized {
- waitingBatchInfos.values.toSeq
+ def waitingBatches: Seq[BatchUIData] = synchronized {
+ waitingBatchUIData.values.toSeq
}
- def runningBatches: Seq[BatchInfo] = synchronized {
- runningBatchInfos.values.toSeq
+ def runningBatches: Seq[BatchUIData] = synchronized {
+ runningBatchUIData.values.toSeq
}
- def retainedCompletedBatches: Seq[BatchInfo] = synchronized {
- completedBatchInfos.toSeq
+ def retainedCompletedBatches: Seq[BatchUIData] = synchronized {
+ completedBatchUIData.toSeq
}
def processingDelayDistribution: Option[Distribution] = synchronized {
@@ -134,15 +188,11 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
}
def receivedRecordsDistributions: Map[Int, Option[Distribution]] = synchronized {
- val latestBatchInfos = retainedBatches.reverse.take(batchInfoLimit)
- val latestBlockInfos = latestBatchInfos.map(_.receivedBlockInfo)
+ val latestBatches = retainedBatches.reverse.take(batchUIDataLimit)
(0 until numReceivers).map { receiverId =>
- val blockInfoOfParticularReceiver = latestBlockInfos.map { batchInfo =>
- batchInfo.get(receiverId).getOrElse(Array.empty)
- }
- val recordsOfParticularReceiver = blockInfoOfParticularReceiver.map { blockInfo =>
- // calculate records per second for each batch
- blockInfo.map(_.numRecords).sum.toDouble * 1000 / batchDuration
+ val recordsOfParticularReceiver = latestBatches.map { batch =>
+ // calculate records per second for each batch
+ batch.receiverNumRecords.get(receiverId).sum.toDouble * 1000 / batchDuration
}
val distributionOption = Distribution(recordsOfParticularReceiver)
(receiverId, distributionOption)
@@ -150,10 +200,10 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
}
def lastReceivedBatchRecords: Map[Int, Long] = synchronized {
- val lastReceivedBlockInfoOption = lastReceivedBatch.map(_.receivedBlockInfo)
+ val lastReceivedBlockInfoOption = lastReceivedBatch.map(_.receiverNumRecords)
lastReceivedBlockInfoOption.map { lastReceivedBlockInfo =>
(0 until numReceivers).map { receiverId =>
- (receiverId, lastReceivedBlockInfo(receiverId).map(_.numRecords).sum)
+ (receiverId, lastReceivedBlockInfo.getOrElse(receiverId, 0L))
}.toMap
}.getOrElse {
(0 until numReceivers).map(receiverId => (receiverId, 0L)).toMap
@@ -164,20 +214,39 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
receiverInfos.get(receiverId)
}
- def lastCompletedBatch: Option[BatchInfo] = synchronized {
- completedBatchInfos.sortBy(_.batchTime)(Time.ordering).lastOption
+ def lastCompletedBatch: Option[BatchUIData] = synchronized {
+ completedBatchUIData.sortBy(_.batchTime)(Time.ordering).lastOption
}
- def lastReceivedBatch: Option[BatchInfo] = synchronized {
+ def lastReceivedBatch: Option[BatchUIData] = synchronized {
retainedBatches.lastOption
}
- private def retainedBatches: Seq[BatchInfo] = {
- (waitingBatchInfos.values.toSeq ++
- runningBatchInfos.values.toSeq ++ completedBatchInfos).sortBy(_.batchTime)(Time.ordering)
+ private def retainedBatches: Seq[BatchUIData] = {
+ (waitingBatchUIData.values.toSeq ++
+ runningBatchUIData.values.toSeq ++ completedBatchUIData).sortBy(_.batchTime)(Time.ordering)
+ }
+
+ private def extractDistribution(getMetric: BatchUIData => Option[Long]): Option[Distribution] = {
+ Distribution(completedBatchUIData.flatMap(getMetric(_)).map(_.toDouble))
}
- private def extractDistribution(getMetric: BatchInfo => Option[Long]): Option[Distribution] = {
- Distribution(completedBatchInfos.flatMap(getMetric(_)).map(_.toDouble))
+ def getBatchUIData(batchTime: Time): Option[BatchUIData] = synchronized {
+ val batchUIData = waitingBatchUIData.get(batchTime).orElse {
+ runningBatchUIData.get(batchTime).orElse {
+ completedBatchUIData.find(batch => batch.batchTime == batchTime)
+ }
+ }
+ batchUIData.foreach { _batchUIData =>
+ val outputOpIdToSparkJobIds =
+ Option(batchTimeToOutputOpIdSparkJobIdPair.get(batchTime)).getOrElse(Seq.empty)
+ _batchUIData.outputOpIdSparkJobIdPairs = outputOpIdToSparkJobIds
+ }
+ batchUIData
}
}
+
+private[streaming] object StreamingJobProgressListener {
+ type SparkJobId = Int
+ type OutputOpId = Int
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
index 9a860ea4a6..e4039639ad 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
@@ -27,14 +27,16 @@ import StreamingTab._
* Spark Web UI tab that shows statistics of a streaming job.
* This assumes the given SparkContext has enabled its SparkUI.
*/
-private[spark] class StreamingTab(ssc: StreamingContext)
+private[spark] class StreamingTab(val ssc: StreamingContext)
extends SparkUITab(getSparkUI(ssc), "streaming") with Logging {
val parent = getSparkUI(ssc)
val listener = ssc.progressListener
ssc.addStreamingListener(listener)
+ ssc.sc.addSparkListener(listener)
attachPage(new StreamingPage(this))
+ attachPage(new BatchPage(this))
parent.attachTab(this)
def detach() {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
index 205ddf6dbe..8de43baabc 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.streaming
+import scala.collection.mutable.Queue
+
import org.openqa.selenium.WebDriver
import org.openqa.selenium.htmlunit.HtmlUnitDriver
import org.scalatest._
@@ -60,8 +62,28 @@ class UISeleniumSuite
ssc
}
+ private def setupStreams(ssc: StreamingContext): Unit = {
+ val rdds = Queue(ssc.sc.parallelize(1 to 4, 4))
+ val inputStream = ssc.queueStream(rdds)
+ inputStream.foreachRDD { rdd =>
+ rdd.foreach(_ => {})
+ rdd.foreach(_ => {})
+ }
+ inputStream.foreachRDD { rdd =>
+ rdd.foreach(_ => {})
+ try {
+ rdd.foreach(_ => throw new RuntimeException("Oops"))
+ } catch {
+ case e: SparkException if e.getMessage.contains("Oops") =>
+ }
+ }
+ }
+
test("attaching and detaching a Streaming tab") {
withStreamingContext(newSparkStreamingContext()) { ssc =>
+ setupStreams(ssc)
+ ssc.start()
+
val sparkUI = ssc.sparkContext.ui.get
eventually(timeout(10 seconds), interval(50 milliseconds)) {
@@ -77,8 +99,8 @@ class UISeleniumSuite
statisticText should contain("Batch interval:")
val h4Text = findAll(cssSelector("h4")).map(_.text).toSeq
- h4Text should contain("Active Batches (0)")
- h4Text should contain("Completed Batches (last 0 out of 0)")
+ h4Text.exists(_.matches("Active Batches \\(\\d+\\)")) should be (true)
+ h4Text.exists(_.matches("Completed Batches \\(last \\d+ out of \\d+\\)")) should be (true)
findAll(cssSelector("""#active-batches-table th""")).map(_.text).toSeq should be {
List("Batch Time", "Input Size", "Scheduling Delay", "Processing Time", "Status")
@@ -86,6 +108,63 @@ class UISeleniumSuite
findAll(cssSelector("""#completed-batches-table th""")).map(_.text).toSeq should be {
List("Batch Time", "Input Size", "Scheduling Delay", "Processing Time", "Total Delay")
}
+
+ val batchLinks =
+ findAll(cssSelector("""#completed-batches-table a""")).flatMap(_.attribute("href")).toSeq
+ batchLinks.size should be >= 1
+
+ // Check a normal batch page
+ go to (batchLinks.last) // Last should be the first batch, so it will have some jobs
+ val summaryText = findAll(cssSelector("li strong")).map(_.text).toSeq
+ summaryText should contain ("Batch Duration:")
+ summaryText should contain ("Input data size:")
+ summaryText should contain ("Scheduling delay:")
+ summaryText should contain ("Processing time:")
+ summaryText should contain ("Total delay:")
+
+ findAll(cssSelector("""#batch-job-table th""")).map(_.text).toSeq should be {
+ List("Output Op Id", "Description", "Duration", "Job Id", "Duration",
+ "Stages: Succeeded/Total", "Tasks (for all stages): Succeeded/Total", "Error")
+ }
+
+ // Check we have 2 output op ids
+ val outputOpIds = findAll(cssSelector(".output-op-id-cell")).toSeq
+ outputOpIds.map(_.attribute("rowspan")) should be (List(Some("2"), Some("2")))
+ outputOpIds.map(_.text) should be (List("0", "1"))
+
+ // Check job ids
+ val jobIdCells = findAll(cssSelector( """#batch-job-table a""")).toSeq
+ jobIdCells.map(_.text) should be (List("0", "1", "2", "3"))
+
+ val jobLinks = jobIdCells.flatMap(_.attribute("href"))
+ jobLinks.size should be (4)
+
+ // Check stage progress
+ findAll(cssSelector(""".stage-progress-cell""")).map(_.text).toSeq should be
+ (List("1/1", "1/1", "1/1", "0/1 (1 failed)"))
+
+ // Check job progress
+ findAll(cssSelector(""".progress-cell""")).map(_.text).toSeq should be
+ (List("1/1", "1/1", "1/1", "0/1 (1 failed)"))
+
+ // Check stacktrace
+ val errorCells = findAll(cssSelector(""".stacktrace-details""")).map(_.text).toSeq
+ errorCells should have size 1
+ errorCells(0) should include("java.lang.RuntimeException: Oops")
+
+ // Check the job link in the batch page is right
+ go to (jobLinks(0))
+ val jobDetails = findAll(cssSelector("li strong")).map(_.text).toSeq
+ jobDetails should contain("Status:")
+ jobDetails should contain("Completed Stages:")
+
+ // Check a batch page without id
+ go to (sparkUI.appUIAddress.stripSuffix("/") + "/streaming/batch/")
+ webDriver.getPageSource should include ("Missing id parameter")
+
+ // Check a non-exist batch
+ go to (sparkUI.appUIAddress.stripSuffix("/") + "/streaming/batch/?id=12345")
+ webDriver.getPageSource should include ("does not exist")
}
ssc.stop(false)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
index 94b1985116..fa89536de4 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
@@ -17,8 +17,11 @@
package org.apache.spark.streaming.ui
+import java.util.Properties
+
import org.scalatest.Matchers
+import org.apache.spark.scheduler.SparkListenerJobStart
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.scheduler._
import org.apache.spark.streaming.{Duration, Time, Milliseconds, TestSuiteBase}
@@ -28,6 +31,17 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
val input = (1 to 4).map(Seq(_)).toSeq
val operation = (d: DStream[Int]) => d.map(x => x)
+ private def createJobStart(
+ batchTime: Time, outputOpId: Int, jobId: Int): SparkListenerJobStart = {
+ val properties = new Properties()
+ properties.setProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, batchTime.milliseconds.toString)
+ properties.setProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, outputOpId.toString)
+ SparkListenerJobStart(jobId = jobId,
+ 0L, // unused
+ Nil, // unused
+ properties)
+ }
+
override def batchDuration: Duration = Milliseconds(100)
test("onBatchSubmitted, onBatchStarted, onBatchCompleted, " +
@@ -43,7 +57,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
// onBatchSubmitted
val batchInfoSubmitted = BatchInfo(Time(1000), receivedBlockInfo, 1000, None, None)
listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted))
- listener.waitingBatches should be (List(batchInfoSubmitted))
+ listener.waitingBatches should be (List(BatchUIData(batchInfoSubmitted)))
listener.runningBatches should be (Nil)
listener.retainedCompletedBatches should be (Nil)
listener.lastCompletedBatch should be (None)
@@ -56,7 +70,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
val batchInfoStarted = BatchInfo(Time(1000), receivedBlockInfo, 1000, Some(2000), None)
listener.onBatchStarted(StreamingListenerBatchStarted(batchInfoStarted))
listener.waitingBatches should be (Nil)
- listener.runningBatches should be (List(batchInfoStarted))
+ listener.runningBatches should be (List(BatchUIData(batchInfoStarted)))
listener.retainedCompletedBatches should be (Nil)
listener.lastCompletedBatch should be (None)
listener.numUnprocessedBatches should be (1)
@@ -64,13 +78,40 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
listener.numTotalProcessedRecords should be (0)
listener.numTotalReceivedRecords should be (600)
+ // onJobStart
+ val jobStart1 = createJobStart(Time(1000), outputOpId = 0, jobId = 0)
+ listener.onJobStart(jobStart1)
+
+ val jobStart2 = createJobStart(Time(1000), outputOpId = 0, jobId = 1)
+ listener.onJobStart(jobStart2)
+
+ val jobStart3 = createJobStart(Time(1000), outputOpId = 1, jobId = 0)
+ listener.onJobStart(jobStart3)
+
+ val jobStart4 = createJobStart(Time(1000), outputOpId = 1, jobId = 1)
+ listener.onJobStart(jobStart4)
+
+ val batchUIData = listener.getBatchUIData(Time(1000))
+ batchUIData should not be None
+ batchUIData.get.batchTime should be (batchInfoStarted.batchTime)
+ batchUIData.get.schedulingDelay should be (batchInfoStarted.schedulingDelay)
+ batchUIData.get.processingDelay should be (batchInfoStarted.processingDelay)
+ batchUIData.get.totalDelay should be (batchInfoStarted.totalDelay)
+ batchUIData.get.receiverNumRecords should be (Map(0 -> 300L, 1 -> 300L))
+ batchUIData.get.numRecords should be(600)
+ batchUIData.get.outputOpIdSparkJobIdPairs should be
+ Seq(OutputOpIdAndSparkJobId(0, 0),
+ OutputOpIdAndSparkJobId(0, 1),
+ OutputOpIdAndSparkJobId(1, 0),
+ OutputOpIdAndSparkJobId(1, 1))
+
// onBatchCompleted
val batchInfoCompleted = BatchInfo(Time(1000), receivedBlockInfo, 1000, Some(2000), None)
listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
listener.waitingBatches should be (Nil)
listener.runningBatches should be (Nil)
- listener.retainedCompletedBatches should be (List(batchInfoCompleted))
- listener.lastCompletedBatch should be (Some(batchInfoCompleted))
+ listener.retainedCompletedBatches should be (List(BatchUIData(batchInfoCompleted)))
+ listener.lastCompletedBatch should be (Some(BatchUIData(batchInfoCompleted)))
listener.numUnprocessedBatches should be (0)
listener.numTotalCompletedBatches should be (1)
listener.numTotalProcessedRecords should be (600)
@@ -116,4 +157,55 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
listener.retainedCompletedBatches.size should be (limit)
listener.numTotalCompletedBatches should be(limit + 10)
}
+
+ test("out-of-order onJobStart and onBatchXXX") {
+ val ssc = setupStreams(input, operation)
+ val limit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 100)
+ val listener = new StreamingJobProgressListener(ssc)
+
+ // fulfill completedBatchInfos
+ for(i <- 0 until limit) {
+ val batchInfoCompleted =
+ BatchInfo(Time(1000 + i * 100), Map.empty, 1000 + i * 100, Some(2000 + i * 100), None)
+ listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
+ val jobStart = createJobStart(Time(1000 + i * 100), outputOpId = 0, jobId = 1)
+ listener.onJobStart(jobStart)
+ }
+
+ // onJobStart happens before onBatchSubmitted
+ val jobStart = createJobStart(Time(1000 + limit * 100), outputOpId = 0, jobId = 0)
+ listener.onJobStart(jobStart)
+
+ val batchInfoSubmitted =
+ BatchInfo(Time(1000 + limit * 100), Map.empty, (1000 + limit * 100), None, None)
+ listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted))
+
+ // We still can see the info retrieved from onJobStart
+ val batchUIData = listener.getBatchUIData(Time(1000 + limit * 100))
+ batchUIData should not be None
+ batchUIData.get.batchTime should be (batchInfoSubmitted.batchTime)
+ batchUIData.get.schedulingDelay should be (batchInfoSubmitted.schedulingDelay)
+ batchUIData.get.processingDelay should be (batchInfoSubmitted.processingDelay)
+ batchUIData.get.totalDelay should be (batchInfoSubmitted.totalDelay)
+ batchUIData.get.receiverNumRecords should be (Map.empty)
+ batchUIData.get.numRecords should be (0)
+ batchUIData.get.outputOpIdSparkJobIdPairs should be (Seq(OutputOpIdAndSparkJobId(0, 0)))
+
+ // A lot of "onBatchCompleted"s happen before "onJobStart"
+ for(i <- limit + 1 to limit * 2) {
+ val batchInfoCompleted =
+ BatchInfo(Time(1000 + i * 100), Map.empty, 1000 + i * 100, Some(2000 + i * 100), None)
+ listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
+ }
+
+ for(i <- limit + 1 to limit * 2) {
+ val jobStart = createJobStart(Time(1000 + i * 100), outputOpId = 0, jobId = 1)
+ listener.onJobStart(jobStart)
+ }
+
+ // We should not leak memory
+ listener.batchTimeToOutputOpIdSparkJobIdPair.size() should be <=
+ (listener.waitingBatches.size + listener.runningBatches.size +
+ listener.retainedCompletedBatches.size + 10)
+ }
}