author    | Kay Ousterhout <kayo@yahoo-inc.com> | 2013-08-07 22:16:21 -0700
committer | Kay Ousterhout <kayo@yahoo-inc.com> | 2013-08-07 23:09:25 -0700
commit    | 88049a214df8ee0b155e5c4b894cf32bab7bafc5 (patch)
tree      | 6408fe07f62a14bf8cfaae410b67fce3658214a3 /core
parent    | 5133e4bebd47d8ae089f967689ecab551c2c5844 (diff)
Fixed 3 bugs that caused the UI to crash (including SPARK-810).
One bug caused the UI to crash if you tried to look at a job's status
before any of its tasks had finished.
The second bug was a race between two threads (the scheduling thread
and a UI thread) that could read and update the data structures in
JobProgressListener at the same time.
The third bug misused an Option, also causing the UI to crash under
certain conditions.
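
The concurrency fix follows a single locking discipline: every listener callback that mutates state and every UI-side read synchronize on the listener itself. The sketch below illustrates that pattern with made-up names (ProgressListener, onTaskEnd, tasksComplete); it is a minimal stand-in under that assumption, not the actual Spark classes.

import scala.collection.mutable

// Minimal sketch of the locking discipline adopted here (hypothetical class and
// method names, not the real Spark listener): the scheduler's event handlers and
// the UI's readers all synchronize on the listener object, so the mutable maps
// are never read while another thread is updating them.
class ProgressListener {
  private val stageToTasksComplete = mutable.HashMap[Int, Int]()

  // Called from the scheduler's event loop.
  def onTaskEnd(stageId: Int): Unit = this.synchronized {
    stageToTasksComplete(stageId) = stageToTasksComplete.getOrElse(stageId, 0) + 1
  }

  // Called from the UI (servlet) thread; the default also avoids a crash when
  // the stage has no entry yet because no task has finished.
  def tasksComplete(stageId: Int): Int = this.synchronized {
    stageToTasksComplete.getOrElse(stageId, 0)
  }
}

object ProgressListenerDemo {
  def main(args: Array[String]): Unit = {
    val listener = new ProgressListener
    val scheduler = new Thread(new Runnable {
      def run(): Unit = (1 to 1000).foreach(_ => listener.onTaskEnd(0))
    })
    scheduler.start()
    println(listener.tasksComplete(0)) // safe to read while the scheduler thread updates
    scheduler.join()
    println(listener.tasksComplete(0)) // 1000
  }
}
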
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/spark/ui/jobs/JobProgressListener.scala | 204
-rw-r--r-- | core/src/main/scala/spark/ui/jobs/StagePage.scala           | 156
2 files changed, 191 insertions, 169 deletions
diff --git a/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala
index c6103edcb0..ee6468dc9a 100644
--- a/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala
@@ -9,6 +9,12 @@ import spark.scheduler.cluster.TaskInfo
 import spark.executor.TaskMetrics
 import collection.mutable

+/** Tracks task-level information to be displayed in the UI.
+ *
+ * All access to the data structures in this class must be synchronized on the
+ * class, since the UI thread and the DAGScheduler event loop may otherwise
+ * be reading/updating the internal data structures concurrently.
+ */
 private[spark] class JobProgressListener(val sc: SparkContext) extends SparkListener {
   // How many stages to remember
   val RETAINED_STAGES = System.getProperty("spark.ui.retained_stages", "1000").toInt
@@ -39,129 +45,145 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList

   override def onJobStart(jobStart: SparkListenerJobStart) {}

   override def onStageCompleted(stageCompleted: StageCompleted) = {
-    val stage = stageCompleted.stageInfo.stage
-    poolToActiveStages(stageToPool(stage)) -= stage
-    activeStages -= stage
-    completedStages += stage
-    trimIfNecessary(completedStages)
+    this.synchronized {
+      val stage = stageCompleted.stageInfo.stage
+      poolToActiveStages(stageToPool(stage)) -= stage
+      activeStages -= stage
+      completedStages += stage
+      trimIfNecessary(completedStages)
+    }
   }

   /** If stages is too large, remove and garbage collect old stages */
   def trimIfNecessary(stages: ListBuffer[Stage]) {
-    if (stages.size > RETAINED_STAGES) {
-      val toRemove = RETAINED_STAGES / 10
-      stages.takeRight(toRemove).foreach( s => {
-        stageToTaskInfos.remove(s.id)
-        stageToTime.remove(s.id)
-        stageToShuffleRead.remove(s.id)
-        stageToShuffleWrite.remove(s.id)
-        stageToTasksActive.remove(s.id)
-        stageToTasksComplete.remove(s.id)
-        stageToTasksFailed.remove(s.id)
-        stageToPool.remove(s)
-        if (stageToDescription.contains(s)) {stageToDescription.remove(s)}
-      })
-      stages.trimEnd(toRemove)
+    this.synchronized {
+      if (stages.size > RETAINED_STAGES) {
+        val toRemove = RETAINED_STAGES / 10
+        stages.takeRight(toRemove).foreach( s => {
+          stageToTaskInfos.remove(s.id)
+          stageToTime.remove(s.id)
+          stageToShuffleRead.remove(s.id)
+          stageToShuffleWrite.remove(s.id)
+          stageToTasksActive.remove(s.id)
+          stageToTasksComplete.remove(s.id)
+          stageToTasksFailed.remove(s.id)
+          stageToPool.remove(s)
+          if (stageToDescription.contains(s)) {stageToDescription.remove(s)}
+        })
+        stages.trimEnd(toRemove)
+      }
     }
   }

   /** For FIFO, all stages are contained by "default" pool but "default" pool here is meaningless */
   override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = {
-    val stage = stageSubmitted.stage
-    activeStages += stage
+    this.synchronized {
+      val stage = stageSubmitted.stage
+      activeStages += stage

-    val poolName = Option(stageSubmitted.properties).map {
-      p => p.getProperty("spark.scheduler.cluster.fair.pool", DEFAULT_POOL_NAME)
-    }.getOrElse(DEFAULT_POOL_NAME)
-    stageToPool(stage) = poolName
+      val poolName = Option(stageSubmitted.properties).map {
+        p => p.getProperty("spark.scheduler.cluster.fair.pool", DEFAULT_POOL_NAME)
+      }.getOrElse(DEFAULT_POOL_NAME)
+      stageToPool(stage) = poolName

-    val description = Option(stageSubmitted.properties).flatMap {
-      p => Option(p.getProperty(SparkContext.SPARK_JOB_DESCRIPTION))
-    }
-    description.map(d => stageToDescription(stage) = d)
+      val description = Option(stageSubmitted.properties).flatMap {
+        p => Option(p.getProperty(SparkContext.SPARK_JOB_DESCRIPTION))
+      }
+      description.map(d => stageToDescription(stage) = d)

-    val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashSet[Stage]())
-    stages += stage
+      val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashSet[Stage]())
+      stages += stage
+    }
   }

   override def onTaskStart(taskStart: SparkListenerTaskStart) {
-    val sid = taskStart.task.stageId
-    val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
-    tasksActive += taskStart.taskInfo
-    val taskList = stageToTaskInfos.getOrElse(
-      sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
-    taskList += ((taskStart.taskInfo, None, None))
-    stageToTaskInfos(sid) = taskList
+    this.synchronized {
+      val sid = taskStart.task.stageId
+      val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
+      tasksActive += taskStart.taskInfo
+      val taskList = stageToTaskInfos.getOrElse(
+        sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
+      taskList += ((taskStart.taskInfo, None, None))
+      stageToTaskInfos(sid) = taskList
+    }
   }

   override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
-    val sid = taskEnd.task.stageId
-    val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
-    tasksActive -= taskEnd.taskInfo
-    val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
-      taskEnd.reason match {
-        case e: ExceptionFailure =>
-          stageToTasksFailed(sid) = stageToTasksFailed.getOrElse(sid, 0) + 1
-          (Some(e), e.metrics)
-        case _ =>
-          stageToTasksComplete(sid) = stageToTasksComplete.getOrElse(sid, 0) + 1
-          (None, Option(taskEnd.taskMetrics))
-      }
+    this.synchronized {
+      val sid = taskEnd.task.stageId
+      val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
+      tasksActive -= taskEnd.taskInfo
+      val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
+        taskEnd.reason match {
+          case e: ExceptionFailure =>
+            stageToTasksFailed(sid) = stageToTasksFailed.getOrElse(sid, 0) + 1
+            (Some(e), e.metrics)
+          case _ =>
+            stageToTasksComplete(sid) = stageToTasksComplete.getOrElse(sid, 0) + 1
+            (None, Option(taskEnd.taskMetrics))
+        }

-    stageToTime.getOrElseUpdate(sid, 0L)
-    val time = metrics.map(m => m.executorRunTime).getOrElse(0)
-    stageToTime(sid) += time
-    totalTime += time
-
-    stageToShuffleRead.getOrElseUpdate(sid, 0L)
-    val shuffleRead = metrics.flatMap(m => m.shuffleReadMetrics).map(s =>
-      s.remoteBytesRead).getOrElse(0L)
-    stageToShuffleRead(sid) += shuffleRead
-    totalShuffleRead += shuffleRead
-
-    stageToShuffleWrite.getOrElseUpdate(sid, 0L)
-    val shuffleWrite = metrics.flatMap(m => m.shuffleWriteMetrics).map(s =>
-      s.shuffleBytesWritten).getOrElse(0L)
-    stageToShuffleWrite(sid) += shuffleWrite
-    totalShuffleWrite += shuffleWrite
-
-    val taskList = stageToTaskInfos.getOrElse(
-      sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
-    taskList -= ((taskEnd.taskInfo, None, None))
-    taskList += ((taskEnd.taskInfo, metrics, failureInfo))
-    stageToTaskInfos(sid) = taskList
+      stageToTime.getOrElseUpdate(sid, 0L)
+      val time = metrics.map(m => m.executorRunTime).getOrElse(0)
+      stageToTime(sid) += time
+      totalTime += time
+
+      stageToShuffleRead.getOrElseUpdate(sid, 0L)
+      val shuffleRead = metrics.flatMap(m => m.shuffleReadMetrics).map(s =>
+        s.remoteBytesRead).getOrElse(0L)
+      stageToShuffleRead(sid) += shuffleRead
+      totalShuffleRead += shuffleRead
+
+      stageToShuffleWrite.getOrElseUpdate(sid, 0L)
+      val shuffleWrite = metrics.flatMap(m => m.shuffleWriteMetrics).map(s =>
+        s.shuffleBytesWritten).getOrElse(0L)
+      stageToShuffleWrite(sid) += shuffleWrite
+      totalShuffleWrite += shuffleWrite
+
+      val taskList = stageToTaskInfos.getOrElse(
+        sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
+      taskList -= ((taskEnd.taskInfo, None, None))
+      taskList += ((taskEnd.taskInfo, metrics, failureInfo))
+      stageToTaskInfos(sid) = taskList
+    }
   }

   override def onJobEnd(jobEnd: SparkListenerJobEnd) {
-    jobEnd match {
-      case end: SparkListenerJobEnd =>
-        end.jobResult match {
-          case JobFailed(ex, Some(stage)) =>
-            activeStages -= stage
-            poolToActiveStages(stageToPool(stage)) -= stage
-            failedStages += stage
-            trimIfNecessary(failedStages)
-          case _ =>
-        }
-      case _ =>
+    this.synchronized {
+      jobEnd match {
+        case end: SparkListenerJobEnd =>
+          end.jobResult match {
+            case JobFailed(ex, Some(stage)) =>
+              activeStages -= stage
+              poolToActiveStages(stageToPool(stage)) -= stage
+              failedStages += stage
+              trimIfNecessary(failedStages)
+            case _ =>
+          }
+        case _ =>
+      }
     }
   }

   /** Is this stage's input from a shuffle read. */
   def hasShuffleRead(stageID: Int): Boolean = {
-    // This is written in a slightly complicated way to avoid having to scan all tasks
-    for (s <- stageToTaskInfos.get(stageID).getOrElse(Seq())) {
-      if (s._2 != null) return s._2.flatMap(m => m.shuffleReadMetrics).isDefined
+    this.synchronized {
+      // This is written in a slightly complicated way to avoid having to scan all tasks
+      for (s <- stageToTaskInfos.get(stageID).getOrElse(Seq())) {
+        if (s._2 != null) return s._2.flatMap(m => m.shuffleReadMetrics).isDefined
+      }
+      return false // No tasks have finished for this stage
     }
-    return false // No tasks have finished for this stage
   }

   /** Is this stage's output to a shuffle write. */
   def hasShuffleWrite(stageID: Int): Boolean = {
-    // This is written in a slightly complicated way to avoid having to scan all tasks
-    for (s <- stageToTaskInfos.get(stageID).getOrElse(Seq())) {
-      if (s._2 != null) return s._2.flatMap(m => m.shuffleWriteMetrics).isDefined
+    this.synchronized {
+      // This is written in a slightly complicated way to avoid having to scan all tasks
+      for (s <- stageToTaskInfos.get(stageID).getOrElse(Seq())) {
+        if (s._2 != null) return s._2.flatMap(m => m.shuffleWriteMetrics).isDefined
+      }
+      return false // No tasks have finished for this stage
     }
-    return false // No tasks have finished for this stage
   }
 }
diff --git a/core/src/main/scala/spark/ui/jobs/StagePage.scala b/core/src/main/scala/spark/ui/jobs/StagePage.scala
index 54b0393e21..f6cef6694d 100644
--- a/core/src/main/scala/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/spark/ui/jobs/StagePage.scala
@@ -36,100 +36,100 @@ private[spark] class StagePage(parent: JobProgressUI) {
   val dateFmt = parent.dateFmt

   def render(request: HttpServletRequest): Seq[Node] = {
-    val stageId = request.getParameter("id").toInt
-    val now = System.currentTimeMillis()
+    listener.synchronized {
+      val stageId = request.getParameter("id").toInt
+      val now = System.currentTimeMillis()
+
+      if (!listener.stageToTaskInfos.contains(stageId)) {
+        val content =
+          <div>
+            <h2>Summary Metrics</h2> No tasks have started yet
+            <h2>Tasks</h2> No tasks have started yet
+          </div>
+        return headerSparkPage(content, parent.sc, "Stage Details: %s".format(stageId), Jobs)
+      }

-    if (!listener.stageToTaskInfos.contains(stageId)) {
-      val content =
-        <div>
-          <h2>Summary Metrics</h2> No tasks have started yet
-          <h2>Tasks</h2> No tasks have started yet
-        </div>
-      return headerSparkPage(content, parent.sc, "Stage Details: %s".format(stageId), Jobs)
-    }
+      val tasks = listener.stageToTaskInfos(stageId).toSeq.sortBy(_._1.launchTime)

-    val tasks = listener.stageToTaskInfos(stageId).toSeq
+      val shuffleRead = listener.stageToShuffleRead.getOrElse(stageId, 0L) > 0
+      val shuffleWrite = listener.stageToShuffleWrite.getOrElse(stageId, 0L) > 0

-    val shuffleRead = listener.stageToShuffleRead(stageId) > 0
-    val shuffleWrite = listener.stageToShuffleWrite(stageId) > 0
+      var activeTime = 0L
+      listener.stageToTasksActive(stageId).foreach(activeTime += _.timeRunning(now))

-    var activeTime = 0L
-    listener.stageToTasksActive(stageId).foreach { t =>
-      activeTime += t.timeRunning(now)
-    }
-
-    val summary =
-      <div>
-        <ul class="unstyled">
-          <li>
-            <strong>CPU time: </strong>
-            {parent.formatDuration(listener.stageToTime(stageId) + activeTime)}
-          </li>
-          {if (shuffleRead)
-            <li>
-              <strong>Shuffle read: </strong>
-              {Utils.memoryBytesToString(listener.stageToShuffleRead(stageId))}
-            </li>
-          }
-          {if (shuffleWrite)
+      val summary =
+        <div>
+          <ul class="unstyled">
             <li>
-              <strong>Shuffle write: </strong>
-              {Utils.memoryBytesToString(listener.stageToShuffleWrite(stageId))}
+              <strong>CPU time: </strong>
+              {parent.formatDuration(listener.stageToTime.getOrElse(stageId, 0L) + activeTime)}
             </li>
-          }
-        </ul>
-      </div>
+            {if (shuffleRead)
+              <li>
+                <strong>Shuffle read: </strong>
+                {Utils.memoryBytesToString(listener.stageToShuffleRead.getOrElse(stageId, 0L))}
+              </li>
+            }
+            {if (shuffleWrite)
+              <li>
+                <strong>Shuffle write: </strong>
+                {Utils.memoryBytesToString(listener.stageToShuffleWrite.getOrElse(stageId, 0L))}
+              </li>
+            }
+          </ul>
+        </div>

-    val taskHeaders: Seq[String] =
-      Seq("Task ID", "Status", "Duration", "Locality Level", "Worker", "Launch Time") ++
-      {if (shuffleRead) Seq("Shuffle Read") else Nil} ++
-      {if (shuffleWrite) Seq("Shuffle Write") else Nil} ++
-      Seq("Details")
+      val taskHeaders: Seq[String] =
+        Seq("Task ID", "Status", "Duration", "Locality Level", "Worker", "Launch Time") ++
+        {if (shuffleRead) Seq("Shuffle Read") else Nil} ++
+        {if (shuffleWrite) Seq("Shuffle Write") else Nil} ++
+        Seq("Details")

-    val taskTable = listingTable(taskHeaders, taskRow, tasks)
+      val taskTable = listingTable(taskHeaders, taskRow, tasks)

-    // Excludes tasks which failed and have incomplete metrics
-    val validTasks = tasks.filter(t => t._1.status == "SUCCESS" && (Option(t._2).isDefined))
+      // Excludes tasks which failed and have incomplete metrics
+      val validTasks = tasks.filter(t => t._1.status == "SUCCESS" && (t._2.isDefined))

-    val summaryTable: Option[Seq[Node]] =
-      if (validTasks.size == 0) {
-        None
-      }
-      else {
-        val serviceTimes = validTasks.map{case (info, metrics, exception) =>
-          metrics.get.executorRunTime.toDouble}
-        val serviceQuantiles = "Duration" +: Distribution(serviceTimes).get.getQuantiles().map(
-          ms => parent.formatDuration(ms.toLong))
-
-        def getQuantileCols(data: Seq[Double]) =
-          Distribution(data).get.getQuantiles().map(d => Utils.memoryBytesToString(d.toLong))
-
-        val shuffleReadSizes = validTasks.map {
-          case(info, metrics, exception) =>
-            metrics.get.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble
+      val summaryTable: Option[Seq[Node]] =
+        if (validTasks.size == 0) {
+          None
         }
-        val shuffleReadQuantiles = "Shuffle Read (Remote)" +: getQuantileCols(shuffleReadSizes)
+        else {
+          val serviceTimes = validTasks.map{case (info, metrics, exception) =>
+            metrics.get.executorRunTime.toDouble}
+          val serviceQuantiles = "Duration" +: Distribution(serviceTimes).get.getQuantiles().map(
+            ms => parent.formatDuration(ms.toLong))
+
+          def getQuantileCols(data: Seq[Double]) =
+            Distribution(data).get.getQuantiles().map(d => Utils.memoryBytesToString(d.toLong))
+
+          val shuffleReadSizes = validTasks.map {
+            case(info, metrics, exception) =>
+              metrics.get.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble
+          }
+          val shuffleReadQuantiles = "Shuffle Read (Remote)" +: getQuantileCols(shuffleReadSizes)

-        val shuffleWriteSizes = validTasks.map {
-          case(info, metrics, exception) =>
-            metrics.get.shuffleWriteMetrics.map(_.shuffleBytesWritten).getOrElse(0L).toDouble
-        }
-        val shuffleWriteQuantiles = "Shuffle Write" +: getQuantileCols(shuffleWriteSizes)
+          val shuffleWriteSizes = validTasks.map {
+            case(info, metrics, exception) =>
+              metrics.get.shuffleWriteMetrics.map(_.shuffleBytesWritten).getOrElse(0L).toDouble
+          }
+          val shuffleWriteQuantiles = "Shuffle Write" +: getQuantileCols(shuffleWriteSizes)

-        val listings: Seq[Seq[String]] = Seq(serviceQuantiles,
-          if (shuffleRead) shuffleReadQuantiles else Nil,
-          if (shuffleWrite) shuffleWriteQuantiles else Nil)
+          val listings: Seq[Seq[String]] = Seq(serviceQuantiles,
+            if (shuffleRead) shuffleReadQuantiles else Nil,
+            if (shuffleWrite) shuffleWriteQuantiles else Nil)

-        val quantileHeaders = Seq("Metric", "Min", "25%", "50%", "75%", "Max")
-        def quantileRow(data: Seq[String]): Seq[Node] = <tr> {data.map(d => <td>{d}</td>)} </tr>
-        Some(listingTable(quantileHeaders, quantileRow, listings))
-      }
+          val quantileHeaders = Seq("Metric", "Min", "25%", "50%", "75%", "Max")
+          def quantileRow(data: Seq[String]): Seq[Node] = <tr> {data.map(d => <td>{d}</td>)} </tr>
+          Some(listingTable(quantileHeaders, quantileRow, listings))
+        }

-    val content =
-      summary ++ <h2>Summary Metrics</h2> ++ summaryTable.getOrElse(Nil) ++
-      <h2>Tasks</h2> ++ taskTable;
+      val content =
+        summary ++ <h2>Summary Metrics</h2> ++ summaryTable.getOrElse(Nil) ++
+        <h2>Tasks</h2> ++ taskTable;

-    headerSparkPage(content, parent.sc, "Stage Details: %s".format(stageId), Jobs)
+      headerSparkPage(content, parent.sc, "Stage Details: %s".format(stageId), Jobs)
+    }
   }
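
For context on the other two crashes, here is a small self-contained sketch of why StagePage now reads per-stage maps with getOrElse and tests the metrics Option directly instead of wrapping it in Option(...). The data, tuple shape, and object name (StagePageFixesDemo) are made up for illustration; this is not the real listener state.

import scala.collection.mutable

// Hedged sketch (made-up data, not the real listener state) of the two crash
// fixes visible in the StagePage diff above.
object StagePageFixesDemo {
  def main(args: Array[String]): Unit = {
    // Before any task finishes, per-stage maps have no entry for the stage, so a
    // direct apply() like stageToShuffleRead(stageId) throws NoSuchElementException.
    val stageToShuffleRead = mutable.HashMap[Int, Long]()
    val stageId = 7
    val shuffleRead = stageToShuffleRead.getOrElse(stageId, 0L) > 0
    println(s"has shuffle read: $shuffleRead")

    // The metrics field is already an Option. Wrapping it again, Option(t._2),
    // yields Some(None) for tasks without metrics, which is always defined, so the
    // old filter let them through and a later .get could crash. Testing the Option
    // itself is the fix.
    val tasks: Seq[(String, Option[Long])] = Seq(("SUCCESS", Some(42L)), ("SUCCESS", None))
    val oldFilter = tasks.filter(t => t._1 == "SUCCESS" && Option(t._2).isDefined)
    val newFilter = tasks.filter(t => t._1 == "SUCCESS" && t._2.isDefined)
    println(s"old filter keeps ${oldFilter.size} tasks, new filter keeps ${newFilter.size}")
  }
}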