aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/ui/exec/ExecutorsUI.scala8
1 files changed, 4 insertions, 4 deletions
diff --git a/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
index 9ac33326c0..bb2b003486 100644
--- a/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
+++ b/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
@@ -94,7 +94,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
val memUsed = sc.getExecutorStorageStatus(a).memUsed().toString
val maxMem = sc.getExecutorStorageStatus(a).maxMem.toString
val diskUsed = sc.getExecutorStorageStatus(a).diskUsed().toString
- val activeTasks = listener.executorToTasksActive.getOrElse(a.toString, 0).toString
+ val activeTasks = listener.executorToTasksActive.getOrElse(a.toString, Seq[Long]()).size.toString
val failedTasks = listener.executorToTasksFailed.getOrElse(a.toString, 0).toString
val completedTasks = listener.executorToTasksComplete.getOrElse(a.toString, 0).toString
val totalTasks = listener.executorToTaskInfos(a.toString).size.toString
@@ -114,7 +114,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
}
private[spark] class ExecutorsListener extends SparkListener with Logging {
- val executorToTasksActive = HashMap[String, Int]()
+ val executorToTasksActive = HashMap[String, Seq[Long]]()
val executorToTasksComplete = HashMap[String, Int]()
val executorToTasksFailed = HashMap[String, Int]()
val executorToTaskInfos =
@@ -122,12 +122,12 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
override def onTaskStart(taskStart: SparkListenerTaskStart) {
val eid = taskStart.taskInfo.executorId
- executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 0) + 1
+ executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, Seq[Long]()) :+ taskStart.taskInfo.taskId
}
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
val eid = taskEnd.taskInfo.executorId
- executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 0) - 1
+ executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, Seq[Long]()).filterNot(_ == taskEnd.taskInfo.taskId)
val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
taskEnd.reason match {
case e: ExceptionFailure =>