diff options
author | Kay Ousterhout <kayo@yahoo-inc.com> | 2013-08-09 13:08:46 -0700 |
---|---|---|
committer | Kay Ousterhout <kayo@yahoo-inc.com> | 2013-08-09 13:27:41 -0700 |
commit | 81e1d4a7d19cefe4bbf0fe6ec54b4f894d7b21d0 (patch) | |
tree | e147854e0d30e5a84f2e5d8a164fc7e8075e0d40 /core | |
parent | 88049a214df8ee0b155e5c4b894cf32bab7bafc5 (diff) | |
download | spark-81e1d4a7d19cefe4bbf0fe6ec54b4f894d7b21d0.tar.gz spark-81e1d4a7d19cefe4bbf0fe6ec54b4f894d7b21d0.tar.bz2 spark-81e1d4a7d19cefe4bbf0fe6ec54b4f894d7b21d0.zip |
Refactored SparkListener to process all events asynchronously.
This commit fixes issues where SparkListeners that take a while to
process events slow the DAGScheduler.
This commit also fixes a bug in the UI where if a user goes to a
web page of a stage that does not exist, they can create a memory
leak (granted, this is not an issue at small scale -- probably only
an issue if someone actively tried to DoS the UI).
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/spark/SparkContext.scala | 2 | ||||
-rw-r--r-- | core/src/main/scala/spark/scheduler/DAGScheduler.scala | 27 | ||||
-rw-r--r-- | core/src/main/scala/spark/scheduler/JobLogger.scala | 79 | ||||
-rw-r--r-- | core/src/main/scala/spark/scheduler/SparkListenerEventProcessor.scala | 62 | ||||
-rw-r--r-- | core/src/main/scala/spark/ui/jobs/IndexPage.scala | 122 | ||||
-rw-r--r-- | core/src/main/scala/spark/ui/jobs/PoolPage.scala | 20 | ||||
-rw-r--r-- | core/src/main/scala/spark/ui/jobs/PoolTable.scala | 15 | ||||
-rw-r--r-- | core/src/main/scala/spark/ui/jobs/StagePage.scala | 22 | ||||
-rw-r--r-- | core/src/main/scala/spark/ui/jobs/StageTable.scala | 12 |
9 files changed, 196 insertions, 165 deletions
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index 40b30e4d23..1069e27513 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -547,7 +547,7 @@ class SparkContext( } def addSparkListener(listener: SparkListener) { - dagScheduler.sparkListeners += listener + dagScheduler.addSparkListener(listener) } /** diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala index 89c51a44c9..9e2f3cc81f 100644 --- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala @@ -102,7 +102,7 @@ class DAGScheduler( private[spark] val stageToInfos = new TimeStampedHashMap[Stage, StageInfo] - private[spark] val sparkListeners = ArrayBuffer[SparkListener]() + private val listenerEventProcessor = new SparkListenerEventProcessor() var cacheLocs = new HashMap[Int, Array[List[String]]] @@ -137,6 +137,10 @@ class DAGScheduler( }.start() } + def addSparkListener(listener: SparkListener) { + listenerEventProcessor.addListener(listener) + } + private def getCacheLocs(rdd: RDD[_]): Array[List[String]] = { if (!cacheLocs.contains(rdd.id)) { val blockIds = rdd.partitions.indices.map(index=> "rdd_%d_%d".format(rdd.id, index)).toArray @@ -334,7 +338,7 @@ class DAGScheduler( // Compute very short actions like first() or take() with no parent stages locally. 
runLocally(job) } else { - sparkListeners.foreach(_.onJobStart(SparkListenerJobStart(job, properties))) + listenerEventProcessor.addEvent(SparkListenerJobStart(job, properties)) idToActiveJob(runId) = job activeJobs += job resultStageToJob(finalStage) = job @@ -348,11 +352,11 @@ class DAGScheduler( handleExecutorLost(execId) case begin: BeginEvent => - sparkListeners.foreach(_.onTaskStart(SparkListenerTaskStart(begin.task, begin.taskInfo))) + listenerEventProcessor.addEvent(SparkListenerTaskStart(begin.task, begin.taskInfo)) case completion: CompletionEvent => - sparkListeners.foreach(_.onTaskEnd(SparkListenerTaskEnd(completion.task, - completion.reason, completion.taskInfo, completion.taskMetrics))) + listenerEventProcessor.addEvent(SparkListenerTaskEnd( + completion.task, completion.reason, completion.taskInfo, completion.taskMetrics)) handleTaskCompletion(completion) case TaskSetFailed(taskSet, reason) => @@ -363,7 +367,7 @@ class DAGScheduler( for (job <- activeJobs) { val error = new SparkException("Job cancelled because SparkContext was shut down") job.listener.jobFailed(error) - sparkListeners.foreach(_.onJobEnd(SparkListenerJobEnd(job, JobFailed(error, None)))) + listenerEventProcessor.addEvent(SparkListenerJobEnd(job, JobFailed(error, None))) } return true } @@ -513,8 +517,8 @@ class DAGScheduler( // must be run listener before possible NotSerializableException // should be "StageSubmitted" first and then "JobEnded" val properties = idToActiveJob(stage.priority).properties - sparkListeners.foreach(_.onStageSubmitted( - SparkListenerStageSubmitted(stage, tasks.size, properties))) + listenerEventProcessor.addEvent( + SparkListenerStageSubmitted(stage, tasks.size, properties)) if (tasks.size > 0) { // Preemptively serialize a task to make sure it can be serialized. 
We are catching this @@ -560,8 +564,7 @@ class DAGScheduler( } logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime)) stage.completionTime = Some(System.currentTimeMillis) - val stageComp = StageCompleted(stageToInfos(stage)) - sparkListeners.foreach{_.onStageCompleted(stageComp)} + listenerEventProcessor.addEvent(StageCompleted(stageToInfos(stage))) running -= stage } event.reason match { @@ -585,7 +588,7 @@ class DAGScheduler( activeJobs -= job resultStageToJob -= stage markStageAsFinished(stage) - sparkListeners.foreach(_.onJobEnd(SparkListenerJobEnd(job, JobSucceeded))) + listenerEventProcessor.addEvent(SparkListenerJobEnd(job, JobSucceeded)) } job.listener.taskSucceeded(rt.outputId, event.result) } @@ -732,7 +735,7 @@ class DAGScheduler( val job = resultStageToJob(resultStage) val error = new SparkException("Job failed: " + reason) job.listener.jobFailed(error) - sparkListeners.foreach(_.onJobEnd(SparkListenerJobEnd(job, JobFailed(error, Some(failedStage))))) + listenerEventProcessor.addEvent(SparkListenerJobEnd(job, JobFailed(error, Some(failedStage)))) idToActiveJob -= resultStage.priority activeJobs -= job resultStageToJob -= resultStage diff --git a/core/src/main/scala/spark/scheduler/JobLogger.scala b/core/src/main/scala/spark/scheduler/JobLogger.scala index ad2efcec63..7ceb2f22f6 100644 --- a/core/src/main/scala/spark/scheduler/JobLogger.scala +++ b/core/src/main/scala/spark/scheduler/JobLogger.scala @@ -54,31 +54,6 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging { def getJobIDToStages = jobIDToStages
def getEventQueue = eventQueue
- new Thread("JobLogger") {
- setDaemon(true)
- override def run() {
- while (true) {
- val event = eventQueue.take
- logDebug("Got event of type " + event.getClass.getName)
- event match {
- case SparkListenerJobStart(job, properties) =>
- processJobStartEvent(job, properties)
- case SparkListenerStageSubmitted(stage, taskSize, properties) =>
- processStageSubmittedEvent(stage, taskSize)
- case StageCompleted(stageInfo) =>
- processStageCompletedEvent(stageInfo)
- case SparkListenerJobEnd(job, result) =>
- processJobEndEvent(job, result)
- case SparkListenerTaskStart(task, taskInfo) =>
- processTaskStartEvent(task, taskInfo)
- case SparkListenerTaskEnd(task, reason, taskInfo, taskMetrics) =>
- processTaskEndEvent(task, reason, taskInfo, taskMetrics)
- case _ =>
- }
- }
- }
- }.start()
-
// Create a folder for log files, the folder's name is the creation time of the jobLogger
protected def createLogDir() {
val dir = new File(logDir + "/" + logDirName + "/")
@@ -239,49 +214,32 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging { }
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
- eventQueue.put(stageSubmitted)
- }
-
- protected def processStageSubmittedEvent(stage: Stage, taskSize: Int) {
- stageLogInfo(stage.id, "STAGE_ID=" + stage.id + " STATUS=SUBMITTED" + " TASK_SIZE=" + taskSize)
+ stageLogInfo(
+ stageSubmitted.stage.id,
+ "STAGE_ID=%d STATUS=SUBMITTED TASK_SIZE=%d".format(
+ stageSubmitted.stage.id, stageSubmitted.taskSize))
}
override def onStageCompleted(stageCompleted: StageCompleted) {
- eventQueue.put(stageCompleted)
- }
-
- protected def processStageCompletedEvent(stageInfo: StageInfo) {
- stageLogInfo(stageInfo.stage.id, "STAGE_ID=" +
- stageInfo.stage.id + " STATUS=COMPLETED")
+ stageLogInfo(
+ stageCompleted.stageInfo.stage.id,
+ "STAGE_ID=%d STATUS=COMPLETED".format(stageCompleted.stageInfo.stage.id))
}
- override def onTaskStart(taskStart: SparkListenerTaskStart) {
- eventQueue.put(taskStart)
- }
-
- protected def processTaskStartEvent(task: Task[_], taskInfo: TaskInfo) {
- var taskStatus = ""
- task match {
- case resultTask: ResultTask[_, _] => taskStatus = "TASK_TYPE=RESULT_TASK"
- case shuffleMapTask: ShuffleMapTask => taskStatus = "TASK_TYPE=SHUFFLE_MAP_TASK"
- }
- }
+ override def onTaskStart(taskStart: SparkListenerTaskStart) { }
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
- eventQueue.put(taskEnd)
- }
-
- protected def processTaskEndEvent(task: Task[_], reason: TaskEndReason,
- taskInfo: TaskInfo, taskMetrics: TaskMetrics) {
+ val task = taskEnd.task
+ val taskInfo = taskEnd.taskInfo
var taskStatus = ""
task match {
case resultTask: ResultTask[_, _] => taskStatus = "TASK_TYPE=RESULT_TASK"
case shuffleMapTask: ShuffleMapTask => taskStatus = "TASK_TYPE=SHUFFLE_MAP_TASK"
}
- reason match {
+ taskEnd.reason match {
case Success => taskStatus += " STATUS=SUCCESS"
- recordTaskMetrics(task.stageId, taskStatus, taskInfo, taskMetrics)
+ recordTaskMetrics(task.stageId, taskStatus, taskInfo, taskEnd.taskMetrics)
case Resubmitted =>
taskStatus += " STATUS=RESUBMITTED TID=" + taskInfo.taskId +
" STAGE_ID=" + task.stageId
@@ -300,12 +258,9 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging { }
override def onJobEnd(jobEnd: SparkListenerJobEnd) {
- eventQueue.put(jobEnd)
- }
-
- protected def processJobEndEvent(job: ActiveJob, reason: JobResult) {
+ val job = jobEnd.job
var info = "JOB_ID=" + job.runId
- reason match {
+ jobEnd.jobResult match {
case JobSucceeded => info += " STATUS=SUCCESS"
case JobFailed(exception, _) =>
info += " STATUS=FAILED REASON="
@@ -324,10 +279,8 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging { }
override def onJobStart(jobStart: SparkListenerJobStart) {
- eventQueue.put(jobStart)
- }
-
- protected def processJobStartEvent(job: ActiveJob, properties: Properties) {
+ val job = jobStart.job
+ val properties = jobStart.properties
createLogWriter(job.runId)
recordJobProperties(job.runId, properties)
buildJobDep(job.runId, job.finalStage)
diff --git a/core/src/main/scala/spark/scheduler/SparkListenerEventProcessor.scala b/core/src/main/scala/spark/scheduler/SparkListenerEventProcessor.scala new file mode 100644 index 0000000000..6d21c9d836 --- /dev/null +++ b/core/src/main/scala/spark/scheduler/SparkListenerEventProcessor.scala @@ -0,0 +1,62 @@ +/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+
+import java.util.concurrent.LinkedBlockingQueue
+
+/** Asynchronously passes SparkListenerEvents to registered SparkListeners. */
+class SparkListenerEventProcessor() {
+ /* sparkListeners is not thread safe, so this assumes that listeners are all added before any
+ * SparkListenerEvents occur. */
+ private val sparkListeners = ArrayBuffer[SparkListener]()
+ private val eventQueue = new LinkedBlockingQueue[SparkListenerEvents]
+
+ new Thread("SparkListenerEventProcessor") {
+ setDaemon(true)
+ override def run() {
+ while (true) {
+ val event = eventQueue.take
+ event match {
+ case stageSubmitted: SparkListenerStageSubmitted =>
+ sparkListeners.foreach(_.onStageSubmitted(stageSubmitted))
+ case stageCompleted: StageCompleted =>
+ sparkListeners.foreach(_.onStageCompleted(stageCompleted))
+ case jobStart: SparkListenerJobStart =>
+ sparkListeners.foreach(_.onJobStart(jobStart))
+ case jobEnd: SparkListenerJobEnd =>
+ sparkListeners.foreach(_.onJobEnd(jobEnd))
+ case taskStart: SparkListenerTaskStart =>
+ sparkListeners.foreach(_.onTaskStart(taskStart))
+ case taskEnd: SparkListenerTaskEnd =>
+ sparkListeners.foreach(_.onTaskEnd(taskEnd))
+ case _ =>
+ }
+ }
+ }
+ }.start()
+
+ def addListener(listener: SparkListener) {
+ sparkListeners += listener
+ }
+
+ def addEvent(event: SparkListenerEvents) {
+ eventQueue.put(event)
+ }
+}
diff --git a/core/src/main/scala/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/spark/ui/jobs/IndexPage.scala index b611b0fe85..2084f4a6e6 100644 --- a/core/src/main/scala/spark/ui/jobs/IndexPage.scala +++ b/core/src/main/scala/spark/ui/jobs/IndexPage.scala @@ -31,73 +31,75 @@ private[spark] class IndexPage(parent: JobProgressUI) { def listener = parent.listener def render(request: HttpServletRequest): Seq[Node] = { - val activeStages = listener.activeStages.toSeq - val completedStages = listener.completedStages.reverse.toSeq - val failedStages = listener.failedStages.reverse.toSeq - val now = System.currentTimeMillis() + listener.synchronized { + val activeStages = listener.activeStages.toSeq + val completedStages = listener.completedStages.reverse.toSeq + val failedStages = listener.failedStages.reverse.toSeq + val now = System.currentTimeMillis() - var activeTime = 0L - for (tasks <- listener.stageToTasksActive.values; t <- tasks) { - activeTime += t.timeRunning(now) - } + var activeTime = 0L + for (tasks <- listener.stageToTasksActive.values; t <- tasks) { + activeTime += t.timeRunning(now) + } - val activeStagesTable = new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent) - val completedStagesTable = new StageTable(completedStages.sortBy(_.submissionTime).reverse, parent) - val failedStagesTable = new StageTable(failedStages.sortBy(_.submissionTime).reverse, parent) + val activeStagesTable = new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent) + val completedStagesTable = new StageTable(completedStages.sortBy(_.submissionTime).reverse, parent) + val failedStagesTable = new StageTable(failedStages.sortBy(_.submissionTime).reverse, parent) - val poolTable = new PoolTable(listener.sc.getAllPools, listener) - val summary: NodeSeq = - <div> - <ul class="unstyled"> - <li> - <strong>Duration: </strong> - {parent.formatDuration(now - listener.sc.startTime)} - </li> - <li> - <strong>CPU time: </strong> - 
{parent.formatDuration(listener.totalTime + activeTime)} - </li> - {if (listener.totalShuffleRead > 0) + val poolTable = new PoolTable(listener.sc.getAllPools, listener) + val summary: NodeSeq = + <div> + <ul class="unstyled"> <li> - <strong>Shuffle read: </strong> - {Utils.memoryBytesToString(listener.totalShuffleRead)} - </li> - } - {if (listener.totalShuffleWrite > 0) + <strong>Duration: </strong> + {parent.formatDuration(now - listener.sc.startTime)} + </li> <li> - <strong>Shuffle write: </strong> - {Utils.memoryBytesToString(listener.totalShuffleWrite)} + <strong>CPU time: </strong> + {parent.formatDuration(listener.totalTime + activeTime)} </li> - } - <li> - <a href="#active"><strong>Active Stages:</strong></a> - {activeStages.size} - </li> - <li> - <a href="#completed"><strong>Completed Stages:</strong></a> - {completedStages.size} - </li> - <li> - <a href="#failed"><strong>Failed Stages:</strong></a> - {failedStages.size} - </li> - <li><strong>Scheduling Mode:</strong> {parent.sc.getSchedulingMode}</li> - </ul> - </div> + {if (listener.totalShuffleRead > 0) + <li> + <strong>Shuffle read: </strong> + {Utils.memoryBytesToString(listener.totalShuffleRead)} + </li> + } + {if (listener.totalShuffleWrite > 0) + <li> + <strong>Shuffle write: </strong> + {Utils.memoryBytesToString(listener.totalShuffleWrite)} + </li> + } + <li> + <a href="#active"><strong>Active Stages:</strong></a> + {activeStages.size} + </li> + <li> + <a href="#completed"><strong>Completed Stages:</strong></a> + {completedStages.size} + </li> + <li> + <a href="#failed"><strong>Failed Stages:</strong></a> + {failedStages.size} + </li> + <li><strong>Scheduling Mode:</strong> {parent.sc.getSchedulingMode}</li> + </ul> + </div> - val content = summary ++ - {if (listener.sc.getSchedulingMode == SchedulingMode.FAIR) { - <h3>Pools</h3> ++ poolTable.toNodeSeq - } else { - Seq() - }} ++ - <h3 id="active">Active Stages : {activeStages.size}</h3> ++ - activeStagesTable.toNodeSeq++ - <h3 
id="completed">Completed Stages : {completedStages.size}</h3> ++ - completedStagesTable.toNodeSeq++ - <h3 id ="failed">Failed Stages : {failedStages.size}</h3> ++ - failedStagesTable.toNodeSeq + val content = summary ++ + {if (listener.sc.getSchedulingMode == SchedulingMode.FAIR) { + <h3>Pools</h3> ++ poolTable.toNodeSeq + } else { + Seq() + }} ++ + <h3 id="active">Active Stages : {activeStages.size}</h3> ++ + activeStagesTable.toNodeSeq++ + <h3 id="completed">Completed Stages : {completedStages.size}</h3> ++ + completedStagesTable.toNodeSeq++ + <h3 id ="failed">Failed Stages : {failedStages.size}</h3> ++ + failedStagesTable.toNodeSeq - headerSparkPage(content, parent.sc, "Spark Stages", Jobs) + headerSparkPage(content, parent.sc, "Spark Stages", Jobs) + } } } diff --git a/core/src/main/scala/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/spark/ui/jobs/PoolPage.scala index 647c6d2ae3..04ef35c800 100644 --- a/core/src/main/scala/spark/ui/jobs/PoolPage.scala +++ b/core/src/main/scala/spark/ui/jobs/PoolPage.scala @@ -14,17 +14,19 @@ private[spark] class PoolPage(parent: JobProgressUI) { def listener = parent.listener def render(request: HttpServletRequest): Seq[Node] = { - val poolName = request.getParameter("poolname") - val poolToActiveStages = listener.poolToActiveStages - val activeStages = poolToActiveStages.getOrElseUpdate(poolName, new HashSet[Stage]).toSeq - val activeStagesTable = new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent) + listener.synchronized { + val poolName = request.getParameter("poolname") + val poolToActiveStages = listener.poolToActiveStages + val activeStages = poolToActiveStages.get(poolName).toSeq.flatten + val activeStagesTable = new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent) - val pool = listener.sc.getPoolForName(poolName).get - val poolTable = new PoolTable(Seq(pool), listener) + val pool = listener.sc.getPoolForName(poolName).get + val poolTable = new PoolTable(Seq(pool), listener) - val 
content = <h3>Pool </h3> ++ poolTable.toNodeSeq() ++ - <h3>Active Stages : {activeStages.size}</h3> ++ activeStagesTable.toNodeSeq() + val content = <h3>Pool </h3> ++ poolTable.toNodeSeq() ++ + <h3>Active Stages : {activeStages.size}</h3> ++ activeStagesTable.toNodeSeq() - headerSparkPage(content, parent.sc, "Spark Pool Details", Jobs) + headerSparkPage(content, parent.sc, "Spark Pool Details", Jobs) + } } } diff --git a/core/src/main/scala/spark/ui/jobs/PoolTable.scala b/core/src/main/scala/spark/ui/jobs/PoolTable.scala index 9cfe0d68f0..21ebcef63a 100644 --- a/core/src/main/scala/spark/ui/jobs/PoolTable.scala +++ b/core/src/main/scala/spark/ui/jobs/PoolTable.scala @@ -13,11 +13,12 @@ private[spark] class PoolTable(pools: Seq[Schedulable], listener: JobProgressLis var poolToActiveStages: HashMap[String, HashSet[Stage]] = listener.poolToActiveStages def toNodeSeq(): Seq[Node] = { - poolTable(poolRow, pools) + listener.synchronized { + poolTable(poolRow, pools) + } } - // pool tables - def poolTable(makeRow: (Schedulable, HashMap[String, HashSet[Stage]]) => Seq[Node], + private def poolTable(makeRow: (Schedulable, HashMap[String, HashSet[Stage]]) => Seq[Node], rows: Seq[Schedulable] ): Seq[Node] = { <table class="table table-bordered table-striped table-condensed sortable"> @@ -35,12 +36,16 @@ private[spark] class PoolTable(pools: Seq[Schedulable], listener: JobProgressLis </table> } - def poolRow(p: Schedulable, poolToActiveStages: HashMap[String, HashSet[Stage]]): Seq[Node] = { + private def poolRow(p: Schedulable, poolToActiveStages: HashMap[String, HashSet[Stage]]): Seq[Node] = { + val activeStages = poolToActiveStages.get(p.name) match { + case Some(stages) => stages.size + case None => 0 + } <tr> <td><a href={"/stages/pool?poolname=%s".format(p.name)}>{p.name}</a></td> <td>{p.minShare}</td> <td>{p.weight}</td> - <td>{poolToActiveStages.getOrElseUpdate(p.name, new HashSet[Stage]()).size}</td> + <td>{activeStages}</td> <td>{p.runningTasks}</td> 
<td>{p.schedulingMode}</td> </tr> diff --git a/core/src/main/scala/spark/ui/jobs/StagePage.scala b/core/src/main/scala/spark/ui/jobs/StagePage.scala index f6cef6694d..873b52aa1b 100644 --- a/core/src/main/scala/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/spark/ui/jobs/StagePage.scala @@ -51,8 +51,10 @@ private[spark] class StagePage(parent: JobProgressUI) { val tasks = listener.stageToTaskInfos(stageId).toSeq.sortBy(_._1.launchTime) - val shuffleRead = listener.stageToShuffleRead.getOrElse(stageId, 0L) > 0 - val shuffleWrite = listener.stageToShuffleWrite.getOrElse(stageId, 0L) > 0 + val shuffleReadBytes = listener.stageToShuffleRead.getOrElse(stageId, 0L) + val hasShuffleRead = shuffleReadBytes > 0 + val shuffleWriteBytes = listener.stageToShuffleWrite.getOrElse(stageId, 0L) + val hasShuffleWrite = shuffleWriteBytes > 0 var activeTime = 0L listener.stageToTasksActive(stageId).foreach(activeTime += _.timeRunning(now)) @@ -64,16 +66,16 @@ private[spark] class StagePage(parent: JobProgressUI) { <strong>CPU time: </strong> {parent.formatDuration(listener.stageToTime.getOrElse(stageId, 0L) + activeTime)} </li> - {if (shuffleRead) + {if (hasShuffleRead) <li> <strong>Shuffle read: </strong> - {Utils.memoryBytesToString(listener.stageToShuffleRead.getOrElse(stageId, 0L))} + {Utils.memoryBytesToString(shuffleReadBytes)} </li> } - {if (shuffleWrite) + {if (hasShuffleWrite) <li> <strong>Shuffle write: </strong> - {Utils.memoryBytesToString(listener.stageToShuffleWrite.getOrElse(stageId, 0L))} + {Utils.memoryBytesToString(shuffleWriteBytes)} </li> } </ul> @@ -81,8 +83,8 @@ private[spark] class StagePage(parent: JobProgressUI) { val taskHeaders: Seq[String] = Seq("Task ID", "Status", "Duration", "Locality Level", "Worker", "Launch Time") ++ - {if (shuffleRead) Seq("Shuffle Read") else Nil} ++ - {if (shuffleWrite) Seq("Shuffle Write") else Nil} ++ + {if (hasShuffleRead) Seq("Shuffle Read") else Nil} ++ + {if (hasShuffleWrite) Seq("Shuffle Write") else Nil} ++ 
Seq("Details") val taskTable = listingTable(taskHeaders, taskRow, tasks) @@ -116,8 +118,8 @@ private[spark] class StagePage(parent: JobProgressUI) { val shuffleWriteQuantiles = "Shuffle Write" +: getQuantileCols(shuffleWriteSizes) val listings: Seq[Seq[String]] = Seq(serviceQuantiles, - if (shuffleRead) shuffleReadQuantiles else Nil, - if (shuffleWrite) shuffleWriteQuantiles else Nil) + if (hasShuffleRead) shuffleReadQuantiles else Nil, + if (hasShuffleWrite) shuffleWriteQuantiles else Nil) val quantileHeaders = Seq("Metric", "Min", "25%", "50%", "75%", "Max") def quantileRow(data: Seq[String]): Seq[Node] = <tr> {data.map(d => <td>{d}</td>)} </tr> diff --git a/core/src/main/scala/spark/ui/jobs/StageTable.scala b/core/src/main/scala/spark/ui/jobs/StageTable.scala index 1df0e0913c..5068a025fa 100644 --- a/core/src/main/scala/spark/ui/jobs/StageTable.scala +++ b/core/src/main/scala/spark/ui/jobs/StageTable.scala @@ -25,11 +25,13 @@ private[spark] class StageTable(val stages: Seq[Stage], val parent: JobProgressU val isFairScheduler = listener.sc.getSchedulingMode == SchedulingMode.FAIR def toNodeSeq(): Seq[Node] = { - stageTable(stageRow, stages) + listener.synchronized { + stageTable(stageRow, stages) + } } /** Special table which merges two header cells. 
*/ - def stageTable[T](makeRow: T => Seq[Node], rows: Seq[T]): Seq[Node] = { + private def stageTable[T](makeRow: T => Seq[Node], rows: Seq[T]): Seq[Node] = { <table class="table table-bordered table-striped table-condensed sortable"> <thead> <th>Stage Id</th> @@ -47,14 +49,14 @@ private[spark] class StageTable(val stages: Seq[Stage], val parent: JobProgressU </table> } - def getElapsedTime(submitted: Option[Long], completed: Long): String = { + private def getElapsedTime(submitted: Option[Long], completed: Long): String = { submitted match { case Some(t) => parent.formatDuration(completed - t) case _ => "Unknown" } } - def makeProgressBar(started: Int, completed: Int, failed: String, total: Int): Seq[Node] = { + private def makeProgressBar(started: Int, completed: Int, failed: String, total: Int): Seq[Node] = { val completeWidth = "width: %s%%".format((completed.toDouble/total)*100) val startWidth = "width: %s%%".format((started.toDouble/total)*100) @@ -68,7 +70,7 @@ private[spark] class StageTable(val stages: Seq[Stage], val parent: JobProgressU } - def stageRow(s: Stage): Seq[Node] = { + private def stageRow(s: Stage): Seq[Node] = { val submissionTime = s.submissionTime match { case Some(t) => dateFmt.format(new Date(t)) case None => "Unknown" |