author     Tathagata Das <tathagata.das1565@gmail.com>  2015-09-22 22:44:09 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>  2015-09-22 22:44:09 -0700
commit     5548a254755bb84edae2768b94ab1816e1b49b91 (patch)
tree       4f126c853f9b93a3b24fcc617a1c7c422cd7dc90 /streaming
parent     558e9c7e60a7c0d85ba26634e97562ad2163e91d (diff)
[SPARK-10652] [SPARK-10742] [STREAMING] Set meaningful job descriptions for all streaming jobs
Here are the screenshots after adding the job descriptions to threads that run receivers and the scheduler thread running the batch jobs.

## All jobs page
* Added job descriptions with links to relevant batch details page

![image](https://cloud.githubusercontent.com/assets/663212/9924165/cda4a372-5cb1-11e5-91ca-d43a32c699e9.png)

## All stages page
* Added stage descriptions with links to relevant batch details page

![image](https://cloud.githubusercontent.com/assets/663212/9923814/2cce266a-5cae-11e5-8a3f-dad84d06c50e.png)

## Streaming batch details page
* Added the +details link

![image](https://cloud.githubusercontent.com/assets/663212/9921977/24014a32-5c98-11e5-958e-457b6c38065b.png)

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #8791 from tdas/SPARK-10652.
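Both fixes lean on the same mechanism: `SparkContext.setJobDescription` writes a thread-local property, every job subsequently submitted from that thread carries it, and the all-jobs/all-stages pages render it as the job's description. A minimal standalone sketch of that mechanism (local mode; the description text is illustrative, not the exact string the patch produces):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object JobDescriptionDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("JobDescriptionDemo"))

    // setJobDescription stores a thread-local property; each job submitted
    // from this thread afterwards carries it, and the Spark UI shows it.
    sc.setJobDescription("Streaming job from [output operation 0, batch time 12:00:00]")
    sc.parallelize(1 to 100).count() // listed in the UI under the description above

    sc.stop()
  }
}
```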
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala           4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala    15
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala   5
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala              33
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala      2
5 files changed, 42 insertions(+), 17 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 6720ba4f72..94fea63f55 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -200,6 +200,8 @@ class StreamingContext private[streaming] (
private val startSite = new AtomicReference[CallSite](null)
+ private[streaming] def getStartSite(): CallSite = startSite.get()
+
private var shutdownHookRef: AnyRef = _
conf.getOption("spark.streaming.checkpoint.directory").foreach(checkpoint)
@@ -744,7 +746,7 @@ object StreamingContext extends Logging {
throw new IllegalStateException(
"Only one StreamingContext may be started in this JVM. " +
"Currently running StreamingContext was started at" +
- activeContext.get.startSite.get.longForm)
+ activeContext.get.getStartSite().longForm)
}
}
}
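The new `getStartSite()` is just a read-only accessor over the existing `AtomicReference`, exposed so that `ReceiverTracker` (below) can reuse the call site captured at `start()`. A self-contained sketch of the pattern, with a hypothetical stand-in for `org.apache.spark.util.CallSite`; note the reference stays `null` until `start()` runs, which is why the caller later guards with `Option(...)`:

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical stand-in for org.apache.spark.util.CallSite, for illustration only.
final case class CallSite(shortForm: String, longForm: String)

class StartSiteHolder {
  // null until start() is called, like StreamingContext.startSite.
  private val startSite = new AtomicReference[CallSite](null)

  def start(site: CallSite): Unit = startSite.set(site)

  // Mirrors the new getStartSite(): a snapshot read of the reference.
  def getStartSite(): CallSite = startSite.get()
}
```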
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 0cd39594ee..32d995dc42 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -25,6 +25,7 @@ import scala.util.{Failure, Success}
import org.apache.spark.Logging
import org.apache.spark.rdd.PairRDDFunctions
import org.apache.spark.streaming._
+import org.apache.spark.streaming.ui.UIUtils
import org.apache.spark.util.{EventLoop, ThreadUtils}
@@ -190,10 +191,20 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
}
private class JobHandler(job: Job) extends Runnable with Logging {
+ import JobScheduler._
+
def run() {
- ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
- ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)
try {
+ val formattedTime = UIUtils.formatBatchTime(
+ job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
+ val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"
+ val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"
+
+ ssc.sc.setJobDescription(
+ s"""Streaming job from <a href="$batchUrl">$batchLinkText</a>""")
+ ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
+ ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)
+
// We need to assign `eventLoop` to a temp variable. Otherwise, because
// `JobScheduler.stop(false)` may set `eventLoop` to null while this method is running,
// it's possible that `eventLoop` is null by the time `post` is called.
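On the consuming side these local properties travel with the job: a `SparkListener` can read them from `SparkListenerJobStart.properties`, which is how the streaming UI correlates Spark jobs with batches. A hedged sketch; the two key strings are my assumption about the constants defined in `JobScheduler`'s companion object:

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}

class BatchAwareListener extends SparkListener {
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    // The thread-local properties set in JobHandler.run() arrive here.
    val props = Option(jobStart.properties)
    val batchTime  = props.flatMap(p => Option(p.getProperty("spark.streaming.internal.batchTime")))
    val outputOpId = props.flatMap(p => Option(p.getProperty("spark.streaming.internal.outputOpId")))
    for (t <- batchTime; op <- outputOpId) {
      println(s"Spark job ${jobStart.jobId} -> batch $t, output op $op")
    }
  }
}
```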
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index f86fd44b48..204e6142fd 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -30,7 +30,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.rpc._
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.receiver._
-import org.apache.spark.util.{ThreadUtils, SerializableConfiguration}
+import org.apache.spark.util.{Utils, ThreadUtils, SerializableConfiguration}
/** Enumeration to identify current state of a Receiver */
@@ -554,6 +554,9 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
ssc.sc.makeRDD(Seq(receiver -> scheduledExecutors))
}
receiverRDD.setName(s"Receiver $receiverId")
+ ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
+ ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))
+
val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
// We will keep restarting the receiver job until ReceiverTracker is stopped
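Receiver jobs are submitted from an internal scheduling thread rather than from user code, so before this change they appeared in the UI with internal Spark frames as their call site and no description; setting both on the submitting thread immediately before `submitJob` fixes that. A simplified sketch of the same setup, with a dummy no-op job standing in for the real receiver loop:

```scala
import org.apache.spark.SparkContext

// Simplified: the real code runs the receiver inside the partition function.
def startDummyReceiver(sc: SparkContext, receiverId: Int): Unit = {
  // Thread-local, so it must be set on the thread that submits the job.
  sc.setJobDescription(s"Streaming job running receiver $receiverId")
  val receiverRDD = sc.makeRDD(Seq(receiverId), numSlices = 1)
  receiverRDD.setName(s"Receiver $receiverId")
  // ReceiverTracker monitors the returned future and resubmits the job
  // until the tracker is stopped.
  sc.submitJob[Int, Unit, Unit](receiverRDD, _ => (), Seq(0), (_, _) => (), ())
}
```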
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
index 90d1b0fade..9129c1f26a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
@@ -19,14 +19,14 @@ package org.apache.spark.streaming.ui
import javax.servlet.http.HttpServletRequest
-import scala.xml.{NodeSeq, Node, Text, Unparsed}
+import scala.xml._
import org.apache.commons.lang3.StringEscapeUtils
import org.apache.spark.streaming.Time
-import org.apache.spark.ui.{UIUtils => SparkUIUtils, WebUIPage}
-import org.apache.spark.streaming.ui.StreamingJobProgressListener.{SparkJobId, OutputOpId}
+import org.apache.spark.streaming.ui.StreamingJobProgressListener.{OutputOpId, SparkJobId}
import org.apache.spark.ui.jobs.UIData.JobUIData
+import org.apache.spark.ui.{UIUtils => SparkUIUtils, WebUIPage}
private[ui] case class SparkJobIdWithUIData(sparkJobId: SparkJobId, jobUIData: Option[JobUIData])
@@ -207,16 +207,25 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
sparkListener.stageIdToInfo.get(sparkJob.stageIds.max)
}
}
- val lastStageData = lastStageInfo.flatMap { s =>
- sparkListener.stageIdToData.get((s.stageId, s.attemptId))
- }
-
- val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
- val lastStageDescription = lastStageData.flatMap(_.description).getOrElse("")
+ lastStageInfo match {
+   case Some(stageInfo) =>
+     val details = if (stageInfo.details.nonEmpty) {
+       <span
+         onclick="this.parentNode.querySelector('.stage-details').classList.toggle('collapsed')"
+         class="expand-details">
+         +details
+       </span> ++
+       <div class="stage-details collapsed">
+         <pre>{stageInfo.details}</pre>
+       </div>
+     } else {
+       NodeSeq.Empty
+     }
- <span class="description-input" title={lastStageDescription}>
- {lastStageDescription}
- </span> ++ Text(lastStageName)
+     <div> {stageInfo.name} {details} </div>
+   case None =>
+     Text("(Unknown)")
+ }
}
private def failureReasonCell(failureReason: String): Seq[Node] = {
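The new cell is plain `scala.xml` composition: an inline "+details" toggle `<span>` concatenated with a collapsed `<div>` via `NodeSeq.++`, reusing the `expand-details`/`stage-details` CSS classes the Spark UI already ships for stage pages. A standalone sketch of the same pattern:

```scala
import scala.xml.{Node, NodeSeq, Text}

// Renders "name +details" with a hidden <pre> that the inline onclick
// handler toggles, mirroring the BatchPage cell above.
def expandableCell(name: String, details: String): Seq[Node] = {
  val toggle =
    if (details.nonEmpty) {
      <span
        onclick="this.parentNode.querySelector('.stage-details').classList.toggle('collapsed')"
        class="expand-details">
        +details
      </span> ++
      <div class="stage-details collapsed"><pre>{details}</pre></div>
    } else {
      NodeSeq.Empty
    }
  <div>{Text(name)} {toggle}</div>
}
```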
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 3b9d0d15ea..c7a877142b 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -204,7 +204,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
// Verify streaming jobs have expected thread-local properties
assert(jobGroupFound === null)
- assert(jobDescFound === null)
+ assert(jobDescFound.contains("Streaming job from"))
assert(jobInterruptFound === "false")
// Verify current thread's thread-local properties have not changed