diff options
author | Patrick Wendell <pwendell@gmail.com> | 2013-07-06 16:36:13 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2013-07-06 16:40:02 -0700 |
commit | 32b9d21a97d1c93f174551000d06cc429f317827 (patch) | |
tree | 49ab57b7d33e4fa01fdf92290510d627ca45ced8 | |
parent | 22161887ee3eb3182cb35911f9da13bceea5fa0c (diff) | |
download | spark-32b9d21a97d1c93f174551000d06cc429f317827.tar.gz spark-32b9d21a97d1c93f174551000d06cc429f317827.tar.bz2 spark-32b9d21a97d1c93f174551000d06cc429f317827.zip |
Fix occasional failure in UI listener.
If a task fails before the metrics are initialized, it remains possible
that the metrics field will be `None`. This patch accounts for that possbility
by keeping metrics as an `Option` at all times.
-rw-r--r-- | core/src/main/scala/spark/ui/jobs/JobProgressUI.scala | 14 | ||||
-rw-r--r-- | core/src/main/scala/spark/ui/jobs/StagePage.scala | 16 |
2 files changed, 15 insertions, 15 deletions
diff --git a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala index 36b1cd00ed..84730cc091 100644 --- a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala +++ b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala @@ -51,7 +51,7 @@ private[spark] class JobProgressListener extends SparkListener { val stageToTasksComplete = HashMap[Int, Int]() val stageToTasksFailed = HashMap[Int, Int]() val stageToTaskInfos = - HashMap[Int, ArrayBuffer[(TaskInfo, TaskMetrics, Option[ExceptionFailure])]]() + HashMap[Int, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]() override def onJobStart(jobStart: SparkListenerJobStart) {} @@ -78,17 +78,17 @@ private[spark] class JobProgressListener extends SparkListener { override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { val sid = taskEnd.task.stageId - val (failureInfo, metrics): (Option[ExceptionFailure], TaskMetrics) = + val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) = taskEnd.reason match { case e: ExceptionFailure => stageToTasksFailed(sid) = stageToTasksFailed.getOrElse(sid, 0) + 1 - (Some(e), e.metrics.get) + (Some(e), e.metrics) case _ => stageToTasksComplete(sid) = stageToTasksComplete.getOrElse(sid, 0) + 1 - (None, taskEnd.taskMetrics) + (None, Some(taskEnd.taskMetrics)) } val taskList = stageToTaskInfos.getOrElse( - sid, ArrayBuffer[(TaskInfo, TaskMetrics, Option[ExceptionFailure])]()) + sid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]()) taskList += ((taskEnd.taskInfo, metrics, failureInfo)) stageToTaskInfos(sid) = taskList } @@ -111,7 +111,7 @@ private[spark] class JobProgressListener extends SparkListener { def hasShuffleRead(stageID: Int): Boolean = { // This is written in a slightly complicated way to avoid having to scan all tasks for (s <- stageToTaskInfos.get(stageID).getOrElse(Seq())) { - if (s._2 != null) return s._2.shuffleReadMetrics.isDefined + if (s._2 != null) return s._2.flatMap(m => m.shuffleReadMetrics).isDefined } return false // No tasks have finished for this stage } @@ -120,7 +120,7 @@ private[spark] class JobProgressListener extends SparkListener { def hasShuffleWrite(stageID: Int): Boolean = { // This is written in a slightly complicated way to avoid having to scan all tasks for (s <- stageToTaskInfos.get(stageID).getOrElse(Seq())) { - if (s._2 != null) return s._2.shuffleWriteMetrics.isDefined + if (s._2 != null) return s._2.flatMap(m => m.shuffleWriteMetrics).isDefined } return false // No tasks have finished for this stage } diff --git a/core/src/main/scala/spark/ui/jobs/StagePage.scala b/core/src/main/scala/spark/ui/jobs/StagePage.scala index 49e84880cf..51b82b6a8c 100644 --- a/core/src/main/scala/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/spark/ui/jobs/StagePage.scala @@ -52,7 +52,7 @@ private[spark] class StagePage(parent: JobProgressUI) { } else { val serviceTimes = validTasks.map{case (info, metrics, exception) => - metrics.executorRunTime.toDouble} + metrics.get.executorRunTime.toDouble} val serviceQuantiles = "Duration" +: Distribution(serviceTimes).get.getQuantiles().map( ms => parent.formatDuration(ms.toLong)) @@ -61,13 +61,13 @@ private[spark] class StagePage(parent: JobProgressUI) { val shuffleReadSizes = validTasks.map { case(info, metrics, exception) => - metrics.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble + metrics.get.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble } val shuffleReadQuantiles = "Shuffle Read (Remote)" +: getQuantileCols(shuffleReadSizes) val shuffleWriteSizes = validTasks.map { case(info, metrics, exception) => - metrics.shuffleWriteMetrics.map(_.shuffleBytesWritten).getOrElse(0L).toDouble + metrics.get.shuffleWriteMetrics.map(_.shuffleBytesWritten).getOrElse(0L).toDouble } val shuffleWriteQuantiles = "Shuffle Write" +: getQuantileCols(shuffleWriteSizes) @@ -87,21 +87,21 @@ private[spark] class StagePage(parent: JobProgressUI) { } - def taskRow(taskData: (TaskInfo, TaskMetrics, Option[ExceptionFailure])): Seq[Node] = { + def taskRow(taskData: (TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])): Seq[Node] = { def fmtStackTrace(trace: Seq[StackTraceElement]): Seq[Node] = trace.map(e => <span style="display:block;">{e.toString}</span>) val (info, metrics, exception) = taskData <tr> <td>{info.taskId}</td> - <td sorttable_customkey={Option(metrics).map{m => m.executorRunTime.toString}.getOrElse("1")}> - {Option(metrics).map{m => parent.formatDuration(m.executorRunTime)}.getOrElse("")} + <td sorttable_customkey={metrics.map{m => m.executorRunTime.toString}.getOrElse("1")}> + {metrics.map{m => parent.formatDuration(m.executorRunTime)}.getOrElse("")} </td> <td>{info.taskLocality}</td> <td>{info.hostPort}</td> <td>{dateFmt.format(new Date(info.launchTime))}</td> - {Option(metrics).flatMap{m => m.shuffleReadMetrics}.map{s => + {metrics.flatMap{m => m.shuffleReadMetrics}.map{s => <td>{Utils.memoryBytesToString(s.remoteBytesRead)}</td>}.getOrElse("")} - {Option(metrics).flatMap{m => m.shuffleWriteMetrics}.map{s => + {metrics.flatMap{m => m.shuffleWriteMetrics}.map{s => <td>{Utils.memoryBytesToString(s.shuffleBytesWritten)}</td>}.getOrElse("")} <td>{exception.map(e => <span> |