From 0200801a55b580c7504687e3476b7a71c7699001 Mon Sep 17 00:00:00 2001
From: Karen Feng
Date: Tue, 23 Jul 2013 13:35:43 -0700
Subject: Tracks task start events and shows number of active tasks on Executor UI

---
 core/src/main/scala/spark/scheduler/DAGScheduler.scala  |  8 ++++++++
 .../main/scala/spark/scheduler/DAGSchedulerEvent.scala  |  2 ++
 core/src/main/scala/spark/scheduler/JobLogger.scala     | 16 +++++++++++++++-
 core/src/main/scala/spark/scheduler/SparkListener.scala |  9 ++++++++-
 .../scala/spark/scheduler/TaskSchedulerListener.scala   |  3 +++
 .../spark/scheduler/cluster/ClusterTaskSetManager.scala |  6 ++++++
 .../spark/scheduler/local/LocalTaskSetManager.scala     |  5 +++++
 core/src/main/scala/spark/ui/exec/ExecutorsUI.scala     | 12 +++++++++++-
 8 files changed, 58 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index 29e879aa42..b02bf8f4bf 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -52,6 +52,11 @@ class DAGScheduler(
   }
   taskSched.setListener(this)
 
+  //Called by TaskScheduler to report task's starting.
+  override def taskStarted(task: Task[_], taskInfo: TaskInfo) {
+    eventQueue.put(BeginEvent(task, taskInfo))
+  }
+
   // Called by TaskScheduler to report task completions or failures.
   override def taskEnded(
       task: Task[_],
@@ -343,6 +348,9 @@ class DAGScheduler(
       case ExecutorLost(execId) =>
         handleExecutorLost(execId)
 
+      case begin: BeginEvent =>
+        sparkListeners.foreach(_.onTaskStart(SparkListenerTaskStart(begin.task, begin.taskInfo)))
+
       case completion: CompletionEvent =>
         sparkListeners.foreach(_.onTaskEnd(SparkListenerTaskEnd(completion.task, completion.reason,
           completion.taskInfo, completion.taskMetrics)))
diff --git a/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala
index 506c87f65b..3b4ee6287a 100644
--- a/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala
@@ -43,6 +43,8 @@ private[spark] case class JobSubmitted(
     properties: Properties = null)
   extends DAGSchedulerEvent
 
+private[spark] case class BeginEvent(task: Task[_], taskInfo: TaskInfo) extends DAGSchedulerEvent
+
 private[spark] case class CompletionEvent(
     task: Task[_],
     reason: TaskEndReason,
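
The two hunks above establish the event flow: the TaskScheduler calls taskStarted, the DAGScheduler wraps it in a BeginEvent on its queue, and the event loop fans it out to every registered SparkListener as a SparkListenerTaskStart. A minimal sketch of a listener on the receiving end; the class name and counter are illustrative, not part of this patch:

    // A minimal sketch of a consumer of the new callback; the class name and
    // counter are illustrative, not part of this patch.
    import java.util.concurrent.atomic.AtomicInteger
    import spark.scheduler.{SparkListener, SparkListenerTaskStart, SparkListenerTaskEnd}

    class ActiveTaskCounter extends SparkListener {
      private val active = new AtomicInteger(0)

      // Fired when the DAGScheduler event loop drains a BeginEvent.
      override def onTaskStart(taskStart: SparkListenerTaskStart) {
        active.incrementAndGet()
      }

      // Fired on the matching CompletionEvent, whatever the TaskEndReason.
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
        active.decrementAndGet()
      }

      def activeCount: Int = active.get()
    }
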
diff --git a/core/src/main/scala/spark/scheduler/JobLogger.scala b/core/src/main/scala/spark/scheduler/JobLogger.scala
index 85b5ddd4a8..f7565b8c57 100644
--- a/core/src/main/scala/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/spark/scheduler/JobLogger.scala
@@ -68,6 +68,8 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
           processStageCompletedEvent(stageInfo)
         case SparkListenerJobEnd(job, result) =>
           processJobEndEvent(job, result)
+        case SparkListenerTaskStart(task, taskInfo) =>
+          processTaskStartEvent(task, taskInfo)
         case SparkListenerTaskEnd(task, reason, taskInfo, taskMetrics) =>
           processTaskEndEvent(task, reason, taskInfo, taskMetrics)
         case _ =>
@@ -252,7 +254,19 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
                stageInfo.stage.id + " STATUS=COMPLETED")
   }
-  
+
+  override def onTaskStart(taskStart: SparkListenerTaskStart) {
+    eventQueue.put(taskStart)
+  }
+
+  protected def processTaskStartEvent(task: Task[_], taskInfo: TaskInfo) {
+    var taskStatus = ""
+    task match {
+      case resultTask: ResultTask[_, _] => taskStatus = "TASK_TYPE=RESULT_TASK"
+      case shuffleMapTask: ShuffleMapTask => taskStatus = "TASK_TYPE=SHUFFLE_MAP_TASK"
+    }
+  }
+
   override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
     eventQueue.put(taskEnd)
   }
diff --git a/core/src/main/scala/spark/scheduler/SparkListener.scala b/core/src/main/scala/spark/scheduler/SparkListener.scala
index 4fb1c5d42d..4eb7e4e6a5 100644
--- a/core/src/main/scala/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/spark/scheduler/SparkListener.scala
@@ -29,6 +29,8 @@ case class SparkListenerStageSubmitted(stage: Stage, taskSize: Int) extends Spar
 
 case class StageCompleted(val stageInfo: StageInfo) extends SparkListenerEvents
 
+case class SparkListenerTaskStart(task: Task[_], taskInfo: TaskInfo) extends SparkListenerEvents
+
 case class SparkListenerTaskEnd(task: Task[_], reason: TaskEndReason, taskInfo: TaskInfo,
      taskMetrics: TaskMetrics) extends SparkListenerEvents
 
@@ -48,7 +50,12 @@ trait SparkListener {
    * Called when a stage is submitted
    */
   def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) { }
-  
+
+  /**
+   * Called when a task starts
+   */
+  def onTaskStart(taskStart: SparkListenerTaskStart) { }
+
   /**
    * Called when a task ends
    */
diff --git a/core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala b/core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala
index 245e7ccb52..2cdeb1c8c0 100644
--- a/core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala
+++ b/core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala
@@ -27,6 +27,9 @@ import spark.executor.TaskMetrics
  * Interface for getting events back from the TaskScheduler.
  */
private[spark] trait TaskSchedulerListener {
+  // A task has started.
+  def taskStarted(task: Task[_], taskInfo: TaskInfo)
+
   // A task has finished or failed.
   def taskEnded(task: Task[_], reason: TaskEndReason, result: Any, accumUpdates: Map[Long, Any],
                 taskInfo: TaskInfo, taskMetrics: TaskMetrics): Unit
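
Note that JobLogger's onTaskStart only enqueues the event; the heavier processing happens off the scheduler's event loop, which therefore never blocks on log I/O. A sketch of that enqueue-then-drain pattern, with illustrative names and println standing in for real file output:

    // Sketch of the enqueue-then-drain pattern; names are illustrative and
    // println stands in for real log I/O.
    import java.util.concurrent.LinkedBlockingQueue
    import spark.scheduler._

    class AsyncTaskEventLogger extends SparkListener {
      private val eventQueue = new LinkedBlockingQueue[SparkListenerEvents]

      // Callbacks run on the scheduler's event loop, so they only buffer.
      override def onTaskStart(taskStart: SparkListenerTaskStart) {
        eventQueue.put(taskStart)
      }

      override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
        eventQueue.put(taskEnd)
      }

      private val drainThread = new Thread("AsyncTaskEventLogger") {
        override def run() {
          while (true) {
            eventQueue.take() match {
              case SparkListenerTaskStart(task, taskInfo) =>
                println("TASK " + taskInfo.taskId + " START")
              case SparkListenerTaskEnd(task, reason, taskInfo, taskMetrics) =>
                println("TASK " + taskInfo.taskId + " END")
              case _ =>
            }
          }
        }
      }
      drainThread.setDaemon(true)
      drainThread.start()
    }
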
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
index 3d06520675..14e87af653 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
@@ -496,6 +496,8 @@ private[spark] class ClusterTaskSetManager(
         logInfo("Serialized task %s:%d as %d bytes in %d ms".format(
           taskSet.id, index, serializedTask.limit, timeTaken))
         val taskName = "task %s:%d".format(taskSet.id, index)
+        if (taskAttempts(index).size == 1)
+          taskStarted(task,info)
         return Some(new TaskDescription(taskId, execId, taskName, serializedTask))
       }
       case _ =>
@@ -518,6 +520,10 @@ private[spark] class ClusterTaskSetManager(
     }
   }
 
+  def taskStarted(task: Task[_], info: TaskInfo) {
+    sched.listener.taskStarted(task, info)
+  }
+
   def taskFinished(tid: Long, state: TaskState, serializedData: ByteBuffer) {
     val info = taskInfos(tid)
     if (info.failed) {
diff --git a/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala b/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
index e662ad6709..b500451990 100644
--- a/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
@@ -117,6 +117,7 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
         val taskName = "task %s:%d".format(taskSet.id, index)
         copiesRunning(index) += 1
         increaseRunningTasks(1)
+        taskStarted(task, info)
         return Some(new TaskDescription(taskId, null, taskName, bytes))
       case None => {}
     }
@@ -146,6 +147,10 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
     }
   }
 
+  def taskStarted(task: Task[_], info: TaskInfo) {
+    sched.listener.taskStarted(task, info)
+  }
+
   def taskEnded(tid: Long, state: TaskState, serializedData: ByteBuffer) {
     val info = taskInfos(tid)
     val index = info.index
diff --git a/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
index 20ea54d6a6..9ac33326c0 100644
--- a/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
+++ b/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
@@ -45,7 +45,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
       .reduceOption(_+_).getOrElse(0L)
 
     val execHead = Seq("Executor ID", "Address", "RDD blocks", "Memory used", "Disk used",
-      "Failed tasks", "Complete tasks", "Total tasks")
+      "Active tasks", "Failed tasks", "Complete tasks", "Total tasks")
     def execRow(kv: Seq[String]) =
       <tr>
         <td>{kv(0)}</td>
@@ -60,6 +60,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
         <td>{kv(6)}</td>
         <td>{kv(7)}</td>
         <td>{kv(8)}</td>
+        <td>{kv(9)}</td>
       </tr>
 
     val execInfo = for (b <- 0 until storageStatusList.size)
@@ -93,6 +94,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
     val memUsed = sc.getExecutorStorageStatus(a).memUsed().toString
     val maxMem = sc.getExecutorStorageStatus(a).maxMem.toString
     val diskUsed = sc.getExecutorStorageStatus(a).diskUsed().toString
+    val activeTasks = listener.executorToTasksActive.getOrElse(a.toString, 0).toString
     val failedTasks = listener.executorToTasksFailed.getOrElse(a.toString, 0).toString
     val completedTasks = listener.executorToTasksComplete.getOrElse(a.toString, 0).toString
     val totalTasks = listener.executorToTaskInfos(a.toString).size.toString
@@ -104,6 +106,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
       memUsed,
       maxMem,
       diskUsed,
+      activeTasks,
       failedTasks,
       completedTasks,
       totalTasks
@@ -111,13 +114,20 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
   }
 
   private[spark] class ExecutorsListener extends SparkListener with Logging {
+    val executorToTasksActive = HashMap[String, Int]()
     val executorToTasksComplete = HashMap[String, Int]()
     val executorToTasksFailed = HashMap[String, Int]()
     val executorToTaskInfos =
       HashMap[String, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]()
 
+    override def onTaskStart(taskStart: SparkListenerTaskStart) {
+      val eid = taskStart.taskInfo.executorId
+      executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 0) + 1
+    }
+
     override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
       val eid = taskEnd.taskInfo.executorId
+      executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 0) - 1
       val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
         taskEnd.reason match {
           case e: ExceptionFailure =>
--
cgit v1.2.3
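
With taskStarted guarded to fire only on a task's first attempt, the plain Int counter in ExecutorsListener can drift: a task that ends twice (say, a resubmitted attempt) decrements twice but incremented only once. The next commit addresses this by tracking task IDs instead of a count. A toy illustration of the drift, not patch code:

    // Toy illustration (script), not patch code: two end events paired with
    // one start leave a bare counter negative, while a set of IDs stays sane.
    import scala.collection.mutable.HashSet

    var counter = 0
    val active = HashSet[Long]()

    def start(tid: Long) { counter += 1; active += tid }
    def end(tid: Long)   { counter -= 1; active -= tid }

    start(1L)
    end(1L)
    end(1L)              // second attempt of the same task also ends

    println(counter)     // -1: the Int has drifted below zero
    println(active.size) // 0: removing an absent ID is a no-op
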
From 9f2dbb2a7ca9c1cf878cf96df7122b05d95e799b Mon Sep 17 00:00:00 2001
From: Karen Feng
Date: Tue, 23 Jul 2013 15:10:09 -0700
Subject: Adds/removes active tasks only once

---
 core/src/main/scala/spark/ui/exec/ExecutorsUI.scala | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
index 9ac33326c0..bb2b003486 100644
--- a/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
+++ b/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
@@ -94,7 +94,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
     val memUsed = sc.getExecutorStorageStatus(a).memUsed().toString
     val maxMem = sc.getExecutorStorageStatus(a).maxMem.toString
     val diskUsed = sc.getExecutorStorageStatus(a).diskUsed().toString
-    val activeTasks = listener.executorToTasksActive.getOrElse(a.toString, 0).toString
+    val activeTasks = listener.executorToTasksActive.getOrElse(a.toString, Seq[Long]()).size.toString
     val failedTasks = listener.executorToTasksFailed.getOrElse(a.toString, 0).toString
     val completedTasks = listener.executorToTasksComplete.getOrElse(a.toString, 0).toString
     val totalTasks = listener.executorToTaskInfos(a.toString).size.toString
@@ -114,7 +114,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
   }
 
   private[spark] class ExecutorsListener extends SparkListener with Logging {
-    val executorToTasksActive = HashMap[String, Int]()
+    val executorToTasksActive = HashMap[String, Seq[Long]]()
     val executorToTasksComplete = HashMap[String, Int]()
     val executorToTasksFailed = HashMap[String, Int]()
     val executorToTaskInfos =
@@ -122,12 +122,12 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
 
     override def onTaskStart(taskStart: SparkListenerTaskStart) {
       val eid = taskStart.taskInfo.executorId
-      executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 0) + 1
+      executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, Seq[Long]()) :+ taskStart.taskInfo.taskId
     }
 
     override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
       val eid = taskEnd.taskInfo.executorId
-      executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 0) - 1
+      executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, Seq[Long]()).filterNot(_ == taskEnd.taskInfo.taskId)
       val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
         taskEnd.reason match {
           case e: ExceptionFailure =>
--
cgit v1.2.3
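
The commit above switches the per-executor value from an Int to a Seq[Long] of task IDs so a given task is only counted once. The following commit swaps the Seq for a HashSet, which deduplicates on insert and removes by hash rather than a linear filterNot. A toy comparison, illustrative rather than patch code:

    // Toy comparison (script), not patch code.
    import scala.collection.mutable.HashSet

    var asSeq = Seq[Long]()
    asSeq = asSeq :+ 7L
    asSeq = asSeq :+ 7L               // a duplicate event is kept
    println(asSeq.size)               // 2

    val asSet = HashSet[Long]()
    asSet += 7L
    asSet += 7L                       // second add is a no-op
    println(asSet.size)               // 1

    asSeq = asSeq.filterNot(_ == 7L)  // linear scan, rebuilds the Seq
    asSet -= 7L                       // hashed removal, no copy
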
From 383684daaa62bebc177b4c74573ce509f154f74b Mon Sep 17 00:00:00 2001
From: Karen Feng
Date: Tue, 23 Jul 2013 15:33:27 -0700
Subject: Replaces Seq with HashSet, removes redundant import

---
 core/src/main/scala/spark/ui/exec/ExecutorsUI.scala | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
index bb2b003486..606e1eb2fc 100644
--- a/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
+++ b/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
@@ -5,7 +5,7 @@ import javax.servlet.http.HttpServletRequest
 
 import org.eclipse.jetty.server.Handler
 
-import scala.collection.mutable.{ArrayBuffer, HashMap}
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 import scala.util.Properties
 
 import spark.{ExceptionFailure, Logging, SparkContext, Success, Utils}
@@ -18,7 +18,6 @@ import spark.ui.JettyUtils._
 import spark.ui.Page.Executors
 import spark.ui.UIUtils.headerSparkPage
 import spark.ui.UIUtils
-import spark.Utils
 
 import scala.xml.{Node, XML}
 
@@ -114,7 +113,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
   }
 
   private[spark] class ExecutorsListener extends SparkListener with Logging {
-    val executorToTasksActive = HashMap[String, Seq[Long]]()
+    val executorToTasksActive = HashMap[String, HashSet[Long]]()
     val executorToTasksComplete = HashMap[String, Int]()
     val executorToTasksFailed = HashMap[String, Int]()
     val executorToTaskInfos =
@@ -122,12 +121,14 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
 
     override def onTaskStart(taskStart: SparkListenerTaskStart) {
       val eid = taskStart.taskInfo.executorId
-      executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, Seq[Long]()) :+ taskStart.taskInfo.taskId
+      executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, HashSet[Long]()) +
+        taskStart.taskInfo.taskId
     }
 
     override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
       val eid = taskEnd.taskInfo.executorId
-      executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, Seq[Long]()).filterNot(_ == taskEnd.taskInfo.taskId)
+      executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, HashSet[Long]()) -
+        taskEnd.taskInfo.taskId
       val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
         taskEnd.reason match {
           case e: ExceptionFailure =>
--
cgit v1.2.3

From abc78cd3318fb7bc69d10fd5422d20b299a8d7d8 Mon Sep 17 00:00:00 2001
From: Karen Feng
Date: Tue, 23 Jul 2013 15:47:16 -0700
Subject: Modifies instead of copies HashSets, fixes comment style

---
 core/src/main/scala/spark/scheduler/DAGScheduler.scala |  2 +-
 core/src/main/scala/spark/ui/exec/ExecutorsUI.scala    | 10 ++++++----
 2 files changed, 7 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index b02bf8f4bf..7bf50de660 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -52,7 +52,7 @@ class DAGScheduler(
   }
   taskSched.setListener(this)
 
-  //Called by TaskScheduler to report task's starting.
+  // Called by TaskScheduler to report task's starting.
   override def taskStarted(task: Task[_], taskInfo: TaskInfo) {
     eventQueue.put(BeginEvent(task, taskInfo))
   }
diff --git a/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
index 606e1eb2fc..69fb306074 100644
--- a/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
+++ b/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
@@ -121,14 +121,16 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
 
     override def onTaskStart(taskStart: SparkListenerTaskStart) {
       val eid = taskStart.taskInfo.executorId
-      executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, HashSet[Long]()) +
-        taskStart.taskInfo.taskId
+      if (!executorToTasksActive.contains(eid))
+        executorToTasksActive(eid) = HashSet[Long]()
+      executorToTasksActive(eid) += taskStart.taskInfo.taskId
     }
 
     override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
       val eid = taskEnd.taskInfo.executorId
-      executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, HashSet[Long]()) -
-        taskEnd.taskInfo.taskId
+      if (!executorToTasksActive.contains(eid))
+        executorToTasksActive(eid) = HashSet[Long]()
+      executorToTasksActive(eid) -= taskEnd.taskInfo.taskId
       val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
         taskEnd.reason match {
           case e: ExceptionFailure =>
--
cgit v1.2.3
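
The contains-check-then-update shape in the final commit mutates each per-executor HashSet in place rather than building a copy on every event. The same effect can be written more compactly with mutable.HashMap.getOrElseUpdate; a sketch under that assumption, not what the patch itself does:

    // Sketch only, not what the patch does: getOrElseUpdate inserts the
    // default and returns the live set in one step.
    import scala.collection.mutable.{HashMap, HashSet}

    val executorToTasksActive = HashMap[String, HashSet[Long]]()

    def recordStart(eid: String, tid: Long) {
      executorToTasksActive.getOrElseUpdate(eid, HashSet[Long]()) += tid
    }

    def recordEnd(eid: String, tid: Long) {
      executorToTasksActive.getOrElseUpdate(eid, HashSet[Long]()) -= tid
    }
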