diff options
author | Patrick Wendell <pwendell@gmail.com> | 2013-08-02 15:46:09 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2013-08-02 15:47:41 -0700 |
commit | 5b3784a79c4e6069ace17c5dddc0ad0046909c8b (patch) | |
tree | 9cd0c9330971a86745bf6d1dd6161b946c828f31 /core/src | |
parent | 9d7dfd2d5a28c3e329fe14cf10f5184af1263dd6 (diff) | |
download | spark-5b3784a79c4e6069ace17c5dddc0ad0046909c8b.tar.gz spark-5b3784a79c4e6069ace17c5dddc0ad0046909c8b.tar.bz2 spark-5b3784a79c4e6069ace17c5dddc0ad0046909c8b.zip |
Show user-defined job name in UI
Diffstat (limited to 'core/src')
7 files changed, 39 insertions, 17 deletions
diff --git a/core/src/main/resources/spark/ui/static/webui.css b/core/src/main/resources/spark/ui/static/webui.css index f7537bb766..8b9f4ee938 100644 --- a/core/src/main/resources/spark/ui/static/webui.css +++ b/core/src/main/resources/spark/ui/static/webui.css @@ -47,3 +47,7 @@ padding-top: 7px; padding-left: 4px; } + +.table td { + vertical-align: middle !important; +} diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index 97e1aaf49e..039f5522b7 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -265,12 +265,18 @@ class SparkContext( localProperties.value = new Properties() } - def addLocalProperties(key: String, value: String) { + def addLocalProperty(key: String, value: String) { if(localProperties.value == null) { localProperties.value = new Properties() } localProperties.value.setProperty(key,value) } + + /** Set a human readable description of the current job. */ + def setDescription(value: String) { + addLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, value) + } + // Post init taskScheduler.postStartHook() @@ -841,6 +847,7 @@ class SparkContext( * various Spark features. */ object SparkContext { + val SPARK_JOB_DESCRIPTION = "spark.job.description" implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] { def addInPlace(t1: Double, t2: Double): Double = t1 + t2 @@ -958,7 +965,6 @@ object SparkContext { } } - /** * A class encapsulating how to convert some type T to Writable. It stores both the Writable class * corresponding to T (e.g. IntWritable for Int) and a function for doing the conversion. @@ -970,3 +976,4 @@ private[spark] class WritableConverter[T]( val writableClass: ClassManifest[T] => Class[_ <: Writable], val convert: Writable => T) extends Serializable + diff --git a/core/src/main/scala/spark/scheduler/JobLogger.scala b/core/src/main/scala/spark/scheduler/JobLogger.scala index c7e8f8a9a1..ad2efcec63 100644 --- a/core/src/main/scala/spark/scheduler/JobLogger.scala +++ b/core/src/main/scala/spark/scheduler/JobLogger.scala @@ -26,6 +26,7 @@ import java.util.concurrent.LinkedBlockingQueue import scala.collection.mutable.{Map, HashMap, ListBuffer}
import scala.io.Source
import spark._
+import spark.SparkContext
import spark.executor.TaskMetrics
import spark.scheduler.cluster.TaskInfo
@@ -317,8 +318,8 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging { protected def recordJobProperties(jobID: Int, properties: Properties) {
if(properties != null) {
- val annotation = properties.getProperty("spark.job.annotation", "")
- jobLogInfo(jobID, annotation, false)
+ val description = properties.getProperty(SparkContext.SPARK_JOB_DESCRIPTION, "")
+ jobLogInfo(jobID, description, false)
}
}
diff --git a/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala b/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala index 3ac35085eb..97ea644021 100644 --- a/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala +++ b/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala @@ -46,13 +46,11 @@ private[spark] object UIWorkloadGenerator { } val sc = new SparkContext(master, appName) - // NOTE: Right now there is no easy way for us to show spark.job.annotation for a given phase, - // but we pass it here anyways since it will be useful once we do. def setProperties(s: String) = { if(schedulingMode == SchedulingMode.FAIR) { - sc.addLocalProperties("spark.scheduler.cluster.fair.pool", s) + sc.addLocalProperty("spark.scheduler.cluster.fair.pool", s) } - sc.addLocalProperties("spark.job.annotation", s) + sc.addLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, s) } val baseData = sc.makeRDD(1 to NUM_PARTITIONS * 10, NUM_PARTITIONS) diff --git a/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala index 200e13cf99..f22c4e39e3 100644 --- a/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala @@ -15,6 +15,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList val DEFAULT_POOL_NAME = "default" val stageToPool = new HashMap[Stage, String]() + val stageToDescription = new HashMap[Stage, String]() val poolToActiveStages = new HashMap[String, HashSet[Stage]]() val activeStages = HashSet[Stage]() @@ -57,6 +58,8 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList stageToTasksActive.remove(s.id) stageToTasksComplete.remove(s.id) stageToTasksFailed.remove(s.id) + stageToPool.remove(s) + if (stageToDescription.contains(s)) {stageToDescription.remove(s)} }) stages.trimEnd(toRemove) } @@ -66,12 +69,17 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = { val stage = stageSubmitted.stage activeStages += stage - var poolName = DEFAULT_POOL_NAME - if (stageSubmitted.properties != null) { - poolName = stageSubmitted.properties.getProperty("spark.scheduler.cluster.fair.pool", - DEFAULT_POOL_NAME) - } + + val poolName = Option(stageSubmitted.properties).map { + p => p.getProperty("spark.scheduler.cluster.fair.pool", DEFAULT_POOL_NAME) + }.getOrElse(DEFAULT_POOL_NAME) stageToPool(stage) = poolName + + val description = Option(stageSubmitted.properties).flatMap { + p => Option(p.getProperty(SparkContext.SPARK_JOB_DESCRIPTION)) + } + description.map(d => stageToDescription(stage) = d) + val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashSet[Stage]()) stages += stage } diff --git a/core/src/main/scala/spark/ui/jobs/StageTable.scala b/core/src/main/scala/spark/ui/jobs/StageTable.scala index 3257f4e360..38fa3bcbcd 100644 --- a/core/src/main/scala/spark/ui/jobs/StageTable.scala +++ b/core/src/main/scala/spark/ui/jobs/StageTable.scala @@ -34,7 +34,7 @@ private[spark] class StageTable(val stages: Seq[Stage], val parent: JobProgressU <thead> <th>Stage Id</th> {if (isFairScheduler) {<th>Pool Name</th>} else {}} - <th>Origin</th> + <th>Description</th> <th>Submitted</th> <td>Duration</td> <td colspan="2">Tasks: Complete/Total</td> @@ -87,13 +87,17 @@ private[spark] class StageTable(val stages: Seq[Stage], val parent: JobProgressU val poolName = listener.stageToPool.get(s) + val nameLink = <a href={"/stages/stage?id=%s".format(s.id)}>{s.name}</a> + val description = listener.stageToDescription.get(s) + .map(d => <div><em>{d}</em></div><div>{nameLink}</div>).getOrElse(nameLink) + <tr> <td>{s.id}</td> {if (isFairScheduler) { <td><a href={"/stages/pool?poolname=%s".format(poolName.get)}>{poolName.get}</a></td>} } - <td><a href={"/stages/stage?id=%s".format(s.id)}>{s.name}</a></td> - <td>{submissionTime}</td> + <td>{description}</td> + <td valign="middle">{submissionTime}</td> <td>{getElapsedTime(s.submissionTime, s.completionTime.getOrElse(System.currentTimeMillis()))}</td> <td class="progress-cell">{makeProgressBar(startedTasks, completedTasks, totalTasks)}</td> diff --git a/core/src/test/scala/spark/scheduler/LocalSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/LocalSchedulerSuite.scala index 14bb58731b..66fd59e8bb 100644 --- a/core/src/test/scala/spark/scheduler/LocalSchedulerSuite.scala +++ b/core/src/test/scala/spark/scheduler/LocalSchedulerSuite.scala @@ -73,7 +73,7 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext { TaskThreadInfo.threadToStarted(threadIndex) = new CountDownLatch(1) new Thread { if (poolName != null) { - sc.addLocalProperties("spark.scheduler.cluster.fair.pool",poolName) + sc.addLocalProperty("spark.scheduler.cluster.fair.pool", poolName) } override def run() { val ans = nums.map(number => { |