diff options
author | Matei Zaharia <matei.zaharia@gmail.com> | 2013-08-07 15:50:45 -0700 |
---|---|---|
committer | Matei Zaharia <matei.zaharia@gmail.com> | 2013-08-07 15:50:45 -0700 |
commit | 5133e4bebd47d8ae089f967689ecab551c2c5844 (patch) | |
tree | 63c1f4dbbb82495faaa3ce1dbc2fe594451d6aa1 | |
parent | 3c8478e1fbe3ec85a83f2822ad8a8d4cca580487 (diff) | |
parent | b88e26248e0e7f0308a14e870677da9ce16a8735 (diff) | |
download | spark-5133e4bebd47d8ae089f967689ecab551c2c5844.tar.gz spark-5133e4bebd47d8ae089f967689ecab551c2c5844.tar.bz2 spark-5133e4bebd47d8ae089f967689ecab551c2c5844.zip |
Merge pull request #790 from kayousterhout/fix_throughput
Fixed issue in UI that decreased scheduler throughput by 5x or more
-rw-r--r-- | core/src/main/scala/spark/ui/exec/ExecutorsUI.scala | 13 | ||||
-rw-r--r-- | core/src/main/scala/spark/ui/jobs/JobProgressListener.scala | 8 | ||||
-rw-r--r-- | core/src/main/scala/spark/ui/jobs/StagePage.scala | 2 |
3 files changed, 6 insertions, 17 deletions
diff --git a/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala index 4be2bfa413..6ec48f70a4 100644 --- a/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala +++ b/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala @@ -97,7 +97,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) { .getOrElse(0).toString val failedTasks = listener.executorToTasksFailed.getOrElse(a.toString, 0).toString val completedTasks = listener.executorToTasksComplete.getOrElse(a.toString, 0).toString - val totalTasks = listener.executorToTaskInfos(a.toString).size.toString + val totalTasks = activeTasks + failedTasks + completedTasks Seq( execId, @@ -117,17 +117,11 @@ private[spark] class ExecutorsUI(val sc: SparkContext) { val executorToTasksActive = HashMap[String, HashSet[TaskInfo]]() val executorToTasksComplete = HashMap[String, Int]() val executorToTasksFailed = HashMap[String, Int]() - val executorToTaskInfos = - HashMap[String, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]() override def onTaskStart(taskStart: SparkListenerTaskStart) { val eid = taskStart.taskInfo.executorId val activeTasks = executorToTasksActive.getOrElseUpdate(eid, new HashSet[TaskInfo]()) activeTasks += taskStart.taskInfo - val taskList = executorToTaskInfos.getOrElse( - eid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]()) - taskList += ((taskStart.taskInfo, None, None)) - executorToTaskInfos(eid) = taskList } override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { @@ -143,11 +137,6 @@ private[spark] class ExecutorsUI(val sc: SparkContext) { executorToTasksComplete(eid) = executorToTasksComplete.getOrElse(eid, 0) + 1 (None, Option(taskEnd.taskMetrics)) } - val taskList = executorToTaskInfos.getOrElse( - eid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]()) - taskList -= ((taskEnd.taskInfo, None, None)) - taskList += ((taskEnd.taskInfo, metrics, failureInfo)) - executorToTaskInfos(eid) = taskList } } } diff --git a/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala index f22c4e39e3..c6103edcb0 100644 --- a/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala @@ -1,7 +1,7 @@ package spark.ui.jobs import scala.Seq -import scala.collection.mutable.{HashSet, ListBuffer, HashMap, ArrayBuffer} +import scala.collection.mutable.{ListBuffer, HashMap, HashSet} import spark.{ExceptionFailure, SparkContext, Success, Utils} import spark.scheduler._ @@ -34,7 +34,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList val stageToTasksComplete = HashMap[Int, Int]() val stageToTasksFailed = HashMap[Int, Int]() val stageToTaskInfos = - HashMap[Int, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]() + HashMap[Int, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]() override def onJobStart(jobStart: SparkListenerJobStart) {} @@ -89,7 +89,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]()) tasksActive += taskStart.taskInfo val taskList = stageToTaskInfos.getOrElse( - sid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]()) + sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]()) taskList += ((taskStart.taskInfo, None, None)) stageToTaskInfos(sid) = taskList } @@ -126,7 +126,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList totalShuffleWrite += shuffleWrite val taskList = stageToTaskInfos.getOrElse( - sid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]()) + sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]()) taskList -= ((taskEnd.taskInfo, None, None)) taskList += ((taskEnd.taskInfo, metrics, failureInfo)) stageToTaskInfos(sid) = taskList diff --git a/core/src/main/scala/spark/ui/jobs/StagePage.scala b/core/src/main/scala/spark/ui/jobs/StagePage.scala index e327cb3947..54b0393e21 100644 --- a/core/src/main/scala/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/spark/ui/jobs/StagePage.scala @@ -48,7 +48,7 @@ private[spark] class StagePage(parent: JobProgressUI) { return headerSparkPage(content, parent.sc, "Stage Details: %s".format(stageId), Jobs) } - val tasks = listener.stageToTaskInfos(stageId) + val tasks = listener.stageToTaskInfos(stageId).toSeq val shuffleRead = listener.stageToShuffleRead(stageId) > 0 val shuffleWrite = listener.stageToShuffleWrite(stageId) > 0 |