author      Matei Zaharia <matei@eecs.berkeley.edu>    2013-08-10 10:18:50 -0700
committer   Matei Zaharia <matei@eecs.berkeley.edu>    2013-08-10 10:18:50 -0700
commit      d3277a0daf300ce2bb074e1a779300c1d811bfc5 (patch)
tree        0e3219ee8aa26ed154f7dd105097637683663667 /core
parent      d17eeb997d10307eb08706e7e4b3982aea578108 (diff)
parent      14d14f451a35e647c4b9d13fd6a9f46d3091bae6 (diff)
Merge remote-tracking branch 'origin/pr/792'
Conflicts:
    core/src/main/scala/spark/ui/jobs/IndexPage.scala
    core/src/main/scala/spark/ui/jobs/StagePage.scala
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala                   2
-rw-r--r--  core/src/main/scala/spark/scheduler/DAGScheduler.scala         26
-rw-r--r--  core/src/main/scala/spark/scheduler/JobLogger.scala            82
-rw-r--r--  core/src/main/scala/spark/scheduler/SparkListenerBus.scala     74
-rw-r--r--  core/src/main/scala/spark/ui/jobs/IndexPage.scala              112
-rw-r--r--  core/src/main/scala/spark/ui/jobs/JobProgressListener.scala    37
-rw-r--r--  core/src/main/scala/spark/ui/jobs/PoolPage.scala               20
-rw-r--r--  core/src/main/scala/spark/ui/jobs/PoolTable.scala              15
-rw-r--r--  core/src/main/scala/spark/ui/jobs/StagePage.scala              158
-rw-r--r--  core/src/main/scala/spark/ui/jobs/StageTable.scala             12
10 files changed, 279 insertions, 259 deletions
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 40b30e4d23..1069e27513 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -547,7 +547,7 @@ class SparkContext(
}
def addSparkListener(listener: SparkListener) {
- dagScheduler.sparkListeners += listener
+ dagScheduler.addSparkListener(listener)
}
/**
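
The public entry point is unchanged: callers still register through SparkContext.addSparkListener, which now forwards to the DAGScheduler's bus. A minimal caller sketch, assuming the SparkListener trait's callbacks default to no-ops (the JobTimer class below is hypothetical, for illustration only):

import spark.SparkContext
import spark.scheduler.{SparkListener, SparkListenerJobStart, SparkListenerJobEnd}

// Hypothetical listener that times each job. After this change, callbacks
// arrive on the SparkListenerBus daemon thread, not the scheduler thread.
class JobTimer extends SparkListener {
  @volatile private var startTime = 0L
  override def onJobStart(jobStart: SparkListenerJobStart) {
    startTime = System.currentTimeMillis
  }
  override def onJobEnd(jobEnd: SparkListenerJobEnd) {
    println("Job finished in " + (System.currentTimeMillis - startTime) + " ms")
  }
}

val sc = new SparkContext("local", "listener-demo")
sc.addSparkListener(new JobTimer)
sc.parallelize(1 to 1000).count()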
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index 89c51a44c9..fbf3f4c807 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -102,7 +102,7 @@ class DAGScheduler(
private[spark] val stageToInfos = new TimeStampedHashMap[Stage, StageInfo]
- private[spark] val sparkListeners = ArrayBuffer[SparkListener]()
+ private val listenerBus = new SparkListenerBus()
var cacheLocs = new HashMap[Int, Array[List[String]]]
@@ -137,6 +137,10 @@ class DAGScheduler(
}.start()
}
+ def addSparkListener(listener: SparkListener) {
+ listenerBus.addListener(listener)
+ }
+
private def getCacheLocs(rdd: RDD[_]): Array[List[String]] = {
if (!cacheLocs.contains(rdd.id)) {
val blockIds = rdd.partitions.indices.map(index=> "rdd_%d_%d".format(rdd.id, index)).toArray
@@ -334,7 +338,7 @@ class DAGScheduler(
// Compute very short actions like first() or take() with no parent stages locally.
runLocally(job)
} else {
- sparkListeners.foreach(_.onJobStart(SparkListenerJobStart(job, properties)))
+ listenerBus.post(SparkListenerJobStart(job, properties))
idToActiveJob(runId) = job
activeJobs += job
resultStageToJob(finalStage) = job
@@ -348,11 +352,11 @@ class DAGScheduler(
handleExecutorLost(execId)
case begin: BeginEvent =>
- sparkListeners.foreach(_.onTaskStart(SparkListenerTaskStart(begin.task, begin.taskInfo)))
+ listenerBus.post(SparkListenerTaskStart(begin.task, begin.taskInfo))
case completion: CompletionEvent =>
- sparkListeners.foreach(_.onTaskEnd(SparkListenerTaskEnd(completion.task,
- completion.reason, completion.taskInfo, completion.taskMetrics)))
+ listenerBus.post(SparkListenerTaskEnd(
+ completion.task, completion.reason, completion.taskInfo, completion.taskMetrics))
handleTaskCompletion(completion)
case TaskSetFailed(taskSet, reason) =>
@@ -363,7 +367,7 @@ class DAGScheduler(
for (job <- activeJobs) {
val error = new SparkException("Job cancelled because SparkContext was shut down")
job.listener.jobFailed(error)
- sparkListeners.foreach(_.onJobEnd(SparkListenerJobEnd(job, JobFailed(error, None))))
+ listenerBus.post(SparkListenerJobEnd(job, JobFailed(error, None)))
}
return true
}
@@ -513,8 +517,7 @@ class DAGScheduler(
// must be run listener before possible NotSerializableException
// should be "StageSubmitted" first and then "JobEnded"
val properties = idToActiveJob(stage.priority).properties
- sparkListeners.foreach(_.onStageSubmitted(
- SparkListenerStageSubmitted(stage, tasks.size, properties)))
+ listenerBus.post(SparkListenerStageSubmitted(stage, tasks.size, properties))
if (tasks.size > 0) {
// Preemptively serialize a task to make sure it can be serialized. We are catching this
@@ -560,8 +563,7 @@ class DAGScheduler(
}
logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
stage.completionTime = Some(System.currentTimeMillis)
- val stageComp = StageCompleted(stageToInfos(stage))
- sparkListeners.foreach{_.onStageCompleted(stageComp)}
+ listenerBus.post(StageCompleted(stageToInfos(stage)))
running -= stage
}
event.reason match {
@@ -585,7 +587,7 @@ class DAGScheduler(
activeJobs -= job
resultStageToJob -= stage
markStageAsFinished(stage)
- sparkListeners.foreach(_.onJobEnd(SparkListenerJobEnd(job, JobSucceeded)))
+ listenerBus.post(SparkListenerJobEnd(job, JobSucceeded))
}
job.listener.taskSucceeded(rt.outputId, event.result)
}
@@ -732,7 +734,7 @@ class DAGScheduler(
val job = resultStageToJob(resultStage)
val error = new SparkException("Job failed: " + reason)
job.listener.jobFailed(error)
- sparkListeners.foreach(_.onJobEnd(SparkListenerJobEnd(job, JobFailed(error, Some(failedStage)))))
+ listenerBus.post(SparkListenerJobEnd(job, JobFailed(error, Some(failedStage))))
idToActiveJob -= resultStage.priority
activeJobs -= job
resultStageToJob -= resultStage
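
Every hunk above follows the same pattern: the scheduler's event loop no longer iterates listeners inline; it enqueues the event and moves on, so a slow listener can only delay the bus's own daemon thread. A standalone toy of that producer/consumer split, assuming nothing beyond java.util.concurrent:

import java.util.concurrent.LinkedBlockingQueue

object FanOutSketch extends App {
  val queue = new LinkedBlockingQueue[String](1000)
  val listeners = Seq((e: String) => println("listener saw " + e))

  new Thread("bus") {
    setDaemon(true)
    override def run() {
      while (true) {
        val event = queue.take          // blocks until an event is available
        listeners.foreach(_(event))     // slow listeners stall only this thread
      }
    }
  }.start()

  // The "scheduler loop": posting is just a non-blocking enqueue.
  (1 to 3).foreach(i => queue.offer("event-" + i))
  Thread.sleep(100)                     // give the daemon thread time to drain
}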
diff --git a/core/src/main/scala/spark/scheduler/JobLogger.scala b/core/src/main/scala/spark/scheduler/JobLogger.scala
index ad2efcec63..7194fcaa49 100644
--- a/core/src/main/scala/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/spark/scheduler/JobLogger.scala
@@ -23,10 +23,11 @@ import java.io.FileNotFoundException
import java.text.SimpleDateFormat
import java.util.{Date, Properties}
import java.util.concurrent.LinkedBlockingQueue
+
import scala.collection.mutable.{Map, HashMap, ListBuffer}
import scala.io.Source
+
import spark._
-import spark.SparkContext
import spark.executor.TaskMetrics
import spark.scheduler.cluster.TaskInfo
@@ -54,31 +55,6 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
def getJobIDToStages = jobIDToStages
def getEventQueue = eventQueue
- new Thread("JobLogger") {
- setDaemon(true)
- override def run() {
- while (true) {
- val event = eventQueue.take
- logDebug("Got event of type " + event.getClass.getName)
- event match {
- case SparkListenerJobStart(job, properties) =>
- processJobStartEvent(job, properties)
- case SparkListenerStageSubmitted(stage, taskSize, properties) =>
- processStageSubmittedEvent(stage, taskSize)
- case StageCompleted(stageInfo) =>
- processStageCompletedEvent(stageInfo)
- case SparkListenerJobEnd(job, result) =>
- processJobEndEvent(job, result)
- case SparkListenerTaskStart(task, taskInfo) =>
- processTaskStartEvent(task, taskInfo)
- case SparkListenerTaskEnd(task, reason, taskInfo, taskMetrics) =>
- processTaskEndEvent(task, reason, taskInfo, taskMetrics)
- case _ =>
- }
- }
- }
- }.start()
-
// Create a folder for log files, the folder's name is the creation time of the jobLogger
protected def createLogDir() {
val dir = new File(logDir + "/" + logDirName + "/")
@@ -239,49 +215,32 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
}
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
- eventQueue.put(stageSubmitted)
- }
-
- protected def processStageSubmittedEvent(stage: Stage, taskSize: Int) {
- stageLogInfo(stage.id, "STAGE_ID=" + stage.id + " STATUS=SUBMITTED" + " TASK_SIZE=" + taskSize)
+ stageLogInfo(
+ stageSubmitted.stage.id,
+ "STAGE_ID=%d STATUS=SUBMITTED TASK_SIZE=%d".format(
+ stageSubmitted.stage.id, stageSubmitted.taskSize))
}
override def onStageCompleted(stageCompleted: StageCompleted) {
- eventQueue.put(stageCompleted)
- }
-
- protected def processStageCompletedEvent(stageInfo: StageInfo) {
- stageLogInfo(stageInfo.stage.id, "STAGE_ID=" +
- stageInfo.stage.id + " STATUS=COMPLETED")
+ stageLogInfo(
+ stageCompleted.stageInfo.stage.id,
+ "STAGE_ID=%d STATUS=COMPLETED".format(stageCompleted.stageInfo.stage.id))
}
- override def onTaskStart(taskStart: SparkListenerTaskStart) {
- eventQueue.put(taskStart)
- }
-
- protected def processTaskStartEvent(task: Task[_], taskInfo: TaskInfo) {
- var taskStatus = ""
- task match {
- case resultTask: ResultTask[_, _] => taskStatus = "TASK_TYPE=RESULT_TASK"
- case shuffleMapTask: ShuffleMapTask => taskStatus = "TASK_TYPE=SHUFFLE_MAP_TASK"
- }
- }
+ override def onTaskStart(taskStart: SparkListenerTaskStart) { }
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
- eventQueue.put(taskEnd)
- }
-
- protected def processTaskEndEvent(task: Task[_], reason: TaskEndReason,
- taskInfo: TaskInfo, taskMetrics: TaskMetrics) {
+ val task = taskEnd.task
+ val taskInfo = taskEnd.taskInfo
var taskStatus = ""
task match {
case resultTask: ResultTask[_, _] => taskStatus = "TASK_TYPE=RESULT_TASK"
case shuffleMapTask: ShuffleMapTask => taskStatus = "TASK_TYPE=SHUFFLE_MAP_TASK"
}
- reason match {
+ taskEnd.reason match {
case Success => taskStatus += " STATUS=SUCCESS"
- recordTaskMetrics(task.stageId, taskStatus, taskInfo, taskMetrics)
+ recordTaskMetrics(task.stageId, taskStatus, taskInfo, taskEnd.taskMetrics)
case Resubmitted =>
taskStatus += " STATUS=RESUBMITTED TID=" + taskInfo.taskId +
" STAGE_ID=" + task.stageId
@@ -300,12 +259,9 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
}
override def onJobEnd(jobEnd: SparkListenerJobEnd) {
- eventQueue.put(jobEnd)
- }
-
- protected def processJobEndEvent(job: ActiveJob, reason: JobResult) {
+ val job = jobEnd.job
var info = "JOB_ID=" + job.runId
- reason match {
+ jobEnd.jobResult match {
case JobSucceeded => info += " STATUS=SUCCESS"
case JobFailed(exception, _) =>
info += " STATUS=FAILED REASON="
@@ -324,10 +280,8 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
}
override def onJobStart(jobStart: SparkListenerJobStart) {
- eventQueue.put(jobStart)
- }
-
- protected def processJobStartEvent(job: ActiveJob, properties: Properties) {
+ val job = jobStart.job
+ val properties = jobStart.properties
createLogWriter(job.runId)
recordJobProperties(job.runId, properties)
buildJobDep(job.runId, job.finalStage)
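
Because the bus now owns the delivery thread, JobLogger's private queue and worker thread are gone and each callback does its work inline, as the hunks above show. A condensed, hypothetical sketch of the resulting style (logLine stands in for the real class's stageLogInfo):

import spark.scheduler._

// Condensed version of the inline-callback style JobLogger adopts here:
// no eventQueue, no logging thread, just direct handling per event.
class InlineJobLogger extends SparkListener {
  private def logLine(stageId: Int, msg: String) = println(stageId + ": " + msg)

  override def onStageSubmitted(s: SparkListenerStageSubmitted) {
    logLine(s.stage.id,
      "STAGE_ID=%d STATUS=SUBMITTED TASK_SIZE=%d".format(s.stage.id, s.taskSize))
  }

  override def onStageCompleted(s: StageCompleted) {
    logLine(s.stageInfo.stage.id,
      "STAGE_ID=%d STATUS=COMPLETED".format(s.stageInfo.stage.id))
  }
}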
diff --git a/core/src/main/scala/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/spark/scheduler/SparkListenerBus.scala
new file mode 100644
index 0000000000..f55ed455ed
--- /dev/null
+++ b/core/src/main/scala/spark/scheduler/SparkListenerBus.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.scheduler
+
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
+
+import spark.Logging
+
+/** Asynchronously passes SparkListenerEvents to registered SparkListeners. */
+private[spark] class SparkListenerBus() extends Logging {
+ private val sparkListeners = new ArrayBuffer[SparkListener]() with SynchronizedBuffer[SparkListener]
+
+ /* Cap the capacity of the SparkListenerEvent queue so we get an explicit error (rather than
+ * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */
+ private val EVENT_QUEUE_CAPACITY = 10000
+ private val eventQueue = new LinkedBlockingQueue[SparkListenerEvents](EVENT_QUEUE_CAPACITY)
+ private var queueFullErrorMessageLogged = false
+
+ new Thread("SparkListenerBus") {
+ setDaemon(true)
+ override def run() {
+ while (true) {
+ val event = eventQueue.take
+ event match {
+ case stageSubmitted: SparkListenerStageSubmitted =>
+ sparkListeners.foreach(_.onStageSubmitted(stageSubmitted))
+ case stageCompleted: StageCompleted =>
+ sparkListeners.foreach(_.onStageCompleted(stageCompleted))
+ case jobStart: SparkListenerJobStart =>
+ sparkListeners.foreach(_.onJobStart(jobStart))
+ case jobEnd: SparkListenerJobEnd =>
+ sparkListeners.foreach(_.onJobEnd(jobEnd))
+ case taskStart: SparkListenerTaskStart =>
+ sparkListeners.foreach(_.onTaskStart(taskStart))
+ case taskEnd: SparkListenerTaskEnd =>
+ sparkListeners.foreach(_.onTaskEnd(taskEnd))
+ case _ =>
+ }
+ }
+ }
+ }.start()
+
+ def addListener(listener: SparkListener) {
+ sparkListeners += listener
+ }
+
+ def post(event: SparkListenerEvents) {
+ val eventAdded = eventQueue.offer(event)
+ if (!eventAdded && !queueFullErrorMessageLogged) {
+ logError("Dropping SparkListenerEvent because no remaining room in event queue. " +
+ "This likely means one of the SparkListeners is too slow and cannot keep up with the " +
+ "rate at which tasks are being started by the scheduler.")
+ queueFullErrorMessageLogged = true
+ }
+ }
+}
+
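The load-shedding choice is in post: offer, unlike put, returns false instead of blocking when the bounded queue is full, so the event is dropped and an error is logged once rather than stalling the DAGScheduler's event loop. The difference in isolation:

import java.util.concurrent.LinkedBlockingQueue

object OfferVsPut extends App {
  val queue = new LinkedBlockingQueue[Int](2)   // tiny capacity for the demo

  queue.put(1)              // put blocks when full; fine here, queue is empty
  queue.put(2)
  println(queue.offer(3))   // false: queue full, the "event" is dropped

  queue.take()              // a consumer drains one slot
  println(queue.offer(3))   // true: room again
}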
diff --git a/core/src/main/scala/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/spark/ui/jobs/IndexPage.scala
index 117b84e615..9724671a03 100644
--- a/core/src/main/scala/spark/ui/jobs/IndexPage.scala
+++ b/core/src/main/scala/spark/ui/jobs/IndexPage.scala
@@ -31,73 +31,63 @@ private[spark] class IndexPage(parent: JobProgressUI) {
def listener = parent.listener
def render(request: HttpServletRequest): Seq[Node] = {
- val activeStages = listener.activeStages.toSeq
- val completedStages = listener.completedStages.reverse.toSeq
- val failedStages = listener.failedStages.reverse.toSeq
- val now = System.currentTimeMillis()
+ listener.synchronized {
+ val activeStages = listener.activeStages.toSeq
+ val completedStages = listener.completedStages.reverse.toSeq
+ val failedStages = listener.failedStages.reverse.toSeq
+ val now = System.currentTimeMillis()
- var activeTime = 0L
- for (tasks <- listener.stageToTasksActive.values; t <- tasks) {
- activeTime += t.timeRunning(now)
- }
+ var activeTime = 0L
+ for (tasks <- listener.stageToTasksActive.values; t <- tasks) {
+ activeTime += t.timeRunning(now)
+ }
- val activeStagesTable = new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent)
- val completedStagesTable = new StageTable(completedStages.sortBy(_.submissionTime).reverse, parent)
- val failedStagesTable = new StageTable(failedStages.sortBy(_.submissionTime).reverse, parent)
+ val activeStagesTable = new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent)
+ val completedStagesTable = new StageTable(completedStages.sortBy(_.submissionTime).reverse, parent)
+ val failedStagesTable = new StageTable(failedStages.sortBy(_.submissionTime).reverse, parent)
- val poolTable = new PoolTable(listener.sc.getAllPools, listener)
- val summary: NodeSeq =
- <div>
- <ul class="unstyled">
- <li>
- <strong>Duration: </strong>
- {parent.formatDuration(now - listener.sc.startTime)}
- </li>
- <li>
- <strong>CPU time: </strong>
- {parent.formatDuration(listener.totalTime + activeTime)}
- </li>
- {if (listener.totalShuffleRead > 0)
+ val poolTable = new PoolTable(listener.sc.getAllPools, listener)
+ val summary: NodeSeq =
+ <div>
+ <ul class="unstyled">
+ <li>
+ <strong>Duration: </strong>
+ {parent.formatDuration(now - listener.sc.startTime)}
+ </li>
+ <li>
+ <strong>CPU Time: </strong>
+ {parent.formatDuration(listener.totalTime + activeTime)}
+ </li>
+ <li><strong>Scheduling Mode:</strong> {parent.sc.getSchedulingMode}</li>
<li>
- <strong>Shuffle read: </strong>
- {Utils.memoryBytesToString(listener.totalShuffleRead)}
- </li>
- }
- {if (listener.totalShuffleWrite > 0)
+ <a href="#active"><strong>Active Stages:</strong></a>
+ {activeStages.size}
+ </li>
<li>
- <strong>Shuffle write: </strong>
- {Utils.memoryBytesToString(listener.totalShuffleWrite)}
- </li>
- }
- <li><strong>Scheduling Mode:</strong> {parent.sc.getSchedulingMode}</li>
- <li>
- <a href="#active"><strong>Active Stages:</strong></a>
- {activeStages.size}
- </li>
- <li>
- <a href="#completed"><strong>Completed Stages:</strong></a>
- {completedStages.size}
- </li>
- <li>
- <a href="#failed"><strong>Failed Stages:</strong></a>
- {failedStages.size}
- </li>
- </ul>
- </div>
+ <a href="#completed"><strong>Completed Stages:</strong></a>
+ {completedStages.size}
+ </li>
+ <li>
+ <a href="#failed"><strong>Failed Stages:</strong></a>
+ {failedStages.size}
+ </li>
+ </ul>
+ </div>
- val content = summary ++
- {if (listener.sc.getSchedulingMode == SchedulingMode.FAIR) {
- <h4>Pools</h4> ++ poolTable.toNodeSeq
- } else {
- Seq()
- }} ++
- <h4 id="active">Active Stages : {activeStages.size}</h4> ++
- activeStagesTable.toNodeSeq++
- <h4 id="completed">Completed Stages : {completedStages.size}</h4> ++
- completedStagesTable.toNodeSeq++
- <h4 id ="failed">Failed Stages : {failedStages.size}</h4> ++
- failedStagesTable.toNodeSeq
+ val content = summary ++
+ {if (listener.sc.getSchedulingMode == SchedulingMode.FAIR) {
+ <h4>Pools</h4> ++ poolTable.toNodeSeq
+ } else {
+ Seq()
+ }} ++
+ <h4 id="active">Active Stages: {activeStages.size}</h4> ++
+ activeStagesTable.toNodeSeq++
+ <h4 id="completed">Completed Stages: {completedStages.size}</h4> ++
+ completedStagesTable.toNodeSeq++
+ <h4 id ="failed">Failed Stages: {failedStages.size}</h4> ++
+ failedStagesTable.toNodeSeq
- headerSparkPage(content, parent.sc, "Spark Stages", Jobs)
+ headerSparkPage(content, parent.sc, "Spark Stages", Jobs)
+ }
}
}
diff --git a/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala
index c6103edcb0..1d9767a83c 100644
--- a/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala
@@ -9,6 +9,13 @@ import spark.scheduler.cluster.TaskInfo
import spark.executor.TaskMetrics
import collection.mutable
+/**
+ * Tracks task-level information to be displayed in the UI.
+ *
+ * All access to the data structures in this class must be synchronized on the
+ * class, since the UI thread and the DAGScheduler event loop may otherwise
+ * be reading/updating the internal data structures concurrently.
+ */
private[spark] class JobProgressListener(val sc: SparkContext) extends SparkListener {
// How many stages to remember
val RETAINED_STAGES = System.getProperty("spark.ui.retained_stages", "1000").toInt
@@ -38,7 +45,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
override def onJobStart(jobStart: SparkListenerJobStart) {}
- override def onStageCompleted(stageCompleted: StageCompleted) = {
+ override def onStageCompleted(stageCompleted: StageCompleted) = synchronized {
val stage = stageCompleted.stageInfo.stage
poolToActiveStages(stageToPool(stage)) -= stage
activeStages -= stage
@@ -47,7 +54,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
}
/** If stages is too large, remove and garbage collect old stages */
- def trimIfNecessary(stages: ListBuffer[Stage]) {
+ def trimIfNecessary(stages: ListBuffer[Stage]) = synchronized {
if (stages.size > RETAINED_STAGES) {
val toRemove = RETAINED_STAGES / 10
stages.takeRight(toRemove).foreach( s => {
@@ -66,7 +73,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
}
/** For FIFO, all stages are contained by "default" pool but "default" pool here is meaningless */
- override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = {
+ override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = synchronized {
val stage = stageSubmitted.stage
activeStages += stage
@@ -84,7 +91,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
stages += stage
}
- override def onTaskStart(taskStart: SparkListenerTaskStart) {
+ override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized {
val sid = taskStart.task.stageId
val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
tasksActive += taskStart.taskInfo
@@ -94,7 +101,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
stageToTaskInfos(sid) = taskList
}
- override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+ override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
val sid = taskEnd.task.stageId
val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
tasksActive -= taskEnd.taskInfo
@@ -132,7 +139,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
stageToTaskInfos(sid) = taskList
}
- override def onJobEnd(jobEnd: SparkListenerJobEnd) {
+ override def onJobEnd(jobEnd: SparkListenerJobEnd) = synchronized {
jobEnd match {
case end: SparkListenerJobEnd =>
end.jobResult match {
@@ -146,22 +153,4 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
case _ =>
}
}
-
- /** Is this stage's input from a shuffle read. */
- def hasShuffleRead(stageID: Int): Boolean = {
- // This is written in a slightly complicated way to avoid having to scan all tasks
- for (s <- stageToTaskInfos.get(stageID).getOrElse(Seq())) {
- if (s._2 != null) return s._2.flatMap(m => m.shuffleReadMetrics).isDefined
- }
- return false // No tasks have finished for this stage
- }
-
- /** Is this stage's output to a shuffle write. */
- def hasShuffleWrite(stageID: Int): Boolean = {
- // This is written in a slightly complicated way to avoid having to scan all tasks
- for (s <- stageToTaskInfos.get(stageID).getOrElse(Seq())) {
- if (s._2 != null) return s._2.flatMap(m => m.shuffleWriteMetrics).isDefined
- }
- return false // No tasks have finished for this stage
- }
}
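
The race these synchronized additions close: the bus thread mutates the listener's maps while a Jetty thread renders them. A Spark-free sketch of the discipline, with every read and write of the shared state going through the same monitor:

import scala.collection.mutable.{HashMap, HashSet}

// Toy model of the JobProgressListener discipline: the writer (bus thread)
// and the reader (UI thread) both synchronize on the same instance.
class ProgressModel {
  private val activeByPool = HashMap[String, HashSet[Int]]()

  def stageStarted(pool: String, stage: Int) = synchronized {
    activeByPool.getOrElseUpdate(pool, HashSet[Int]()) += stage
  }

  def stageFinished(pool: String, stage: Int) = synchronized {
    activeByPool.get(pool).foreach(_ -= stage)
  }

  // Render under the same lock, so the UI never sees a map mid-update.
  def render(): String = synchronized {
    activeByPool.map { case (p, s) => p + "=" + s.size }.mkString(", ")
  }
}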
diff --git a/core/src/main/scala/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/spark/ui/jobs/PoolPage.scala
index 647c6d2ae3..04ef35c800 100644
--- a/core/src/main/scala/spark/ui/jobs/PoolPage.scala
+++ b/core/src/main/scala/spark/ui/jobs/PoolPage.scala
@@ -14,17 +14,19 @@ private[spark] class PoolPage(parent: JobProgressUI) {
def listener = parent.listener
def render(request: HttpServletRequest): Seq[Node] = {
- val poolName = request.getParameter("poolname")
- val poolToActiveStages = listener.poolToActiveStages
- val activeStages = poolToActiveStages.getOrElseUpdate(poolName, new HashSet[Stage]).toSeq
- val activeStagesTable = new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent)
+ listener.synchronized {
+ val poolName = request.getParameter("poolname")
+ val poolToActiveStages = listener.poolToActiveStages
+ val activeStages = poolToActiveStages.get(poolName).toSeq.flatten
+ val activeStagesTable = new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent)
- val pool = listener.sc.getPoolForName(poolName).get
- val poolTable = new PoolTable(Seq(pool), listener)
+ val pool = listener.sc.getPoolForName(poolName).get
+ val poolTable = new PoolTable(Seq(pool), listener)
- val content = <h3>Pool </h3> ++ poolTable.toNodeSeq() ++
- <h3>Active Stages : {activeStages.size}</h3> ++ activeStagesTable.toNodeSeq()
+ val content = <h3>Pool </h3> ++ poolTable.toNodeSeq() ++
+ <h3>Active Stages : {activeStages.size}</h3> ++ activeStagesTable.toNodeSeq()
- headerSparkPage(content, parent.sc, "Spark Pool Details", Jobs)
+ headerSparkPage(content, parent.sc, "Spark Pool Details", Jobs)
+ }
}
}
diff --git a/core/src/main/scala/spark/ui/jobs/PoolTable.scala b/core/src/main/scala/spark/ui/jobs/PoolTable.scala
index 9cfe0d68f0..21ebcef63a 100644
--- a/core/src/main/scala/spark/ui/jobs/PoolTable.scala
+++ b/core/src/main/scala/spark/ui/jobs/PoolTable.scala
@@ -13,11 +13,12 @@ private[spark] class PoolTable(pools: Seq[Schedulable], listener: JobProgressLis
var poolToActiveStages: HashMap[String, HashSet[Stage]] = listener.poolToActiveStages
def toNodeSeq(): Seq[Node] = {
- poolTable(poolRow, pools)
+ listener.synchronized {
+ poolTable(poolRow, pools)
+ }
}
- // pool tables
- def poolTable(makeRow: (Schedulable, HashMap[String, HashSet[Stage]]) => Seq[Node],
+ private def poolTable(makeRow: (Schedulable, HashMap[String, HashSet[Stage]]) => Seq[Node],
rows: Seq[Schedulable]
): Seq[Node] = {
<table class="table table-bordered table-striped table-condensed sortable">
@@ -35,12 +36,16 @@ private[spark] class PoolTable(pools: Seq[Schedulable], listener: JobProgressLis
</table>
}
- def poolRow(p: Schedulable, poolToActiveStages: HashMap[String, HashSet[Stage]]): Seq[Node] = {
+ private def poolRow(p: Schedulable, poolToActiveStages: HashMap[String, HashSet[Stage]]): Seq[Node] = {
+ val activeStages = poolToActiveStages.get(p.name) match {
+ case Some(stages) => stages.size
+ case None => 0
+ }
<tr>
<td><a href={"/stages/pool?poolname=%s".format(p.name)}>{p.name}</a></td>
<td>{p.minShare}</td>
<td>{p.weight}</td>
- <td>{poolToActiveStages.getOrElseUpdate(p.name, new HashSet[Stage]()).size}</td>
+ <td>{activeStages}</td>
<td>{p.runningTasks}</td>
<td>{p.schedulingMode}</td>
</tr>
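
The poolRow change also removes a read-side mutation: getOrElseUpdate inserted an empty set into the shared map as a side effect of merely rendering a row. The replacement is a pure lookup with an explicit default, shown here in isolation:

import scala.collection.mutable.{HashMap, HashSet}

val poolToActiveStages = HashMap("default" -> HashSet(1, 2))

// Before: rendering mutated shared state ("missing" gained an empty set).
//   poolToActiveStages.getOrElseUpdate("missing", new HashSet[Int]()).size
// After: a read-only lookup with an explicit default.
val activeStages = poolToActiveStages.get("missing") match {
  case Some(stages) => stages.size
  case None => 0
}
println(activeStages)   // 0, and the map is untouched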
diff --git a/core/src/main/scala/spark/ui/jobs/StagePage.scala b/core/src/main/scala/spark/ui/jobs/StagePage.scala
index 02f9adf8a8..1b071a91e5 100644
--- a/core/src/main/scala/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/spark/ui/jobs/StagePage.scala
@@ -36,100 +36,102 @@ private[spark] class StagePage(parent: JobProgressUI) {
val dateFmt = parent.dateFmt
def render(request: HttpServletRequest): Seq[Node] = {
- val stageId = request.getParameter("id").toInt
- val now = System.currentTimeMillis()
+ listener.synchronized {
+ val stageId = request.getParameter("id").toInt
+ val now = System.currentTimeMillis()
+
+ if (!listener.stageToTaskInfos.contains(stageId)) {
+ val content =
+ <div>
+ <h4>Summary Metrics</h4> No tasks have started yet
+ <h4>Tasks</h4> No tasks have started yet
+ </div>
+ return headerSparkPage(content, parent.sc, "Stage Details: %s".format(stageId), Jobs)
+ }
- if (!listener.stageToTaskInfos.contains(stageId)) {
- val content =
- <div>
- <h4>Summary Metrics</h4> No tasks have started yet
- <h4>Tasks</h4> No tasks have started yet
- </div>
- return headerSparkPage(content, parent.sc, "Stage Details: %s".format(stageId), Jobs)
- }
+ val tasks = listener.stageToTaskInfos(stageId).toSeq.sortBy(_._1.launchTime)
- val tasks = listener.stageToTaskInfos(stageId).toSeq
+ val shuffleReadBytes = listener.stageToShuffleRead.getOrElse(stageId, 0L)
+ val hasShuffleRead = shuffleReadBytes > 0
+ val shuffleWriteBytes = listener.stageToShuffleWrite.getOrElse(stageId, 0L)
+ val hasShuffleWrite = shuffleWriteBytes > 0
- val shuffleRead = listener.stageToShuffleRead(stageId) > 0
- val shuffleWrite = listener.stageToShuffleWrite(stageId) > 0
+ var activeTime = 0L
+ listener.stageToTasksActive(stageId).foreach(activeTime += _.timeRunning(now))
- var activeTime = 0L
- listener.stageToTasksActive(stageId).foreach { t =>
- activeTime += t.timeRunning(now)
- }
-
- val summary =
- <div>
- <ul class="unstyled">
- <li>
- <strong>CPU time: </strong>
- {parent.formatDuration(listener.stageToTime(stageId) + activeTime)}
- </li>
- {if (shuffleRead)
- <li>
- <strong>Shuffle read: </strong>
- {Utils.memoryBytesToString(listener.stageToShuffleRead(stageId))}
- </li>
- }
- {if (shuffleWrite)
+ val summary =
+ <div>
+ <ul class="unstyled">
<li>
- <strong>Shuffle write: </strong>
- {Utils.memoryBytesToString(listener.stageToShuffleWrite(stageId))}
+ <strong>CPU time: </strong>
+ {parent.formatDuration(listener.stageToTime.getOrElse(stageId, 0L) + activeTime)}
</li>
- }
- </ul>
- </div>
+ {if (hasShuffleRead)
+ <li>
+ <strong>Shuffle read: </strong>
+ {Utils.memoryBytesToString(shuffleReadBytes)}
+ </li>
+ }
+ {if (hasShuffleWrite)
+ <li>
+ <strong>Shuffle write: </strong>
+ {Utils.memoryBytesToString(shuffleWriteBytes)}
+ </li>
+ }
+ </ul>
+ </div>
- val taskHeaders: Seq[String] =
- Seq("Task ID", "Status", "Duration", "Locality Level", "Worker", "Launch Time") ++
- {if (shuffleRead) Seq("Shuffle Read") else Nil} ++
- {if (shuffleWrite) Seq("Shuffle Write") else Nil} ++
- Seq("Details")
+ val taskHeaders: Seq[String] =
+ Seq("Task ID", "Status", "Duration", "Locality Level", "Worker", "Launch Time") ++
+ {if (hasShuffleRead) Seq("Shuffle Read") else Nil} ++
+ {if (hasShuffleWrite) Seq("Shuffle Write") else Nil} ++
+ Seq("Details")
- val taskTable = listingTable(taskHeaders, taskRow, tasks)
+ val taskTable = listingTable(taskHeaders, taskRow, tasks)
- // Excludes tasks which failed and have incomplete metrics
- val validTasks = tasks.filter(t => t._1.status == "SUCCESS" && (Option(t._2).isDefined))
+ // Excludes tasks which failed and have incomplete metrics
+ val validTasks = tasks.filter(t => t._1.status == "SUCCESS" && (t._2.isDefined))
- val summaryTable: Option[Seq[Node]] =
- if (validTasks.size == 0) {
- None
- }
- else {
- val serviceTimes = validTasks.map{case (info, metrics, exception) =>
- metrics.get.executorRunTime.toDouble}
- val serviceQuantiles = "Duration" +: Distribution(serviceTimes).get.getQuantiles().map(
- ms => parent.formatDuration(ms.toLong))
-
- def getQuantileCols(data: Seq[Double]) =
- Distribution(data).get.getQuantiles().map(d => Utils.memoryBytesToString(d.toLong))
-
- val shuffleReadSizes = validTasks.map {
- case(info, metrics, exception) =>
- metrics.get.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble
+ val summaryTable: Option[Seq[Node]] =
+ if (validTasks.size == 0) {
+ None
}
- val shuffleReadQuantiles = "Shuffle Read (Remote)" +: getQuantileCols(shuffleReadSizes)
+ else {
+ val serviceTimes = validTasks.map{case (info, metrics, exception) =>
+ metrics.get.executorRunTime.toDouble}
+ val serviceQuantiles = "Duration" +: Distribution(serviceTimes).get.getQuantiles().map(
+ ms => parent.formatDuration(ms.toLong))
+
+ def getQuantileCols(data: Seq[Double]) =
+ Distribution(data).get.getQuantiles().map(d => Utils.memoryBytesToString(d.toLong))
+
+ val shuffleReadSizes = validTasks.map {
+ case(info, metrics, exception) =>
+ metrics.get.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble
+ }
+ val shuffleReadQuantiles = "Shuffle Read (Remote)" +: getQuantileCols(shuffleReadSizes)
- val shuffleWriteSizes = validTasks.map {
- case(info, metrics, exception) =>
- metrics.get.shuffleWriteMetrics.map(_.shuffleBytesWritten).getOrElse(0L).toDouble
- }
- val shuffleWriteQuantiles = "Shuffle Write" +: getQuantileCols(shuffleWriteSizes)
+ val shuffleWriteSizes = validTasks.map {
+ case(info, metrics, exception) =>
+ metrics.get.shuffleWriteMetrics.map(_.shuffleBytesWritten).getOrElse(0L).toDouble
+ }
+ val shuffleWriteQuantiles = "Shuffle Write" +: getQuantileCols(shuffleWriteSizes)
- val listings: Seq[Seq[String]] = Seq(serviceQuantiles,
- if (shuffleRead) shuffleReadQuantiles else Nil,
- if (shuffleWrite) shuffleWriteQuantiles else Nil)
+ val listings: Seq[Seq[String]] = Seq(serviceQuantiles,
+ if (hasShuffleRead) shuffleReadQuantiles else Nil,
+ if (hasShuffleWrite) shuffleWriteQuantiles else Nil)
- val quantileHeaders = Seq("Metric", "Min", "25%", "50%", "75%", "Max")
- def quantileRow(data: Seq[String]): Seq[Node] = <tr> {data.map(d => <td>{d}</td>)} </tr>
- Some(listingTable(quantileHeaders, quantileRow, listings))
- }
+ val quantileHeaders = Seq("Metric", "Min", "25%", "50%", "75%", "Max")
+ def quantileRow(data: Seq[String]): Seq[Node] = <tr> {data.map(d => <td>{d}</td>)} </tr>
+ Some(listingTable(quantileHeaders, quantileRow, listings))
+ }
- val content =
- summary ++ <h2>Summary Metrics</h2> ++ summaryTable.getOrElse(Nil) ++
- <h2>Tasks</h2> ++ taskTable;
+ val content =
+ summary ++ <h2>Summary Metrics</h2> ++ summaryTable.getOrElse(Nil) ++
+ <h2>Tasks</h2> ++ taskTable;
- headerSparkPage(content, parent.sc, "Stage Details: %s".format(stageId), Jobs)
+ headerSparkPage(content, parent.sc, "Stage Details: %s".format(stageId), Jobs)
+ }
}
diff --git a/core/src/main/scala/spark/ui/jobs/StageTable.scala b/core/src/main/scala/spark/ui/jobs/StageTable.scala
index 1df0e0913c..5068a025fa 100644
--- a/core/src/main/scala/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/spark/ui/jobs/StageTable.scala
@@ -25,11 +25,13 @@ private[spark] class StageTable(val stages: Seq[Stage], val parent: JobProgressU
val isFairScheduler = listener.sc.getSchedulingMode == SchedulingMode.FAIR
def toNodeSeq(): Seq[Node] = {
- stageTable(stageRow, stages)
+ listener.synchronized {
+ stageTable(stageRow, stages)
+ }
}
/** Special table which merges two header cells. */
- def stageTable[T](makeRow: T => Seq[Node], rows: Seq[T]): Seq[Node] = {
+ private def stageTable[T](makeRow: T => Seq[Node], rows: Seq[T]): Seq[Node] = {
<table class="table table-bordered table-striped table-condensed sortable">
<thead>
<th>Stage Id</th>
@@ -47,14 +49,14 @@ private[spark] class StageTable(val stages: Seq[Stage], val parent: JobProgressU
</table>
}
- def getElapsedTime(submitted: Option[Long], completed: Long): String = {
+ private def getElapsedTime(submitted: Option[Long], completed: Long): String = {
submitted match {
case Some(t) => parent.formatDuration(completed - t)
case _ => "Unknown"
}
}
- def makeProgressBar(started: Int, completed: Int, failed: String, total: Int): Seq[Node] = {
+ private def makeProgressBar(started: Int, completed: Int, failed: String, total: Int): Seq[Node] = {
val completeWidth = "width: %s%%".format((completed.toDouble/total)*100)
val startWidth = "width: %s%%".format((started.toDouble/total)*100)
@@ -68,7 +70,7 @@ private[spark] class StageTable(val stages: Seq[Stage], val parent: JobProgressU
}
- def stageRow(s: Stage): Seq[Node] = {
+ private def stageRow(s: Stage): Seq[Node] = {
val submissionTime = s.submissionTime match {
case Some(t) => dateFmt.format(new Date(t))
case None => "Unknown"