aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorKay Ousterhout <kayo@yahoo-inc.com>2013-08-07 22:16:21 -0700
committerKay Ousterhout <kayo@yahoo-inc.com>2013-08-07 23:09:25 -0700
commit88049a214df8ee0b155e5c4b894cf32bab7bafc5 (patch)
tree6408fe07f62a14bf8cfaae410b67fce3658214a3 /core
parent5133e4bebd47d8ae089f967689ecab551c2c5844 (diff)
downloadspark-88049a214df8ee0b155e5c4b894cf32bab7bafc5.tar.gz
spark-88049a214df8ee0b155e5c4b894cf32bab7bafc5.tar.bz2
spark-88049a214df8ee0b155e5c4b894cf32bab7bafc5.zip
Fixed 3 bugs that caused UI to crash (including SPARK-810).
One bug caused the UI to crash if you try to look at a job's status before any of the tasks have finished. The second bug was a concurrency issue where two different threads (the scheduling thread and a UI thread) could be reading/updating the data structures in JobProgressListener concurrently. The third bug mis-used an Option, also causing the UI to crash under certain conditions.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/ui/jobs/JobProgressListener.scala204
-rw-r--r--core/src/main/scala/spark/ui/jobs/StagePage.scala156
2 files changed, 191 insertions, 169 deletions
diff --git a/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala
index c6103edcb0..ee6468dc9a 100644
--- a/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala
@@ -9,6 +9,12 @@ import spark.scheduler.cluster.TaskInfo
import spark.executor.TaskMetrics
import collection.mutable
+/** Tracks task-level information to be displayed in the UI.
+ *
+ * All access to the data structures in this class must be synchronized on the
+ * class, since the UI thread and the DAGScheduler event loop may otherwise
+ * be reading/updating the internal data structures concurrently.
+ */
private[spark] class JobProgressListener(val sc: SparkContext) extends SparkListener {
// How many stages to remember
val RETAINED_STAGES = System.getProperty("spark.ui.retained_stages", "1000").toInt
@@ -39,129 +45,145 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
override def onJobStart(jobStart: SparkListenerJobStart) {}
override def onStageCompleted(stageCompleted: StageCompleted) = {
- val stage = stageCompleted.stageInfo.stage
- poolToActiveStages(stageToPool(stage)) -= stage
- activeStages -= stage
- completedStages += stage
- trimIfNecessary(completedStages)
+ this.synchronized {
+ val stage = stageCompleted.stageInfo.stage
+ poolToActiveStages(stageToPool(stage)) -= stage
+ activeStages -= stage
+ completedStages += stage
+ trimIfNecessary(completedStages)
+ }
}
/** If stages is too large, remove and garbage collect old stages */
def trimIfNecessary(stages: ListBuffer[Stage]) {
- if (stages.size > RETAINED_STAGES) {
- val toRemove = RETAINED_STAGES / 10
- stages.takeRight(toRemove).foreach( s => {
- stageToTaskInfos.remove(s.id)
- stageToTime.remove(s.id)
- stageToShuffleRead.remove(s.id)
- stageToShuffleWrite.remove(s.id)
- stageToTasksActive.remove(s.id)
- stageToTasksComplete.remove(s.id)
- stageToTasksFailed.remove(s.id)
- stageToPool.remove(s)
- if (stageToDescription.contains(s)) {stageToDescription.remove(s)}
- })
- stages.trimEnd(toRemove)
+ this.synchronized {
+ if (stages.size > RETAINED_STAGES) {
+ val toRemove = RETAINED_STAGES / 10
+ stages.takeRight(toRemove).foreach( s => {
+ stageToTaskInfos.remove(s.id)
+ stageToTime.remove(s.id)
+ stageToShuffleRead.remove(s.id)
+ stageToShuffleWrite.remove(s.id)
+ stageToTasksActive.remove(s.id)
+ stageToTasksComplete.remove(s.id)
+ stageToTasksFailed.remove(s.id)
+ stageToPool.remove(s)
+ if (stageToDescription.contains(s)) {stageToDescription.remove(s)}
+ })
+ stages.trimEnd(toRemove)
+ }
}
}
/** For FIFO, all stages are contained by "default" pool but "default" pool here is meaningless */
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = {
- val stage = stageSubmitted.stage
- activeStages += stage
+ this.synchronized {
+ val stage = stageSubmitted.stage
+ activeStages += stage
- val poolName = Option(stageSubmitted.properties).map {
- p => p.getProperty("spark.scheduler.cluster.fair.pool", DEFAULT_POOL_NAME)
- }.getOrElse(DEFAULT_POOL_NAME)
- stageToPool(stage) = poolName
+ val poolName = Option(stageSubmitted.properties).map {
+ p => p.getProperty("spark.scheduler.cluster.fair.pool", DEFAULT_POOL_NAME)
+ }.getOrElse(DEFAULT_POOL_NAME)
+ stageToPool(stage) = poolName
- val description = Option(stageSubmitted.properties).flatMap {
- p => Option(p.getProperty(SparkContext.SPARK_JOB_DESCRIPTION))
- }
- description.map(d => stageToDescription(stage) = d)
+ val description = Option(stageSubmitted.properties).flatMap {
+ p => Option(p.getProperty(SparkContext.SPARK_JOB_DESCRIPTION))
+ }
+ description.map(d => stageToDescription(stage) = d)
- val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashSet[Stage]())
- stages += stage
+ val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashSet[Stage]())
+ stages += stage
+ }
}
override def onTaskStart(taskStart: SparkListenerTaskStart) {
- val sid = taskStart.task.stageId
- val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
- tasksActive += taskStart.taskInfo
- val taskList = stageToTaskInfos.getOrElse(
- sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
- taskList += ((taskStart.taskInfo, None, None))
- stageToTaskInfos(sid) = taskList
+ this.synchronized {
+ val sid = taskStart.task.stageId
+ val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
+ tasksActive += taskStart.taskInfo
+ val taskList = stageToTaskInfos.getOrElse(
+ sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
+ taskList += ((taskStart.taskInfo, None, None))
+ stageToTaskInfos(sid) = taskList
+ }
}
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
- val sid = taskEnd.task.stageId
- val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
- tasksActive -= taskEnd.taskInfo
- val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
- taskEnd.reason match {
- case e: ExceptionFailure =>
- stageToTasksFailed(sid) = stageToTasksFailed.getOrElse(sid, 0) + 1
- (Some(e), e.metrics)
- case _ =>
- stageToTasksComplete(sid) = stageToTasksComplete.getOrElse(sid, 0) + 1
- (None, Option(taskEnd.taskMetrics))
- }
+ this.synchronized {
+ val sid = taskEnd.task.stageId
+ val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
+ tasksActive -= taskEnd.taskInfo
+ val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
+ taskEnd.reason match {
+ case e: ExceptionFailure =>
+ stageToTasksFailed(sid) = stageToTasksFailed.getOrElse(sid, 0) + 1
+ (Some(e), e.metrics)
+ case _ =>
+ stageToTasksComplete(sid) = stageToTasksComplete.getOrElse(sid, 0) + 1
+ (None, Option(taskEnd.taskMetrics))
+ }
- stageToTime.getOrElseUpdate(sid, 0L)
- val time = metrics.map(m => m.executorRunTime).getOrElse(0)
- stageToTime(sid) += time
- totalTime += time
-
- stageToShuffleRead.getOrElseUpdate(sid, 0L)
- val shuffleRead = metrics.flatMap(m => m.shuffleReadMetrics).map(s =>
- s.remoteBytesRead).getOrElse(0L)
- stageToShuffleRead(sid) += shuffleRead
- totalShuffleRead += shuffleRead
-
- stageToShuffleWrite.getOrElseUpdate(sid, 0L)
- val shuffleWrite = metrics.flatMap(m => m.shuffleWriteMetrics).map(s =>
- s.shuffleBytesWritten).getOrElse(0L)
- stageToShuffleWrite(sid) += shuffleWrite
- totalShuffleWrite += shuffleWrite
-
- val taskList = stageToTaskInfos.getOrElse(
- sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
- taskList -= ((taskEnd.taskInfo, None, None))
- taskList += ((taskEnd.taskInfo, metrics, failureInfo))
- stageToTaskInfos(sid) = taskList
+ stageToTime.getOrElseUpdate(sid, 0L)
+ val time = metrics.map(m => m.executorRunTime).getOrElse(0)
+ stageToTime(sid) += time
+ totalTime += time
+
+ stageToShuffleRead.getOrElseUpdate(sid, 0L)
+ val shuffleRead = metrics.flatMap(m => m.shuffleReadMetrics).map(s =>
+ s.remoteBytesRead).getOrElse(0L)
+ stageToShuffleRead(sid) += shuffleRead
+ totalShuffleRead += shuffleRead
+
+ stageToShuffleWrite.getOrElseUpdate(sid, 0L)
+ val shuffleWrite = metrics.flatMap(m => m.shuffleWriteMetrics).map(s =>
+ s.shuffleBytesWritten).getOrElse(0L)
+ stageToShuffleWrite(sid) += shuffleWrite
+ totalShuffleWrite += shuffleWrite
+
+ val taskList = stageToTaskInfos.getOrElse(
+ sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
+ taskList -= ((taskEnd.taskInfo, None, None))
+ taskList += ((taskEnd.taskInfo, metrics, failureInfo))
+ stageToTaskInfos(sid) = taskList
+ }
}
override def onJobEnd(jobEnd: SparkListenerJobEnd) {
- jobEnd match {
- case end: SparkListenerJobEnd =>
- end.jobResult match {
- case JobFailed(ex, Some(stage)) =>
- activeStages -= stage
- poolToActiveStages(stageToPool(stage)) -= stage
- failedStages += stage
- trimIfNecessary(failedStages)
- case _ =>
- }
- case _ =>
+ this.synchronized {
+ jobEnd match {
+ case end: SparkListenerJobEnd =>
+ end.jobResult match {
+ case JobFailed(ex, Some(stage)) =>
+ activeStages -= stage
+ poolToActiveStages(stageToPool(stage)) -= stage
+ failedStages += stage
+ trimIfNecessary(failedStages)
+ case _ =>
+ }
+ case _ =>
+ }
}
}
/** Is this stage's input from a shuffle read. */
def hasShuffleRead(stageID: Int): Boolean = {
- // This is written in a slightly complicated way to avoid having to scan all tasks
- for (s <- stageToTaskInfos.get(stageID).getOrElse(Seq())) {
- if (s._2 != null) return s._2.flatMap(m => m.shuffleReadMetrics).isDefined
+ this.synchronized {
+ // This is written in a slightly complicated way to avoid having to scan all tasks
+ for (s <- stageToTaskInfos.get(stageID).getOrElse(Seq())) {
+ if (s._2 != null) return s._2.flatMap(m => m.shuffleReadMetrics).isDefined
+ }
+ return false // No tasks have finished for this stage
}
- return false // No tasks have finished for this stage
}
/** Is this stage's output to a shuffle write. */
def hasShuffleWrite(stageID: Int): Boolean = {
- // This is written in a slightly complicated way to avoid having to scan all tasks
- for (s <- stageToTaskInfos.get(stageID).getOrElse(Seq())) {
- if (s._2 != null) return s._2.flatMap(m => m.shuffleWriteMetrics).isDefined
+ this.synchronized {
+ // This is written in a slightly complicated way to avoid having to scan all tasks
+ for (s <- stageToTaskInfos.get(stageID).getOrElse(Seq())) {
+ if (s._2 != null) return s._2.flatMap(m => m.shuffleWriteMetrics).isDefined
+ }
+ return false // No tasks have finished for this stage
}
- return false // No tasks have finished for this stage
}
}
diff --git a/core/src/main/scala/spark/ui/jobs/StagePage.scala b/core/src/main/scala/spark/ui/jobs/StagePage.scala
index 54b0393e21..f6cef6694d 100644
--- a/core/src/main/scala/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/spark/ui/jobs/StagePage.scala
@@ -36,100 +36,100 @@ private[spark] class StagePage(parent: JobProgressUI) {
val dateFmt = parent.dateFmt
def render(request: HttpServletRequest): Seq[Node] = {
- val stageId = request.getParameter("id").toInt
- val now = System.currentTimeMillis()
+ listener.synchronized {
+ val stageId = request.getParameter("id").toInt
+ val now = System.currentTimeMillis()
+
+ if (!listener.stageToTaskInfos.contains(stageId)) {
+ val content =
+ <div>
+ <h2>Summary Metrics</h2> No tasks have started yet
+ <h2>Tasks</h2> No tasks have started yet
+ </div>
+ return headerSparkPage(content, parent.sc, "Stage Details: %s".format(stageId), Jobs)
+ }
- if (!listener.stageToTaskInfos.contains(stageId)) {
- val content =
- <div>
- <h2>Summary Metrics</h2> No tasks have started yet
- <h2>Tasks</h2> No tasks have started yet
- </div>
- return headerSparkPage(content, parent.sc, "Stage Details: %s".format(stageId), Jobs)
- }
+ val tasks = listener.stageToTaskInfos(stageId).toSeq.sortBy(_._1.launchTime)
- val tasks = listener.stageToTaskInfos(stageId).toSeq
+ val shuffleRead = listener.stageToShuffleRead.getOrElse(stageId, 0L) > 0
+ val shuffleWrite = listener.stageToShuffleWrite.getOrElse(stageId, 0L) > 0
- val shuffleRead = listener.stageToShuffleRead(stageId) > 0
- val shuffleWrite = listener.stageToShuffleWrite(stageId) > 0
+ var activeTime = 0L
+ listener.stageToTasksActive(stageId).foreach(activeTime += _.timeRunning(now))
- var activeTime = 0L
- listener.stageToTasksActive(stageId).foreach { t =>
- activeTime += t.timeRunning(now)
- }
-
- val summary =
- <div>
- <ul class="unstyled">
- <li>
- <strong>CPU time: </strong>
- {parent.formatDuration(listener.stageToTime(stageId) + activeTime)}
- </li>
- {if (shuffleRead)
- <li>
- <strong>Shuffle read: </strong>
- {Utils.memoryBytesToString(listener.stageToShuffleRead(stageId))}
- </li>
- }
- {if (shuffleWrite)
+ val summary =
+ <div>
+ <ul class="unstyled">
<li>
- <strong>Shuffle write: </strong>
- {Utils.memoryBytesToString(listener.stageToShuffleWrite(stageId))}
+ <strong>CPU time: </strong>
+ {parent.formatDuration(listener.stageToTime.getOrElse(stageId, 0L) + activeTime)}
</li>
- }
- </ul>
- </div>
+ {if (shuffleRead)
+ <li>
+ <strong>Shuffle read: </strong>
+ {Utils.memoryBytesToString(listener.stageToShuffleRead.getOrElse(stageId, 0L))}
+ </li>
+ }
+ {if (shuffleWrite)
+ <li>
+ <strong>Shuffle write: </strong>
+ {Utils.memoryBytesToString(listener.stageToShuffleWrite.getOrElse(stageId, 0L))}
+ </li>
+ }
+ </ul>
+ </div>
- val taskHeaders: Seq[String] =
- Seq("Task ID", "Status", "Duration", "Locality Level", "Worker", "Launch Time") ++
- {if (shuffleRead) Seq("Shuffle Read") else Nil} ++
- {if (shuffleWrite) Seq("Shuffle Write") else Nil} ++
- Seq("Details")
+ val taskHeaders: Seq[String] =
+ Seq("Task ID", "Status", "Duration", "Locality Level", "Worker", "Launch Time") ++
+ {if (shuffleRead) Seq("Shuffle Read") else Nil} ++
+ {if (shuffleWrite) Seq("Shuffle Write") else Nil} ++
+ Seq("Details")
- val taskTable = listingTable(taskHeaders, taskRow, tasks)
+ val taskTable = listingTable(taskHeaders, taskRow, tasks)
- // Excludes tasks which failed and have incomplete metrics
- val validTasks = tasks.filter(t => t._1.status == "SUCCESS" && (Option(t._2).isDefined))
+ // Excludes tasks which failed and have incomplete metrics
+ val validTasks = tasks.filter(t => t._1.status == "SUCCESS" && (t._2.isDefined))
- val summaryTable: Option[Seq[Node]] =
- if (validTasks.size == 0) {
- None
- }
- else {
- val serviceTimes = validTasks.map{case (info, metrics, exception) =>
- metrics.get.executorRunTime.toDouble}
- val serviceQuantiles = "Duration" +: Distribution(serviceTimes).get.getQuantiles().map(
- ms => parent.formatDuration(ms.toLong))
-
- def getQuantileCols(data: Seq[Double]) =
- Distribution(data).get.getQuantiles().map(d => Utils.memoryBytesToString(d.toLong))
-
- val shuffleReadSizes = validTasks.map {
- case(info, metrics, exception) =>
- metrics.get.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble
+ val summaryTable: Option[Seq[Node]] =
+ if (validTasks.size == 0) {
+ None
}
- val shuffleReadQuantiles = "Shuffle Read (Remote)" +: getQuantileCols(shuffleReadSizes)
+ else {
+ val serviceTimes = validTasks.map{case (info, metrics, exception) =>
+ metrics.get.executorRunTime.toDouble}
+ val serviceQuantiles = "Duration" +: Distribution(serviceTimes).get.getQuantiles().map(
+ ms => parent.formatDuration(ms.toLong))
+
+ def getQuantileCols(data: Seq[Double]) =
+ Distribution(data).get.getQuantiles().map(d => Utils.memoryBytesToString(d.toLong))
+
+ val shuffleReadSizes = validTasks.map {
+ case(info, metrics, exception) =>
+ metrics.get.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble
+ }
+ val shuffleReadQuantiles = "Shuffle Read (Remote)" +: getQuantileCols(shuffleReadSizes)
- val shuffleWriteSizes = validTasks.map {
- case(info, metrics, exception) =>
- metrics.get.shuffleWriteMetrics.map(_.shuffleBytesWritten).getOrElse(0L).toDouble
- }
- val shuffleWriteQuantiles = "Shuffle Write" +: getQuantileCols(shuffleWriteSizes)
+ val shuffleWriteSizes = validTasks.map {
+ case(info, metrics, exception) =>
+ metrics.get.shuffleWriteMetrics.map(_.shuffleBytesWritten).getOrElse(0L).toDouble
+ }
+ val shuffleWriteQuantiles = "Shuffle Write" +: getQuantileCols(shuffleWriteSizes)
- val listings: Seq[Seq[String]] = Seq(serviceQuantiles,
- if (shuffleRead) shuffleReadQuantiles else Nil,
- if (shuffleWrite) shuffleWriteQuantiles else Nil)
+ val listings: Seq[Seq[String]] = Seq(serviceQuantiles,
+ if (shuffleRead) shuffleReadQuantiles else Nil,
+ if (shuffleWrite) shuffleWriteQuantiles else Nil)
- val quantileHeaders = Seq("Metric", "Min", "25%", "50%", "75%", "Max")
- def quantileRow(data: Seq[String]): Seq[Node] = <tr> {data.map(d => <td>{d}</td>)} </tr>
- Some(listingTable(quantileHeaders, quantileRow, listings))
- }
+ val quantileHeaders = Seq("Metric", "Min", "25%", "50%", "75%", "Max")
+ def quantileRow(data: Seq[String]): Seq[Node] = <tr> {data.map(d => <td>{d}</td>)} </tr>
+ Some(listingTable(quantileHeaders, quantileRow, listings))
+ }
- val content =
- summary ++ <h2>Summary Metrics</h2> ++ summaryTable.getOrElse(Nil) ++
- <h2>Tasks</h2> ++ taskTable;
+ val content =
+ summary ++ <h2>Summary Metrics</h2> ++ summaryTable.getOrElse(Nil) ++
+ <h2>Tasks</h2> ++ taskTable;
- headerSparkPage(content, parent.sc, "Stage Details: %s".format(stageId), Jobs)
+ headerSparkPage(content, parent.sc, "Stage Details: %s".format(stageId), Jobs)
+ }
}