diff options
author | Karen Feng <karenfeng.us@gmail.com> | 2013-07-23 15:10:09 -0700 |
---|---|---|
committer | Karen Feng <karenfeng.us@gmail.com> | 2013-07-23 15:10:09 -0700 |
commit | 9f2dbb2a7ca9c1cf878cf96df7122b05d95e799b (patch) | |
tree | 69c2331774a8636b4137243735df808b26c7d5f1 | |
parent | 0200801a55b580c7504687e3476b7a71c7699001 (diff) | |
download | spark-9f2dbb2a7ca9c1cf878cf96df7122b05d95e799b.tar.gz spark-9f2dbb2a7ca9c1cf878cf96df7122b05d95e799b.tar.bz2 spark-9f2dbb2a7ca9c1cf878cf96df7122b05d95e799b.zip |
Adds/removes active tasks only once
-rw-r--r-- | core/src/main/scala/spark/ui/exec/ExecutorsUI.scala | 8 |
1 files changed, 4 insertions, 4 deletions
diff --git a/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala index 9ac33326c0..bb2b003486 100644 --- a/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala +++ b/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala @@ -94,7 +94,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) { val memUsed = sc.getExecutorStorageStatus(a).memUsed().toString val maxMem = sc.getExecutorStorageStatus(a).maxMem.toString val diskUsed = sc.getExecutorStorageStatus(a).diskUsed().toString - val activeTasks = listener.executorToTasksActive.getOrElse(a.toString, 0).toString + val activeTasks = listener.executorToTasksActive.getOrElse(a.toString, Seq[Long]()).size.toString val failedTasks = listener.executorToTasksFailed.getOrElse(a.toString, 0).toString val completedTasks = listener.executorToTasksComplete.getOrElse(a.toString, 0).toString val totalTasks = listener.executorToTaskInfos(a.toString).size.toString @@ -114,7 +114,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) { } private[spark] class ExecutorsListener extends SparkListener with Logging { - val executorToTasksActive = HashMap[String, Int]() + val executorToTasksActive = HashMap[String, Seq[Long]]() val executorToTasksComplete = HashMap[String, Int]() val executorToTasksFailed = HashMap[String, Int]() val executorToTaskInfos = @@ -122,12 +122,12 @@ private[spark] class ExecutorsUI(val sc: SparkContext) { override def onTaskStart(taskStart: SparkListenerTaskStart) { val eid = taskStart.taskInfo.executorId - executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 0) + 1 + executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, Seq[Long]()) :+ taskStart.taskInfo.taskId } override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { val eid = taskEnd.taskInfo.executorId - executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 0) - 1 + executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, Seq[Long]()).filterNot(_ == taskEnd.taskInfo.taskId) val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) = taskEnd.reason match { case e: ExceptionFailure => |