diff options
author | Kay Ousterhout <kayo@yahoo-inc.com> | 2013-08-07 14:28:00 -0700 |
---|---|---|
committer | Kay Ousterhout <kayo@yahoo-inc.com> | 2013-08-07 14:42:05 -0700 |
commit | b88e26248e0e7f0308a14e870677da9ce16a8735 (patch) | |
tree | 63c1f4dbbb82495faaa3ce1dbc2fe594451d6aa1 | |
parent | 3c8478e1fbe3ec85a83f2822ad8a8d4cca580487 (diff) | |
download | spark-b88e26248e0e7f0308a14e870677da9ce16a8735.tar.gz spark-b88e26248e0e7f0308a14e870677da9ce16a8735.tar.bz2 spark-b88e26248e0e7f0308a14e870677da9ce16a8735.zip |
Fixed issue in UI that limited scheduler throughput.
Removal of items from ArrayBuffers in the UI code was slow and
significantly impacted scheduler throughput. This commit
improves scheduler throughput by 5x.
-rw-r--r-- | core/src/main/scala/spark/ui/exec/ExecutorsUI.scala | 13 | ||||
-rw-r--r-- | core/src/main/scala/spark/ui/jobs/JobProgressListener.scala | 8 | ||||
-rw-r--r-- | core/src/main/scala/spark/ui/jobs/StagePage.scala | 2 |
3 files changed, 6 insertions, 17 deletions
diff --git a/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
index 4be2bfa413..6ec48f70a4 100644
--- a/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
+++ b/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
@@ -97,7 +97,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
       .getOrElse(0).toString
     val failedTasks = listener.executorToTasksFailed.getOrElse(a.toString, 0).toString
     val completedTasks = listener.executorToTasksComplete.getOrElse(a.toString, 0).toString
-    val totalTasks = listener.executorToTaskInfos(a.toString).size.toString
+    val totalTasks = activeTasks + failedTasks + completedTasks

     Seq(
       execId,
@@ -117,17 +117,11 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
     val executorToTasksActive = HashMap[String, HashSet[TaskInfo]]()
     val executorToTasksComplete = HashMap[String, Int]()
     val executorToTasksFailed = HashMap[String, Int]()
-    val executorToTaskInfos =
-      HashMap[String, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]()

     override def onTaskStart(taskStart: SparkListenerTaskStart) {
       val eid = taskStart.taskInfo.executorId
       val activeTasks = executorToTasksActive.getOrElseUpdate(eid, new HashSet[TaskInfo]())
       activeTasks += taskStart.taskInfo
-      val taskList = executorToTaskInfos.getOrElse(
-        eid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
-      taskList += ((taskStart.taskInfo, None, None))
-      executorToTaskInfos(eid) = taskList
     }

     override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
@@ -143,11 +137,6 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
         executorToTasksComplete(eid) = executorToTasksComplete.getOrElse(eid, 0) + 1
         (None, Option(taskEnd.taskMetrics))
       }
-      val taskList = executorToTaskInfos.getOrElse(
-        eid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
-      taskList -= ((taskEnd.taskInfo, None, None))
-      taskList += ((taskEnd.taskInfo, metrics, failureInfo))
-      executorToTaskInfos(eid) = taskList
     }
   }
 }
diff --git a/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala
index f22c4e39e3..c6103edcb0 100644
--- a/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala
@@ -1,7 +1,7 @@
 package spark.ui.jobs

 import scala.Seq
-import scala.collection.mutable.{HashSet, ListBuffer, HashMap, ArrayBuffer}
+import scala.collection.mutable.{ListBuffer, HashMap, HashSet}

 import spark.{ExceptionFailure, SparkContext, Success, Utils}
 import spark.scheduler._
@@ -34,7 +34,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
   val stageToTasksComplete = HashMap[Int, Int]()
   val stageToTasksFailed = HashMap[Int, Int]()
   val stageToTaskInfos =
-    HashMap[Int, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]()
+    HashMap[Int, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]()

   override def onJobStart(jobStart: SparkListenerJobStart) {}
@@ -89,7 +89,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
     val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
     tasksActive += taskStart.taskInfo
     val taskList = stageToTaskInfos.getOrElse(
-      sid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
+      sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
     taskList += ((taskStart.taskInfo, None, None))
     stageToTaskInfos(sid) = taskList
@@ -126,7 +126,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
     totalShuffleWrite += shuffleWrite

     val taskList = stageToTaskInfos.getOrElse(
-      sid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
+      sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
     taskList -= ((taskEnd.taskInfo, None, None))
     taskList += ((taskEnd.taskInfo, metrics, failureInfo))
     stageToTaskInfos(sid) = taskList
diff --git a/core/src/main/scala/spark/ui/jobs/StagePage.scala b/core/src/main/scala/spark/ui/jobs/StagePage.scala
index e327cb3947..54b0393e21 100644
--- a/core/src/main/scala/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/spark/ui/jobs/StagePage.scala
@@ -48,7 +48,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
       return headerSparkPage(content, parent.sc, "Stage Details: %s".format(stageId), Jobs)
     }

-    val tasks = listener.stageToTaskInfos(stageId)
+    val tasks = listener.stageToTaskInfos(stageId).toSeq

     val shuffleRead = listener.stageToShuffleRead(stageId) > 0
     val shuffleWrite = listener.stageToShuffleWrite(stageId) > 0