aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorKaren Feng <karenfeng.us@gmail.com>2013-07-23 15:10:09 -0700
committerKaren Feng <karenfeng.us@gmail.com>2013-07-23 15:10:09 -0700
commit9f2dbb2a7ca9c1cf878cf96df7122b05d95e799b (patch)
tree69c2331774a8636b4137243735df808b26c7d5f1 /core
parent0200801a55b580c7504687e3476b7a71c7699001 (diff)
downloadspark-9f2dbb2a7ca9c1cf878cf96df7122b05d95e799b.tar.gz
spark-9f2dbb2a7ca9c1cf878cf96df7122b05d95e799b.tar.bz2
spark-9f2dbb2a7ca9c1cf878cf96df7122b05d95e799b.zip
Adds/removes active tasks only once
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/ui/exec/ExecutorsUI.scala8
1 files changed, 4 insertions, 4 deletions
diff --git a/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
index 9ac33326c0..bb2b003486 100644
--- a/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
+++ b/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
@@ -94,7 +94,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
val memUsed = sc.getExecutorStorageStatus(a).memUsed().toString
val maxMem = sc.getExecutorStorageStatus(a).maxMem.toString
val diskUsed = sc.getExecutorStorageStatus(a).diskUsed().toString
- val activeTasks = listener.executorToTasksActive.getOrElse(a.toString, 0).toString
+ val activeTasks = listener.executorToTasksActive.getOrElse(a.toString, Seq[Long]()).size.toString
val failedTasks = listener.executorToTasksFailed.getOrElse(a.toString, 0).toString
val completedTasks = listener.executorToTasksComplete.getOrElse(a.toString, 0).toString
val totalTasks = listener.executorToTaskInfos(a.toString).size.toString
@@ -114,7 +114,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
}
private[spark] class ExecutorsListener extends SparkListener with Logging {
- val executorToTasksActive = HashMap[String, Int]()
+ val executorToTasksActive = HashMap[String, Seq[Long]]()
val executorToTasksComplete = HashMap[String, Int]()
val executorToTasksFailed = HashMap[String, Int]()
val executorToTaskInfos =
@@ -122,12 +122,12 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
override def onTaskStart(taskStart: SparkListenerTaskStart) {
val eid = taskStart.taskInfo.executorId
- executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 0) + 1
+ executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, Seq[Long]()) :+ taskStart.taskInfo.taskId
}
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
val eid = taskEnd.taskInfo.executorId
- executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 0) - 1
+ executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, Seq[Long]()).filterNot(_ == taskEnd.taskInfo.taskId)
val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
taskEnd.reason match {
case e: ExceptionFailure =>