author     Kay Ousterhout <kayo@yahoo-inc.com>  2013-08-07 14:28:00 -0700
committer  Kay Ousterhout <kayo@yahoo-inc.com>  2013-08-07 14:42:05 -0700
commit     b88e26248e0e7f0308a14e870677da9ce16a8735 (patch)
tree       63c1f4dbbb82495faaa3ce1dbc2fe594451d6aa1 /core
parent     3c8478e1fbe3ec85a83f2822ad8a8d4cca580487 (diff)
Fixed issue in UI that limited scheduler throughput.
Removal of items from ArrayBuffers in the UI code was slow and significantly impacted scheduler throughput. This commit improves scheduler throughput by 5x.
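The patch replaces the per-task ArrayBuffer bookkeeping with HashSets (and drops the per-executor task list entirely). The cost difference is the point: removing an element from an ArrayBuffer scans for it and shifts the tail, so n removals cost O(n^2) overall, while a HashSet removes by hash lookup in roughly constant time per element. A minimal, self-contained sketch of that difference (illustrative names and sizes, not code from this patch):

import scala.collection.mutable.{ArrayBuffer, HashSet}

object RemovalSketch {
  // Hypothetical timing helper, only for this illustration.
  def timed[A](label: String)(body: => A): A = {
    val start = System.nanoTime()
    val result = body
    println(f"$label: ${(System.nanoTime() - start) / 1e6}%.1f ms")
    result
  }

  def main(args: Array[String]): Unit = {
    val n = 20000
    val buffer = ArrayBuffer.tabulate(n)(identity)
    val set    = HashSet(0 until n: _*)

    timed("ArrayBuffer -=") { for (i <- 0 until n) buffer -= i } // linear scan + shift per removal
    timed("HashSet     -=") { for (i <- 0 until n) set -= i }    // hash probe per removal
  }
}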
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/ui/exec/ExecutorsUI.scala          | 13
-rw-r--r--  core/src/main/scala/spark/ui/jobs/JobProgressListener.scala  |  8
-rw-r--r--  core/src/main/scala/spark/ui/jobs/StagePage.scala            |  2
3 files changed, 6 insertions, 17 deletions
diff --git a/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
index 4be2bfa413..6ec48f70a4 100644
--- a/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
+++ b/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
@@ -97,7 +97,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
.getOrElse(0).toString
val failedTasks = listener.executorToTasksFailed.getOrElse(a.toString, 0).toString
val completedTasks = listener.executorToTasksComplete.getOrElse(a.toString, 0).toString
- val totalTasks = listener.executorToTaskInfos(a.toString).size.toString
+ val totalTasks = activeTasks + failedTasks + completedTasks
Seq(
execId,
@@ -117,17 +117,11 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
val executorToTasksActive = HashMap[String, HashSet[TaskInfo]]()
val executorToTasksComplete = HashMap[String, Int]()
val executorToTasksFailed = HashMap[String, Int]()
- val executorToTaskInfos =
- HashMap[String, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]()
override def onTaskStart(taskStart: SparkListenerTaskStart) {
val eid = taskStart.taskInfo.executorId
val activeTasks = executorToTasksActive.getOrElseUpdate(eid, new HashSet[TaskInfo]())
activeTasks += taskStart.taskInfo
- val taskList = executorToTaskInfos.getOrElse(
- eid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
- taskList += ((taskStart.taskInfo, None, None))
- executorToTaskInfos(eid) = taskList
}
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
@@ -143,11 +137,6 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
executorToTasksComplete(eid) = executorToTasksComplete.getOrElse(eid, 0) + 1
(None, Option(taskEnd.taskMetrics))
}
- val taskList = executorToTaskInfos.getOrElse(
- eid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
- taskList -= ((taskEnd.taskInfo, None, None))
- taskList += ((taskEnd.taskInfo, metrics, failureInfo))
- executorToTaskInfos(eid) = taskList
}
}
}
diff --git a/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala
index f22c4e39e3..c6103edcb0 100644
--- a/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala
@@ -1,7 +1,7 @@
package spark.ui.jobs
import scala.Seq
-import scala.collection.mutable.{HashSet, ListBuffer, HashMap, ArrayBuffer}
+import scala.collection.mutable.{ListBuffer, HashMap, HashSet}
import spark.{ExceptionFailure, SparkContext, Success, Utils}
import spark.scheduler._
@@ -34,7 +34,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
val stageToTasksComplete = HashMap[Int, Int]()
val stageToTasksFailed = HashMap[Int, Int]()
val stageToTaskInfos =
- HashMap[Int, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]()
+ HashMap[Int, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]()
override def onJobStart(jobStart: SparkListenerJobStart) {}
@@ -89,7 +89,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
tasksActive += taskStart.taskInfo
val taskList = stageToTaskInfos.getOrElse(
- sid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
+ sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
taskList += ((taskStart.taskInfo, None, None))
stageToTaskInfos(sid) = taskList
}
@@ -126,7 +126,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
totalShuffleWrite += shuffleWrite
val taskList = stageToTaskInfos.getOrElse(
- sid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
+ sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
taskList -= ((taskEnd.taskInfo, None, None))
taskList += ((taskEnd.taskInfo, metrics, failureInfo))
stageToTaskInfos(sid) = taskList
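JobProgressListener keeps the same start/end bookkeeping; only the container changes. A placeholder entry is added at task start and swapped for the real result at task end, so the swap becomes two hash operations instead of a linear scan. A simplified sketch of that pattern, using made-up stand-in types (Entry, runtimeMs) rather than Spark's TaskInfo/TaskMetrics:

import scala.collection.mutable.{HashMap, HashSet}

object BookkeepingSketch {
  type Entry = (String, Option[Long]) // (taskId, optional runtime) stand-in

  val stageToTaskInfos = HashMap[Int, HashSet[Entry]]()

  def onTaskStart(stage: Int, taskId: String): Unit = {
    val entries = stageToTaskInfos.getOrElseUpdate(stage, HashSet[Entry]())
    entries += ((taskId, None)) // placeholder until the task finishes
  }

  def onTaskEnd(stage: Int, taskId: String, runtimeMs: Long): Unit = {
    val entries = stageToTaskInfos.getOrElseUpdate(stage, HashSet[Entry]())
    entries -= ((taskId, None))            // remove the placeholder by equality
    entries += ((taskId, Some(runtimeMs))) // record the completed entry
  }

  def main(args: Array[String]): Unit = {
    onTaskStart(1, "task 0.0")
    onTaskEnd(1, "task 0.0", 42L)
    println(stageToTaskInfos(1)) // e.g. HashSet((task 0.0,Some(42)))
  }
}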
diff --git a/core/src/main/scala/spark/ui/jobs/StagePage.scala b/core/src/main/scala/spark/ui/jobs/StagePage.scala
index e327cb3947..54b0393e21 100644
--- a/core/src/main/scala/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/spark/ui/jobs/StagePage.scala
@@ -48,7 +48,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
return headerSparkPage(content, parent.sc, "Stage Details: %s".format(stageId), Jobs)
}
- val tasks = listener.stageToTaskInfos(stageId)
+ val tasks = listener.stageToTaskInfos(stageId).toSeq
val shuffleRead = listener.stageToShuffleRead(stageId) > 0
val shuffleWrite = listener.stageToShuffleWrite(stageId) > 0
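With stageToTaskInfos now holding a HashSet, the only change StagePage needs is the added .toSeq: a mutable HashSet is not a Seq and has no stable iteration order, so the page materializes a Seq before building table rows. A minimal sketch with a hypothetical renderTable helper (not from the patch):

import scala.collection.mutable.HashSet

object ToSeqSketch {
  // Hypothetical helper standing in for the UI's row rendering.
  def renderTable(rows: Seq[(String, Long)]): Unit =
    rows.sortBy(_._2).foreach { case (name, ms) => println(s"$name\t$ms ms") }

  def main(args: Array[String]): Unit = {
    val tasks = HashSet(("task 0.0", 12L), ("task 1.0", 7L))
    renderTable(tasks.toSeq) // .toSeq bridges HashSet -> Seq for ordered display
  }
}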