path: root/core
author    Karen Feng <karenfeng.us@gmail.com>  2013-07-23 13:35:43 -0700
committer Karen Feng <karenfeng.us@gmail.com>  2013-07-23 13:35:43 -0700
commit    0200801a55b580c7504687e3476b7a71c7699001 (patch)
tree      723b778a1d1de7d86f50fb15c4c913093bed6bd1 /core
parent    401aac8b189aa6b72ad020ba894ca57b948c53a1 (diff)
Tracks task start events and shows number of active tasks on Executor UI
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/scheduler/DAGScheduler.scala                   |  8
-rw-r--r--  core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala              |  2
-rw-r--r--  core/src/main/scala/spark/scheduler/JobLogger.scala                      | 16
-rw-r--r--  core/src/main/scala/spark/scheduler/SparkListener.scala                  |  9
-rw-r--r--  core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala          |  3
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala  |  6
-rw-r--r--  core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala      |  5
-rw-r--r--  core/src/main/scala/spark/ui/exec/ExecutorsUI.scala                      | 12
8 files changed, 58 insertions, 3 deletions
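
Taken together, the patch threads a task-start notification from the task set managers, through the DAGScheduler's event queue, out to every registered SparkListener; the Executors UI then keeps a per-executor count of tasks that have started but not yet finished. A rough sketch of how a custom listener could consume the new event (ActiveTaskCounter is a hypothetical name; the sketch assumes only the SparkListenerTaskStart and SparkListenerTaskEnd case classes shown in the hunks below, and how such a listener would be registered is outside this patch):

    import scala.collection.mutable.HashMap
    import spark.scheduler.{SparkListener, SparkListenerTaskStart, SparkListenerTaskEnd}

    // Hypothetical listener: per executor, count tasks that have started but not yet ended.
    class ActiveTaskCounter extends SparkListener {
      val active = HashMap[String, Int]()

      override def onTaskStart(taskStart: SparkListenerTaskStart) {
        val eid = taskStart.taskInfo.executorId
        active(eid) = active.getOrElse(eid, 0) + 1
      }

      override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
        val eid = taskEnd.taskInfo.executorId
        active(eid) = active.getOrElse(eid, 0) - 1
      }
    }

This mirrors the ExecutorsListener change at the bottom of the diff, which backs the new "Active tasks" column.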
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index 29e879aa42..b02bf8f4bf 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -52,6 +52,11 @@ class DAGScheduler(
}
taskSched.setListener(this)
+ // Called by TaskScheduler to report that a task has started.
+ override def taskStarted(task: Task[_], taskInfo: TaskInfo) {
+ eventQueue.put(BeginEvent(task, taskInfo))
+ }
+
// Called by TaskScheduler to report task completions or failures.
override def taskEnded(
task: Task[_],
@@ -343,6 +348,9 @@ class DAGScheduler(
case ExecutorLost(execId) =>
handleExecutorLost(execId)
+ case begin: BeginEvent =>
+ sparkListeners.foreach(_.onTaskStart(SparkListenerTaskStart(begin.task, begin.taskInfo)))
+
case completion: CompletionEvent =>
sparkListeners.foreach(_.onTaskEnd(SparkListenerTaskEnd(completion.task,
completion.reason, completion.taskInfo, completion.taskMetrics)))
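
The two hunks above are the producer and consumer halves of the same queue: taskStarted runs on the task scheduler's thread and only enqueues a BeginEvent, while the DAGScheduler's event loop later takes the event off the queue and fans it out to every registered listener. A simplified, self-contained sketch of that dispatch pattern (stand-in event and listener types, not the actual DAGScheduler code):

    import java.util.concurrent.LinkedBlockingQueue

    // Stand-in types, for illustration only.
    sealed trait Event
    case class Begin(taskId: Long) extends Event
    case class Completion(taskId: Long) extends Event

    trait Listener {
      def onStart(taskId: Long) {}
      def onEnd(taskId: Long) {}
    }

    class Dispatcher(listeners: Seq[Listener]) {
      private val queue = new LinkedBlockingQueue[Event]()

      // Producers (e.g. a scheduler thread) enqueue and return immediately.
      def post(event: Event) { queue.put(event) }

      // A single consumer thread repeatedly takes one event and notifies every listener.
      def dispatchOne() {
        queue.take() match {
          case Begin(id)      => listeners.foreach(_.onStart(id))
          case Completion(id) => listeners.foreach(_.onEnd(id))
        }
      }
    }

Keeping the listener callbacks off the scheduler's calling thread is what makes it cheap for ClusterTaskSetManager and LocalTaskSetManager (later in this diff) to report every launch.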
diff --git a/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala
index 506c87f65b..3b4ee6287a 100644
--- a/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala
@@ -43,6 +43,8 @@ private[spark] case class JobSubmitted(
properties: Properties = null)
extends DAGSchedulerEvent
+private[spark] case class BeginEvent(task: Task[_], taskInfo: TaskInfo) extends DAGSchedulerEvent
+
private[spark] case class CompletionEvent(
task: Task[_],
reason: TaskEndReason,
diff --git a/core/src/main/scala/spark/scheduler/JobLogger.scala b/core/src/main/scala/spark/scheduler/JobLogger.scala
index 85b5ddd4a8..f7565b8c57 100644
--- a/core/src/main/scala/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/spark/scheduler/JobLogger.scala
@@ -68,6 +68,8 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
processStageCompletedEvent(stageInfo)
case SparkListenerJobEnd(job, result) =>
processJobEndEvent(job, result)
+ case SparkListenerTaskStart(task, taskInfo) =>
+ processTaskStartEvent(task, taskInfo)
case SparkListenerTaskEnd(task, reason, taskInfo, taskMetrics) =>
processTaskEndEvent(task, reason, taskInfo, taskMetrics)
case _ =>
@@ -252,7 +254,19 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
stageInfo.stage.id + " STATUS=COMPLETED")
}
-
+
+ override def onTaskStart(taskStart: SparkListenerTaskStart) {
+ eventQueue.put(taskStart)
+ }
+
+ protected def processTaskStartEvent(task: Task[_], taskInfo: TaskInfo) {
+ var taskStatus = ""
+ task match {
+ case resultTask: ResultTask[_, _] => taskStatus = "TASK_TYPE=RESULT_TASK"
+ case shuffleMapTask: ShuffleMapTask => taskStatus = "TASK_TYPE=SHUFFLE_MAP_TASK"
+ }
+ }
+
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
eventQueue.put(taskEnd)
}
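
As added here, processTaskStartEvent only classifies the task; the taskStatus string it builds is not yet written to the job log. For illustration, a standalone listener that logs the same classification at task start, using only classes that appear in this diff (TaskStartLogger is a hypothetical name, and it is written as if it lived in the spark.scheduler package, like JobLogger, so that ResultTask, ShuffleMapTask, and the Logging helpers are visible):

    package spark.scheduler

    import spark.Logging

    // Hypothetical listener, not part of this patch: log the type of each task as it starts.
    class TaskStartLogger extends SparkListener with Logging {
      override def onTaskStart(taskStart: SparkListenerTaskStart) {
        val taskType = taskStart.task match {
          case _: ResultTask[_, _] => "TASK_TYPE=RESULT_TASK"
          case _: ShuffleMapTask   => "TASK_TYPE=SHUFFLE_MAP_TASK"
          case _                   => "TASK_TYPE=UNKNOWN"
        }
        logInfo(taskType + " EXECUTOR_ID=" + taskStart.taskInfo.executorId)
      }
    }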
diff --git a/core/src/main/scala/spark/scheduler/SparkListener.scala b/core/src/main/scala/spark/scheduler/SparkListener.scala
index 4fb1c5d42d..4eb7e4e6a5 100644
--- a/core/src/main/scala/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/spark/scheduler/SparkListener.scala
@@ -29,6 +29,8 @@ case class SparkListenerStageSubmitted(stage: Stage, taskSize: Int) extends Spar
case class StageCompleted(val stageInfo: StageInfo) extends SparkListenerEvents
+case class SparkListenerTaskStart(task: Task[_], taskInfo: TaskInfo) extends SparkListenerEvents
+
case class SparkListenerTaskEnd(task: Task[_], reason: TaskEndReason, taskInfo: TaskInfo,
taskMetrics: TaskMetrics) extends SparkListenerEvents
@@ -48,7 +50,12 @@ trait SparkListener {
* Called when a stage is submitted
*/
def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) { }
-
+
+ /**
+ * Called when a task starts
+ */
+ def onTaskStart(taskStart: SparkListenerTaskStart) { }
+
/**
* Called when a task ends
*/
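
Because the new onTaskStart callback gets an empty default body, listeners written before this patch keep compiling without changes; only listeners that care about task starts override it. A minimal illustration (hypothetical class, assuming only the trait and the SparkListenerTaskEnd event shown above):

    import spark.scheduler.{SparkListener, SparkListenerTaskEnd}

    // Compiles against the updated trait without mentioning onTaskStart at all.
    class EndOnlyListener extends SparkListener {
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
        println("task finished on executor " + taskEnd.taskInfo.executorId)
      }
    }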
diff --git a/core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala b/core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala
index 245e7ccb52..2cdeb1c8c0 100644
--- a/core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala
+++ b/core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala
@@ -27,6 +27,9 @@ import spark.executor.TaskMetrics
* Interface for getting events back from the TaskScheduler.
*/
private[spark] trait TaskSchedulerListener {
+ // A task has started.
+ def taskStarted(task: Task[_], taskInfo: TaskInfo)
+
// A task has finished or failed.
def taskEnded(task: Task[_], reason: TaskEndReason, result: Any, accumUpdates: Map[Long, Any],
taskInfo: TaskInfo, taskMetrics: TaskMetrics): Unit
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
index 3d06520675..14e87af653 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
@@ -496,6 +496,8 @@ private[spark] class ClusterTaskSetManager(
logInfo("Serialized task %s:%d as %d bytes in %d ms".format(
taskSet.id, index, serializedTask.limit, timeTaken))
val taskName = "task %s:%d".format(taskSet.id, index)
+ if (taskAttempts(index).size == 1)
+ taskStarted(task, info)
return Some(new TaskDescription(taskId, execId, taskName, serializedTask))
}
case _ =>
@@ -518,6 +520,10 @@ private[spark] class ClusterTaskSetManager(
}
}
+ def taskStarted(task: Task[_], info: TaskInfo) {
+ sched.listener.taskStarted(task, info)
+ }
+
def taskFinished(tid: Long, state: TaskState, serializedData: ByteBuffer) {
val info = taskInfos(tid)
if (info.failed) {
diff --git a/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala b/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
index e662ad6709..b500451990 100644
--- a/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
@@ -117,6 +117,7 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
val taskName = "task %s:%d".format(taskSet.id, index)
copiesRunning(index) += 1
increaseRunningTasks(1)
+ taskStarted(task, info)
return Some(new TaskDescription(taskId, null, taskName, bytes))
case None => {}
}
@@ -146,6 +147,10 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
}
}
+ def taskStarted(task: Task[_], info: TaskInfo) {
+ sched.listener.taskStarted(task, info)
+ }
+
def taskEnded(tid: Long, state: TaskState, serializedData: ByteBuffer) {
val info = taskInfos(tid)
val index = info.index
diff --git a/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
index 20ea54d6a6..9ac33326c0 100644
--- a/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
+++ b/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
@@ -45,7 +45,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
.reduceOption(_+_).getOrElse(0L)
val execHead = Seq("Executor ID", "Address", "RDD blocks", "Memory used", "Disk used",
- "Failed tasks", "Complete tasks", "Total tasks")
+ "Active tasks", "Failed tasks", "Complete tasks", "Total tasks")
def execRow(kv: Seq[String]) =
<tr>
<td>{kv(0)}</td>
@@ -60,6 +60,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
<td>{kv(6)}</td>
<td>{kv(7)}</td>
<td>{kv(8)}</td>
+ <td>{kv(9)}</td>
</tr>
val execInfo =
for (b <- 0 until storageStatusList.size)
@@ -93,6 +94,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
val memUsed = sc.getExecutorStorageStatus(a).memUsed().toString
val maxMem = sc.getExecutorStorageStatus(a).maxMem.toString
val diskUsed = sc.getExecutorStorageStatus(a).diskUsed().toString
+ val activeTasks = listener.executorToTasksActive.getOrElse(a.toString, 0).toString
val failedTasks = listener.executorToTasksFailed.getOrElse(a.toString, 0).toString
val completedTasks = listener.executorToTasksComplete.getOrElse(a.toString, 0).toString
val totalTasks = listener.executorToTaskInfos(a.toString).size.toString
@@ -104,6 +106,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
memUsed,
maxMem,
diskUsed,
+ activeTasks,
failedTasks,
completedTasks,
totalTasks
@@ -111,13 +114,20 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
}
private[spark] class ExecutorsListener extends SparkListener with Logging {
+ val executorToTasksActive = HashMap[String, Int]()
val executorToTasksComplete = HashMap[String, Int]()
val executorToTasksFailed = HashMap[String, Int]()
val executorToTaskInfos =
HashMap[String, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]()
+ override def onTaskStart(taskStart: SparkListenerTaskStart) {
+ val eid = taskStart.taskInfo.executorId
+ executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 0) + 1
+ }
+
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
val eid = taskEnd.taskInfo.executorId
+ executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 0) - 1
val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
taskEnd.reason match {
case e: ExceptionFailure =>