author    Patrick Wendell <pwendell@gmail.com>  2013-07-06 16:36:13 -0700
committer Patrick Wendell <pwendell@gmail.com>  2013-07-06 16:40:02 -0700
commit    32b9d21a97d1c93f174551000d06cc429f317827
tree      49ab57b7d33e4fa01fdf92290510d627ca45ced8
parent    22161887ee3eb3182cb35911f9da13bceea5fa0c
Fix occasional failure in UI listener.
If a task fails before its metrics are initialized, the metrics field may be `None`. This patch accounts for that possibility by keeping the metrics as an `Option` at all times.
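The pattern the patch adopts is to carry metrics as `Option[TaskMetrics]` end to end and to `flatMap` into the nested optional shuffle fields, rather than calling `.get` or relying on a null check. The sketch below is a minimal, self-contained illustration of that idea; the case classes are hypothetical stand-ins, not Spark's real `spark.executor.TaskMetrics`, and the tuple shape only mirrors the listener's `(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])` record.

    // Minimal sketch (stand-in types, not the Spark API) of handling
    // per-task metrics as Option[TaskMetrics] so a task that failed
    // before its metrics were set never forces a .get on None.
    object OptionMetricsSketch {
      case class ShuffleReadMetrics(remoteBytesRead: Long)
      case class TaskMetrics(executorRunTime: Long,
                             shuffleReadMetrics: Option[ShuffleReadMetrics])

      // Mirrors the listener's (TaskInfo, Option[TaskMetrics], Option[ExceptionFailure]).
      type TaskRecord = (String, Option[TaskMetrics], Option[String])

      def hasShuffleRead(tasks: Seq[TaskRecord]): Boolean =
        // flatMap through the Option: a None metrics entry simply contributes false.
        tasks.exists(_._2.flatMap(_.shuffleReadMetrics).isDefined)

      def main(args: Array[String]): Unit = {
        val ok: TaskRecord =
          ("task-0", Some(TaskMetrics(120L, Some(ShuffleReadMetrics(4096L)))), None)
        val failed: TaskRecord =
          ("task-1", None, Some("ExceptionFailure before metrics were set"))
        println(hasShuffleRead(Seq(failed)))     // false: missing metrics, no crash
        println(hasShuffleRead(Seq(failed, ok))) // true
      }
    }

The same `flatMap` shape is what the diff below applies in `hasShuffleRead`/`hasShuffleWrite` and in the `StagePage` rendering code.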
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/ui/jobs/JobProgressUI.scala | 14
-rw-r--r--  core/src/main/scala/spark/ui/jobs/StagePage.scala     | 16
2 files changed, 15 insertions(+), 15 deletions(-)
diff --git a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
index 36b1cd00ed..84730cc091 100644
--- a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
+++ b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
@@ -51,7 +51,7 @@ private[spark] class JobProgressListener extends SparkListener {
val stageToTasksComplete = HashMap[Int, Int]()
val stageToTasksFailed = HashMap[Int, Int]()
val stageToTaskInfos =
- HashMap[Int, ArrayBuffer[(TaskInfo, TaskMetrics, Option[ExceptionFailure])]]()
+ HashMap[Int, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]()
override def onJobStart(jobStart: SparkListenerJobStart) {}
@@ -78,17 +78,17 @@ private[spark] class JobProgressListener extends SparkListener {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
val sid = taskEnd.task.stageId
- val (failureInfo, metrics): (Option[ExceptionFailure], TaskMetrics) =
+ val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
taskEnd.reason match {
case e: ExceptionFailure =>
stageToTasksFailed(sid) = stageToTasksFailed.getOrElse(sid, 0) + 1
- (Some(e), e.metrics.get)
+ (Some(e), e.metrics)
case _ =>
stageToTasksComplete(sid) = stageToTasksComplete.getOrElse(sid, 0) + 1
- (None, taskEnd.taskMetrics)
+ (None, Some(taskEnd.taskMetrics))
}
val taskList = stageToTaskInfos.getOrElse(
- sid, ArrayBuffer[(TaskInfo, TaskMetrics, Option[ExceptionFailure])]())
+ sid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
taskList += ((taskEnd.taskInfo, metrics, failureInfo))
stageToTaskInfos(sid) = taskList
}
@@ -111,7 +111,7 @@ private[spark] class JobProgressListener extends SparkListener {
def hasShuffleRead(stageID: Int): Boolean = {
// This is written in a slightly complicated way to avoid having to scan all tasks
for (s <- stageToTaskInfos.get(stageID).getOrElse(Seq())) {
- if (s._2 != null) return s._2.shuffleReadMetrics.isDefined
+ if (s._2 != null) return s._2.flatMap(m => m.shuffleReadMetrics).isDefined
}
return false // No tasks have finished for this stage
}
@@ -120,7 +120,7 @@ private[spark] class JobProgressListener extends SparkListener {
def hasShuffleWrite(stageID: Int): Boolean = {
// This is written in a slightly complicated way to avoid having to scan all tasks
for (s <- stageToTaskInfos.get(stageID).getOrElse(Seq())) {
- if (s._2 != null) return s._2.shuffleWriteMetrics.isDefined
+ if (s._2 != null) return s._2.flatMap(m => m.shuffleWriteMetrics).isDefined
}
return false // No tasks have finished for this stage
}
diff --git a/core/src/main/scala/spark/ui/jobs/StagePage.scala b/core/src/main/scala/spark/ui/jobs/StagePage.scala
index 49e84880cf..51b82b6a8c 100644
--- a/core/src/main/scala/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/spark/ui/jobs/StagePage.scala
@@ -52,7 +52,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
}
else {
val serviceTimes = validTasks.map{case (info, metrics, exception) =>
- metrics.executorRunTime.toDouble}
+ metrics.get.executorRunTime.toDouble}
val serviceQuantiles = "Duration" +: Distribution(serviceTimes).get.getQuantiles().map(
ms => parent.formatDuration(ms.toLong))
@@ -61,13 +61,13 @@ private[spark] class StagePage(parent: JobProgressUI) {
val shuffleReadSizes = validTasks.map {
case(info, metrics, exception) =>
- metrics.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble
+ metrics.get.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble
}
val shuffleReadQuantiles = "Shuffle Read (Remote)" +: getQuantileCols(shuffleReadSizes)
val shuffleWriteSizes = validTasks.map {
case(info, metrics, exception) =>
- metrics.shuffleWriteMetrics.map(_.shuffleBytesWritten).getOrElse(0L).toDouble
+ metrics.get.shuffleWriteMetrics.map(_.shuffleBytesWritten).getOrElse(0L).toDouble
}
val shuffleWriteQuantiles = "Shuffle Write" +: getQuantileCols(shuffleWriteSizes)
@@ -87,21 +87,21 @@ private[spark] class StagePage(parent: JobProgressUI) {
}
- def taskRow(taskData: (TaskInfo, TaskMetrics, Option[ExceptionFailure])): Seq[Node] = {
+ def taskRow(taskData: (TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])): Seq[Node] = {
def fmtStackTrace(trace: Seq[StackTraceElement]): Seq[Node] =
trace.map(e => <span style="display:block;">{e.toString}</span>)
val (info, metrics, exception) = taskData
<tr>
<td>{info.taskId}</td>
- <td sorttable_customkey={Option(metrics).map{m => m.executorRunTime.toString}.getOrElse("1")}>
- {Option(metrics).map{m => parent.formatDuration(m.executorRunTime)}.getOrElse("")}
+ <td sorttable_customkey={metrics.map{m => m.executorRunTime.toString}.getOrElse("1")}>
+ {metrics.map{m => parent.formatDuration(m.executorRunTime)}.getOrElse("")}
</td>
<td>{info.taskLocality}</td>
<td>{info.hostPort}</td>
<td>{dateFmt.format(new Date(info.launchTime))}</td>
- {Option(metrics).flatMap{m => m.shuffleReadMetrics}.map{s =>
+ {metrics.flatMap{m => m.shuffleReadMetrics}.map{s =>
<td>{Utils.memoryBytesToString(s.remoteBytesRead)}</td>}.getOrElse("")}
- {Option(metrics).flatMap{m => m.shuffleWriteMetrics}.map{s =>
+ {metrics.flatMap{m => m.shuffleWriteMetrics}.map{s =>
<td>{Utils.memoryBytesToString(s.shuffleBytesWritten)}</td>}.getOrElse("")}
<td>{exception.map(e =>
<span>