aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
author Matei Zaharia <matei.zaharia@gmail.com> 2013-08-07 15:50:45 -0700
committer Matei Zaharia <matei.zaharia@gmail.com> 2013-08-07 15:50:45 -0700
commit 5133e4bebd47d8ae089f967689ecab551c2c5844 (patch)
tree 63c1f4dbbb82495faaa3ce1dbc2fe594451d6aa1 /core
parent 3c8478e1fbe3ec85a83f2822ad8a8d4cca580487 (diff)
parent b88e26248e0e7f0308a14e870677da9ce16a8735 (diff)
download spark-5133e4bebd47d8ae089f967689ecab551c2c5844.tar.gz
spark-5133e4bebd47d8ae089f967689ecab551c2c5844.tar.bz2
spark-5133e4bebd47d8ae089f967689ecab551c2c5844.zip
Merge pull request #790 from kayousterhout/fix_throughput
Fixed issue in UI that decreased scheduler throughput by 5x or more
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/spark/ui/exec/ExecutorsUI.scala 13
-rw-r--r-- core/src/main/scala/spark/ui/jobs/JobProgressListener.scala 8
-rw-r--r-- core/src/main/scala/spark/ui/jobs/StagePage.scala 2
3 files changed, 6 insertions, 17 deletions
diff --git a/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
index 4be2bfa413..6ec48f70a4 100644
--- a/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
+++ b/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
@@ -97,7 +97,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
.getOrElse(0).toString
val failedTasks = listener.executorToTasksFailed.getOrElse(a.toString, 0).toString
val completedTasks = listener.executorToTasksComplete.getOrElse(a.toString, 0).toString
- val totalTasks = listener.executorToTaskInfos(a.toString).size.toString
+ val totalTasks = activeTasks + failedTasks + completedTasks
Seq(
execId,
@@ -117,17 +117,11 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
val executorToTasksActive = HashMap[String, HashSet[TaskInfo]]()
val executorToTasksComplete = HashMap[String, Int]()
val executorToTasksFailed = HashMap[String, Int]()
- val executorToTaskInfos =
- HashMap[String, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]()
override def onTaskStart(taskStart: SparkListenerTaskStart) {
val eid = taskStart.taskInfo.executorId
val activeTasks = executorToTasksActive.getOrElseUpdate(eid, new HashSet[TaskInfo]())
activeTasks += taskStart.taskInfo
- val taskList = executorToTaskInfos.getOrElse(
- eid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
- taskList += ((taskStart.taskInfo, None, None))
- executorToTaskInfos(eid) = taskList
}
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
@@ -143,11 +137,6 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
executorToTasksComplete(eid) = executorToTasksComplete.getOrElse(eid, 0) + 1
(None, Option(taskEnd.taskMetrics))
}
- val taskList = executorToTaskInfos.getOrElse(
- eid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
- taskList -= ((taskEnd.taskInfo, None, None))
- taskList += ((taskEnd.taskInfo, metrics, failureInfo))
- executorToTaskInfos(eid) = taskList
}
}
}
diff --git a/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala
index f22c4e39e3..c6103edcb0 100644
--- a/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala
@@ -1,7 +1,7 @@
package spark.ui.jobs
import scala.Seq
-import scala.collection.mutable.{HashSet, ListBuffer, HashMap, ArrayBuffer}
+import scala.collection.mutable.{ListBuffer, HashMap, HashSet}
import spark.{ExceptionFailure, SparkContext, Success, Utils}
import spark.scheduler._
@@ -34,7 +34,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
val stageToTasksComplete = HashMap[Int, Int]()
val stageToTasksFailed = HashMap[Int, Int]()
val stageToTaskInfos =
- HashMap[Int, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]()
+ HashMap[Int, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]()
override def onJobStart(jobStart: SparkListenerJobStart) {}
@@ -89,7 +89,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
tasksActive += taskStart.taskInfo
val taskList = stageToTaskInfos.getOrElse(
- sid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
+ sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
taskList += ((taskStart.taskInfo, None, None))
stageToTaskInfos(sid) = taskList
}
@@ -126,7 +126,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
totalShuffleWrite += shuffleWrite
val taskList = stageToTaskInfos.getOrElse(
- sid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
+ sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
taskList -= ((taskEnd.taskInfo, None, None))
taskList += ((taskEnd.taskInfo, metrics, failureInfo))
stageToTaskInfos(sid) = taskList
diff --git a/core/src/main/scala/spark/ui/jobs/StagePage.scala b/core/src/main/scala/spark/ui/jobs/StagePage.scala
index e327cb3947..54b0393e21 100644
--- a/core/src/main/scala/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/spark/ui/jobs/StagePage.scala
@@ -48,7 +48,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
return headerSparkPage(content, parent.sc, "Stage Details: %s".format(stageId), Jobs)
}
- val tasks = listener.stageToTaskInfos(stageId)
+ val tasks = listener.stageToTaskInfos(stageId).toSeq
val shuffleRead = listener.stageToShuffleRead(stageId) > 0
val shuffleWrite = listener.stageToShuffleWrite(stageId) > 0