author     Tathagata Das <tathagata.das1565@gmail.com>   2015-09-22 22:44:09 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>   2015-09-22 22:44:09 -0700
commit     5548a254755bb84edae2768b94ab1816e1b49b91
tree       4f126c853f9b93a3b24fcc617a1c7c422cd7dc90 /streaming
parent     558e9c7e60a7c0d85ba26634e97562ad2163e91d
[SPARK-10652] [SPARK-10742] [STREAMING] Set meaningful job descriptions for all streaming jobs
Here are the screenshots after adding job descriptions to the threads that run the receivers and to the scheduler threads that run the batch jobs.
## All jobs page
* Added job descriptions with links to relevant batch details page
![image](https://cloud.githubusercontent.com/assets/663212/9924165/cda4a372-5cb1-11e5-91ca-d43a32c699e9.png)
## All stages page
* Added stage descriptions with links to relevant batch details page
![image](https://cloud.githubusercontent.com/assets/663212/9923814/2cce266a-5cae-11e5-8a3f-dad84d06c50e.png)
## Streaming batch details page
* Added the +details link
![image](https://cloud.githubusercontent.com/assets/663212/9921977/24014a32-5c98-11e5-958e-457b6c38065b.png)
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #8791 from tdas/SPARK-10652.
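The patch works by labeling the driver-side threads that submit Spark jobs: the scheduler's job-handler threads (for batch jobs) and the thread that launches receivers. Below is a minimal, self-contained sketch of that underlying mechanism; it is not part of the patch, and the master URL, app name, and object name are placeholders. `SparkContext.setJobDescription` stores the description in a thread-local property, so any job submitted afterwards from the same thread shows that description in the web UI.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object JobDescriptionSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("sketch"))

    // The description is thread-local: it applies to jobs submitted from this thread from now on.
    sc.setJobDescription("Streaming job from [output operation 0, batch time 12:34:56]")
    sc.parallelize(1 to 100).count()   // this job appears under the description above in the UI

    sc.stop()
  }
}
```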
Diffstat (limited to 'streaming')
5 files changed, 42 insertions, 17 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 6720ba4f72..94fea63f55 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -200,6 +200,8 @@ class StreamingContext private[streaming] (
 
   private val startSite = new AtomicReference[CallSite](null)
 
+  private[streaming] def getStartSite(): CallSite = startSite.get()
+
   private var shutdownHookRef: AnyRef = _
 
   conf.getOption("spark.streaming.checkpoint.directory").foreach(checkpoint)
@@ -744,7 +746,7 @@ object StreamingContext extends Logging {
         throw new IllegalStateException(
           "Only one StreamingContext may be started in this JVM. " +
             "Currently running StreamingContext was started at" +
-            activeContext.get.startSite.get.longForm)
+            activeContext.get.getStartSite().longForm)
       }
     }
   }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 0cd39594ee..32d995dc42 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -25,6 +25,7 @@ import scala.util.{Failure, Success}
 import org.apache.spark.Logging
 import org.apache.spark.rdd.PairRDDFunctions
 import org.apache.spark.streaming._
+import org.apache.spark.streaming.ui.UIUtils
 import org.apache.spark.util.{EventLoop, ThreadUtils}
 
 
@@ -190,10 +191,20 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
   }
 
   private class JobHandler(job: Job) extends Runnable with Logging {
+    import JobScheduler._
+
     def run() {
-      ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
-      ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)
       try {
+        val formattedTime = UIUtils.formatBatchTime(
+          job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
+        val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"
+        val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"
+
+        ssc.sc.setJobDescription(
+          s"""Streaming job from <a href="$batchUrl">$batchLinkText</a>""")
+        ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
+        ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)
+
         // We need to assign `eventLoop` to a temp variable. Otherwise, because
         // `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then
         // it's possible that when `post` is called, `eventLoop` happens to null.
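As an editorial illustration of the description string that `JobHandler` now sets (not part of the patch): the description embeds a relative link to the batch details page, which the Spark UI renders as an anchor on the all-jobs and all-stages pages. The time formatting below uses plain `java.text` as a stand-in for the package-private `UIUtils.formatBatchTime` helper, and the batch time, output operation id, and object name are made-up values.

```scala
import java.text.SimpleDateFormat
import java.util.Date

object BatchDescriptionSketch {
  def main(args: Array[String]): Unit = {
    val batchTimeMs = 1442984640000L   // hypothetical batch time (epoch millis)
    val outputOpId = 0                 // hypothetical output operation id

    val formattedTime = new SimpleDateFormat("HH:mm:ss").format(new Date(batchTimeMs))
    val batchUrl = s"/streaming/batch/?id=$batchTimeMs"
    val batchLinkText = s"[output operation $outputOpId, batch time $formattedTime]"

    // The UI renders this anchor, so the job row links back to the streaming batch page.
    println(s"""Streaming job from <a href="$batchUrl">$batchLinkText</a>""")
  }
}
```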
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index f86fd44b48..204e6142fd 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -30,7 +30,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.rpc._
 import org.apache.spark.streaming.{StreamingContext, Time}
 import org.apache.spark.streaming.receiver._
-import org.apache.spark.util.{ThreadUtils, SerializableConfiguration}
+import org.apache.spark.util.{Utils, ThreadUtils, SerializableConfiguration}
 
 
 /** Enumeration to identify current state of a Receiver */
@@ -554,6 +554,9 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
           ssc.sc.makeRDD(Seq(receiver -> scheduledExecutors))
         }
       receiverRDD.setName(s"Receiver $receiverId")
+      ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
+      ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))
+
       val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
         receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
       // We will keep restarting the receiver job until ReceiverTracker is stopped
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
index 90d1b0fade..9129c1f26a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
@@ -19,14 +19,14 @@ package org.apache.spark.streaming.ui
 
 import javax.servlet.http.HttpServletRequest
 
-import scala.xml.{NodeSeq, Node, Text, Unparsed}
+import scala.xml._
 
 import org.apache.commons.lang3.StringEscapeUtils
 
 import org.apache.spark.streaming.Time
-import org.apache.spark.ui.{UIUtils => SparkUIUtils, WebUIPage}
-import org.apache.spark.streaming.ui.StreamingJobProgressListener.{SparkJobId, OutputOpId}
+import org.apache.spark.streaming.ui.StreamingJobProgressListener.{OutputOpId, SparkJobId}
 import org.apache.spark.ui.jobs.UIData.JobUIData
+import org.apache.spark.ui.{UIUtils => SparkUIUtils, WebUIPage}
 
 private[ui] case class SparkJobIdWithUIData(sparkJobId: SparkJobId, jobUIData: Option[JobUIData])
 
@@ -207,16 +207,25 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
         sparkListener.stageIdToInfo.get(sparkJob.stageIds.max)
       }
     }
-    val lastStageData = lastStageInfo.flatMap { s =>
-      sparkListener.stageIdToData.get((s.stageId, s.attemptId))
-    }
-
-    val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
-    val lastStageDescription = lastStageData.flatMap(_.description).getOrElse("")
+    lastStageInfo match {
+      case Some(stageInfo) =>
+        val details = if (stageInfo.details.nonEmpty) {
+          <span
+            onclick="this.parentNode.querySelector('.stage-details').classList.toggle('collapsed')"
+            class="expand-details">
+              +details
+          </span> ++
+          <div class="stage-details collapsed">
+            <pre>{stageInfo.details}</pre>
+          </div>
+        } else {
+          NodeSeq.Empty
+        }
 
-    <span class="description-input" title={lastStageDescription}>
-      {lastStageDescription}
-    </span> ++ Text(lastStageName)
+        <div> {stageInfo.name} {details} </div>
+      case None =>
+        Text("(Unknown)")
+    }
   }
 
   private def failureReasonCell(failureReason: String): Seq[Node] = {
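The `ReceiverTracker` change above sets two thread-local labels right before submitting the long-running receiver job: a readable description and an explicit call site (the site where the `StreamingContext` was started, falling back to the current call site). A rough sketch of those two public `SparkContext` calls follows; it is illustrative only, and the master URL, app name, object name, and call-site string are placeholders.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ReceiverJobLabelsSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("sketch"))

    // Same pattern as the receiver launch: label the next job submitted from this thread.
    sc.setJobDescription("Streaming job running receiver 0")
    sc.setCallSite("MyStreamingApp.scala:42")   // overrides the call site shown in the UI

    // Stands in for the single-partition receiver RDD that ReceiverTracker submits.
    sc.parallelize(Seq(0), 1).foreach(_ => ())

    sc.stop()
  }
}
```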
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 3b9d0d15ea..c7a877142b 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -204,7 +204,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeouts
 
       // Verify streaming jobs have expected thread-local properties
       assert(jobGroupFound === null)
-      assert(jobDescFound === null)
+      assert(jobDescFound.contains("Streaming job from"))
       assert(jobInterruptFound === "false")
 
       // Verify current thread's thread-local properties have not changed
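The updated assertion relies on the job description being readable back as a thread-local property on the thread that ran the output operation. A tiny sketch of that check, outside any streaming context: the `"spark.job.description"` key is an assumption based on the property name `setJobDescription` writes, and the object name is hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object DescriptionPropertySketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("sketch"))

    sc.setJobDescription("Streaming job from [output operation 0, batch time 12:34:56]")
    // Assumption: setJobDescription stores its value under the "spark.job.description" key.
    assert(sc.getLocalProperty("spark.job.description").contains("Streaming job from"))

    sc.stop()
  }
}
```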