path: root/core/src
author    Patrick Wendell <pwendell@gmail.com>    2013-08-02 15:46:09 -0700
committer Patrick Wendell <pwendell@gmail.com>    2013-08-02 15:47:41 -0700
commit    5b3784a79c4e6069ace17c5dddc0ad0046909c8b (patch)
tree      9cd0c9330971a86745bf6d1dd6161b946c828f31 /core/src
parent    9d7dfd2d5a28c3e329fe14cf10f5184af1263dd6 (diff)
Show user-defined job name in UI
Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/resources/spark/ui/static/webui.css              |  4
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala                   | 11
-rw-r--r--  core/src/main/scala/spark/scheduler/JobLogger.scala            |  5
-rw-r--r--  core/src/main/scala/spark/ui/UIWorkloadGenerator.scala         |  6
-rw-r--r--  core/src/main/scala/spark/ui/jobs/JobProgressListener.scala    | 18
-rw-r--r--  core/src/main/scala/spark/ui/jobs/StageTable.scala             | 10
-rw-r--r--  core/src/test/scala/spark/scheduler/LocalSchedulerSuite.scala  |  2
7 files changed, 39 insertions, 17 deletions
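
A minimal usage sketch of the API this commit adds (not part of the patch; the master URL, object name, and RDD contents are illustrative assumptions):

    import spark.SparkContext

    object SetDescriptionExample {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "SetDescriptionExample")
        // With this patch, the stage table's Description column shows this
        // text above the stage-name link instead of only the call site.
        sc.setDescription("Count the even numbers")
        val evens = sc.makeRDD(1 to 1000, 4).filter(_ % 2 == 0).count()
        println("evens = " + evens)
        sc.stop()
      }
    }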
diff --git a/core/src/main/resources/spark/ui/static/webui.css b/core/src/main/resources/spark/ui/static/webui.css
index f7537bb766..8b9f4ee938 100644
--- a/core/src/main/resources/spark/ui/static/webui.css
+++ b/core/src/main/resources/spark/ui/static/webui.css
@@ -47,3 +47,7 @@
padding-top: 7px;
padding-left: 4px;
}
+
+.table td {
+ vertical-align: middle !important;
+}
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 97e1aaf49e..039f5522b7 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -265,12 +265,18 @@ class SparkContext(
localProperties.value = new Properties()
}
- def addLocalProperties(key: String, value: String) {
+ def addLocalProperty(key: String, value: String) {
if(localProperties.value == null) {
localProperties.value = new Properties()
}
localProperties.value.setProperty(key,value)
}
+
+ /** Set a human readable description of the current job. */
+ def setDescription(value: String) {
+ addLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, value)
+ }
+
// Post init
taskScheduler.postStartHook()
@@ -841,6 +847,7 @@ class SparkContext(
* various Spark features.
*/
object SparkContext {
+ val SPARK_JOB_DESCRIPTION = "spark.job.description"
implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
def addInPlace(t1: Double, t2: Double): Double = t1 + t2
@@ -958,7 +965,6 @@ object SparkContext {
}
}
-
/**
* A class encapsulating how to convert some type T to Writable. It stores both the Writable class
* corresponding to T (e.g. IntWritable for Int) and a function for doing the conversion.
@@ -970,3 +976,4 @@ private[spark] class WritableConverter[T](
val writableClass: ClassManifest[T] => Class[_ <: Writable],
val convert: Writable => T)
extends Serializable
+
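
The new setDescription is thin sugar over addLocalProperty with the shared key; the two calls below are interchangeable after this patch (the description string is illustrative):

    // What setDescription does internally:
    sc.setDescription("Nightly ETL")
    sc.addLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, "Nightly ETL")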
diff --git a/core/src/main/scala/spark/scheduler/JobLogger.scala b/core/src/main/scala/spark/scheduler/JobLogger.scala
index c7e8f8a9a1..ad2efcec63 100644
--- a/core/src/main/scala/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/spark/scheduler/JobLogger.scala
@@ -26,6 +26,7 @@ import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.{Map, HashMap, ListBuffer}
import scala.io.Source
import spark._
+import spark.SparkContext
import spark.executor.TaskMetrics
import spark.scheduler.cluster.TaskInfo
@@ -317,8 +318,8 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
protected def recordJobProperties(jobID: Int, properties: Properties) {
if(properties != null) {
- val annotation = properties.getProperty("spark.job.annotation", "")
- jobLogInfo(jobID, annotation, false)
+ val description = properties.getProperty(SparkContext.SPARK_JOB_DESCRIPTION, "")
+ jobLogInfo(jobID, description, false)
}
}
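
The logger now reads the shared SparkContext.SPARK_JOB_DESCRIPTION constant in place of the hard-coded "spark.job.annotation" key. A self-contained sketch of the lookup-with-default it relies on (the Properties instance is hypothetical):

    import java.util.Properties
    import spark.SparkContext

    val props = new Properties()
    // Unset key: getProperty returns the supplied default, so the logger
    // records an empty description rather than a null.
    assert(props.getProperty(SparkContext.SPARK_JOB_DESCRIPTION, "") == "")
    props.setProperty(SparkContext.SPARK_JOB_DESCRIPTION, "Nightly ETL")
    assert(props.getProperty(SparkContext.SPARK_JOB_DESCRIPTION, "") == "Nightly ETL")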
diff --git a/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala b/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala
index 3ac35085eb..97ea644021 100644
--- a/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala
+++ b/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala
@@ -46,13 +46,11 @@ private[spark] object UIWorkloadGenerator {
}
val sc = new SparkContext(master, appName)
- // NOTE: Right now there is no easy way for us to show spark.job.annotation for a given phase,
- // but we pass it here anyways since it will be useful once we do.
def setProperties(s: String) = {
if(schedulingMode == SchedulingMode.FAIR) {
- sc.addLocalProperties("spark.scheduler.cluster.fair.pool", s)
+ sc.addLocalProperty("spark.scheduler.cluster.fair.pool", s)
}
- sc.addLocalProperties("spark.job.annotation", s)
+ sc.addLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, s)
}
val baseData = sc.makeRDD(1 to NUM_PARTITIONS * 10, NUM_PARTITIONS)
diff --git a/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala
index 200e13cf99..f22c4e39e3 100644
--- a/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala
@@ -15,6 +15,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
val DEFAULT_POOL_NAME = "default"
val stageToPool = new HashMap[Stage, String]()
+ val stageToDescription = new HashMap[Stage, String]()
val poolToActiveStages = new HashMap[String, HashSet[Stage]]()
val activeStages = HashSet[Stage]()
@@ -57,6 +58,8 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
stageToTasksActive.remove(s.id)
stageToTasksComplete.remove(s.id)
stageToTasksFailed.remove(s.id)
+ stageToPool.remove(s)
+ if (stageToDescription.contains(s)) {stageToDescription.remove(s)}
})
stages.trimEnd(toRemove)
}
@@ -66,12 +69,17 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = {
val stage = stageSubmitted.stage
activeStages += stage
- var poolName = DEFAULT_POOL_NAME
- if (stageSubmitted.properties != null) {
- poolName = stageSubmitted.properties.getProperty("spark.scheduler.cluster.fair.pool",
- DEFAULT_POOL_NAME)
- }
+
+ val poolName = Option(stageSubmitted.properties).map {
+ p => p.getProperty("spark.scheduler.cluster.fair.pool", DEFAULT_POOL_NAME)
+ }.getOrElse(DEFAULT_POOL_NAME)
stageToPool(stage) = poolName
+
+ val description = Option(stageSubmitted.properties).flatMap {
+ p => Option(p.getProperty(SparkContext.SPARK_JOB_DESCRIPTION))
+ }
+ description.map(d => stageToDescription(stage) = d)
+
val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashSet[Stage]())
stages += stage
}
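
The rewritten onStageSubmitted wraps the possibly-null Properties in Option so both lookups are null-safe. A standalone sketch of the same pattern (the helper names are hypothetical):

    import java.util.Properties

    def poolOf(props: Properties): String =
      Option(props)
        .map(_.getProperty("spark.scheduler.cluster.fair.pool", "default"))
        .getOrElse("default")

    def descriptionOf(props: Properties): Option[String] =
      Option(props).flatMap(p => Option(p.getProperty("spark.job.description")))

    // A null Properties object no longer needs an explicit guard:
    assert(poolOf(null) == "default")
    assert(descriptionOf(null) == None)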
diff --git a/core/src/main/scala/spark/ui/jobs/StageTable.scala b/core/src/main/scala/spark/ui/jobs/StageTable.scala
index 3257f4e360..38fa3bcbcd 100644
--- a/core/src/main/scala/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/spark/ui/jobs/StageTable.scala
@@ -34,7 +34,7 @@ private[spark] class StageTable(val stages: Seq[Stage], val parent: JobProgressU
<thead>
<th>Stage Id</th>
{if (isFairScheduler) {<th>Pool Name</th>} else {}}
- <th>Origin</th>
+ <th>Description</th>
<th>Submitted</th>
<td>Duration</td>
<td colspan="2">Tasks: Complete/Total</td>
@@ -87,13 +87,17 @@ private[spark] class StageTable(val stages: Seq[Stage], val parent: JobProgressU
val poolName = listener.stageToPool.get(s)
+ val nameLink = <a href={"/stages/stage?id=%s".format(s.id)}>{s.name}</a>
+ val description = listener.stageToDescription.get(s)
+ .map(d => <div><em>{d}</em></div><div>{nameLink}</div>).getOrElse(nameLink)
+
<tr>
<td>{s.id}</td>
{if (isFairScheduler) {
<td><a href={"/stages/pool?poolname=%s".format(poolName.get)}>{poolName.get}</a></td>}
}
- <td><a href={"/stages/stage?id=%s".format(s.id)}>{s.name}</a></td>
- <td>{submissionTime}</td>
+ <td>{description}</td>
+ <td valign="middle">{submissionTime}</td>
<td>{getElapsedTime(s.submissionTime,
s.completionTime.getOrElse(System.currentTimeMillis()))}</td>
<td class="progress-cell">{makeProgressBar(startedTasks, completedTasks, totalTasks)}</td>
diff --git a/core/src/test/scala/spark/scheduler/LocalSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/LocalSchedulerSuite.scala
index 14bb58731b..66fd59e8bb 100644
--- a/core/src/test/scala/spark/scheduler/LocalSchedulerSuite.scala
+++ b/core/src/test/scala/spark/scheduler/LocalSchedulerSuite.scala
@@ -73,7 +73,7 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
TaskThreadInfo.threadToStarted(threadIndex) = new CountDownLatch(1)
new Thread {
if (poolName != null) {
- sc.addLocalProperties("spark.scheduler.cluster.fair.pool",poolName)
+ sc.addLocalProperty("spark.scheduler.cluster.fair.pool", poolName)
}
override def run() {
val ans = nums.map(number => {