author    Matei Zaharia <matei.zaharia@gmail.com>  2013-07-23 15:53:30 -0700
committer Matei Zaharia <matei.zaharia@gmail.com>  2013-07-23 15:53:30 -0700
commit    2f1736c3967b5ae20710cc52e84b3e4a7c041a0b (patch)
tree      800c8bb1193b545aa8c37f68463261ef0a43b29d /core
parent    5364f645c5bc515a74edf65ca064e4265f97243d (diff)
parent    abc78cd3318fb7bc69d10fd5422d20b299a8d7d8 (diff)
Merge pull request #725 from karenfeng/task-start
Creates task start events: adds a SparkListenerTaskStart event that fires when a task launches, wires it from the task-set managers through the DAGScheduler to registered SparkListeners, handles it in JobLogger, and surfaces a per-executor "Active tasks" column in the executors UI.
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/spark/scheduler/DAGScheduler.scala                  |  8
-rw-r--r-- core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala             |  2
-rw-r--r-- core/src/main/scala/spark/scheduler/JobLogger.scala                     | 16
-rw-r--r-- core/src/main/scala/spark/scheduler/SparkListener.scala                 |  9
-rw-r--r-- core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala         |  3
-rw-r--r-- core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala |  6
-rw-r--r-- core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala     |  5
-rw-r--r-- core/src/main/scala/spark/ui/exec/ExecutorsUI.scala                     | 19
8 files changed, 63 insertions(+), 5 deletions(-)
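Taken together, the change threads a start signal from the task-set managers through the DAGScheduler's event loop out to every registered listener. A compressed sketch of that flow is below, with heavily simplified stand-in types; the real classes in the diffs that follow carry far more state, so this is an illustration of the plumbing, not the actual implementation.

object TaskStartFlow {
  import java.util.concurrent.LinkedBlockingQueue

  // Simplified stand-ins for the types touched in this diff.
  case class TaskInfo(taskId: Long, executorId: String)
  case class Task(stageId: Int)

  sealed trait DAGSchedulerEvent
  case class BeginEvent(task: Task, taskInfo: TaskInfo) extends DAGSchedulerEvent

  trait SparkListener {
    def onTaskStart(task: Task, info: TaskInfo): Unit = {}
  }

  private val eventQueue = new LinkedBlockingQueue[DAGSchedulerEvent]()
  private val listeners = Seq(new SparkListener {
    override def onTaskStart(task: Task, info: TaskInfo): Unit =
      println(s"task ${info.taskId} started on ${info.executorId}")
  })

  // Producer side: what Cluster/LocalTaskSetManager now call via the scheduler listener.
  def taskStarted(task: Task, info: TaskInfo): Unit =
    eventQueue.put(BeginEvent(task, info))

  // Consumer side: the new case in DAGScheduler's event-processing loop.
  def processOne(): Unit = eventQueue.take() match {
    case BeginEvent(task, info) => listeners.foreach(_.onTaskStart(task, info))
  }

  def main(args: Array[String]): Unit = {
    taskStarted(Task(stageId = 0), TaskInfo(taskId = 1L, executorId = "exec-1"))
    processOne()
  }
}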
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index 29e879aa42..7bf50de660 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -52,6 +52,11 @@ class DAGScheduler(
}
taskSched.setListener(this)
+ // Called by TaskScheduler to report that a task has started.
+ override def taskStarted(task: Task[_], taskInfo: TaskInfo) {
+ eventQueue.put(BeginEvent(task, taskInfo))
+ }
+
// Called by TaskScheduler to report task completions or failures.
override def taskEnded(
task: Task[_],
@@ -343,6 +348,9 @@ class DAGScheduler(
case ExecutorLost(execId) =>
handleExecutorLost(execId)
+ case begin: BeginEvent =>
+ sparkListeners.foreach(_.onTaskStart(SparkListenerTaskStart(begin.task, begin.taskInfo)))
+
case completion: CompletionEvent =>
sparkListeners.foreach(_.onTaskEnd(SparkListenerTaskEnd(completion.task,
completion.reason, completion.taskInfo, completion.taskMetrics)))
diff --git a/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala
index 506c87f65b..3b4ee6287a 100644
--- a/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala
@@ -43,6 +43,8 @@ private[spark] case class JobSubmitted(
properties: Properties = null)
extends DAGSchedulerEvent
+private[spark] case class BeginEvent(task: Task[_], taskInfo: TaskInfo) extends DAGSchedulerEvent
+
private[spark] case class CompletionEvent(
task: Task[_],
reason: TaskEndReason,
diff --git a/core/src/main/scala/spark/scheduler/JobLogger.scala b/core/src/main/scala/spark/scheduler/JobLogger.scala
index 85b5ddd4a8..f7565b8c57 100644
--- a/core/src/main/scala/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/spark/scheduler/JobLogger.scala
@@ -68,6 +68,8 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
processStageCompletedEvent(stageInfo)
case SparkListenerJobEnd(job, result) =>
processJobEndEvent(job, result)
+ case SparkListenerTaskStart(task, taskInfo) =>
+ processTaskStartEvent(task, taskInfo)
case SparkListenerTaskEnd(task, reason, taskInfo, taskMetrics) =>
processTaskEndEvent(task, reason, taskInfo, taskMetrics)
case _ =>
@@ -252,7 +254,19 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
stageInfo.stage.id + " STATUS=COMPLETED")
}
-
+
+ override def onTaskStart(taskStart: SparkListenerTaskStart) {
+ eventQueue.put(taskStart)
+ }
+
+ protected def processTaskStartEvent(task: Task[_], taskInfo: TaskInfo) {
+ var taskStatus = ""
+ task match {
+ case resultTask: ResultTask[_, _] => taskStatus = "TASK_TYPE=RESULT_TASK"
+ case shuffleMapTask: ShuffleMapTask => taskStatus = "TASK_TYPE=SHUFFLE_MAP_TASK"
+ }
+ }
+
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
eventQueue.put(taskEnd)
}
diff --git a/core/src/main/scala/spark/scheduler/SparkListener.scala b/core/src/main/scala/spark/scheduler/SparkListener.scala
index 4fb1c5d42d..4eb7e4e6a5 100644
--- a/core/src/main/scala/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/spark/scheduler/SparkListener.scala
@@ -29,6 +29,8 @@ case class SparkListenerStageSubmitted(stage: Stage, taskSize: Int) extends Spar
case class StageCompleted(val stageInfo: StageInfo) extends SparkListenerEvents
+case class SparkListenerTaskStart(task: Task[_], taskInfo: TaskInfo) extends SparkListenerEvents
+
case class SparkListenerTaskEnd(task: Task[_], reason: TaskEndReason, taskInfo: TaskInfo,
taskMetrics: TaskMetrics) extends SparkListenerEvents
@@ -48,7 +50,12 @@ trait SparkListener {
* Called when a stage is submitted
*/
def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) { }
-
+
+ /**
+ * Called when a task starts
+ */
+ def onTaskStart(taskStart: SparkListenerTaskStart) { }
+
/**
* Called when a task ends
*/
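With both hooks available, a custom listener can pair each task's start with its end, for example to measure wall-clock latency. A minimal sketch follows, using simplified stand-in event types keyed on a bare task id; the real SparkListenerTaskStart/SparkListenerTaskEnd events carry full Task and TaskInfo objects, which are not reproduced here.

object TaskLatencySketch {
  import scala.collection.mutable

  // Simplified stand-ins for the listener events above.
  case class TaskStart(taskId: Long)
  case class TaskEnd(taskId: Long)

  class TaskLatencyListener {
    private val startedAt = mutable.Map[Long, Long]()

    def onTaskStart(e: TaskStart): Unit =
      startedAt(e.taskId) = System.nanoTime()

    def onTaskEnd(e: TaskEnd): Unit =
      startedAt.remove(e.taskId).foreach { t0 =>
        println(f"task ${e.taskId} took ${(System.nanoTime() - t0) / 1e6}%.1f ms")
      }
  }

  def main(args: Array[String]): Unit = {
    val listener = new TaskLatencyListener
    listener.onTaskStart(TaskStart(1L))
    listener.onTaskEnd(TaskEnd(1L))
  }
}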
diff --git a/core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala b/core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala
index 245e7ccb52..2cdeb1c8c0 100644
--- a/core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala
+++ b/core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala
@@ -27,6 +27,9 @@ import spark.executor.TaskMetrics
* Interface for getting events back from the TaskScheduler.
*/
private[spark] trait TaskSchedulerListener {
+ // A task has started.
+ def taskStarted(task: Task[_], taskInfo: TaskInfo)
+
// A task has finished or failed.
def taskEnded(task: Task[_], reason: TaskEndReason, result: Any, accumUpdates: Map[Long, Any],
taskInfo: TaskInfo, taskMetrics: TaskMetrics): Unit
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
index 3d06520675..14e87af653 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
@@ -496,6 +496,8 @@ private[spark] class ClusterTaskSetManager(
logInfo("Serialized task %s:%d as %d bytes in %d ms".format(
taskSet.id, index, serializedTask.limit, timeTaken))
val taskName = "task %s:%d".format(taskSet.id, index)
+ if (taskAttempts(index).size == 1)
+ taskStarted(task, info)
return Some(new TaskDescription(taskId, execId, taskName, serializedTask))
}
case _ =>
@@ -518,6 +520,10 @@ private[spark] class ClusterTaskSetManager(
}
}
+ def taskStarted(task: Task[_], info: TaskInfo) {
+ sched.listener.taskStarted(task, info)
+ }
+
def taskFinished(tid: Long, state: TaskState, serializedData: ByteBuffer) {
val info = taskInfos(tid)
if (info.failed) {
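Note the `taskAttempts(index).size == 1` guard above: only the first launch of a task index reports a start, so a speculative or retried attempt of the same index does not fire a second event. A tiny self-contained illustration of that condition (the names are stand-ins, not the manager's real fields):

object FirstAttemptGuard {
  // Stand-in for ClusterTaskSetManager's per-index attempt bookkeeping.
  val taskAttempts =
    collection.mutable.Map[Int, List[String]]().withDefaultValue(Nil)

  def launch(index: Int, attemptId: String): Unit = {
    taskAttempts(index) = attemptId :: taskAttempts(index)
    if (taskAttempts(index).size == 1)
      println(s"reporting start for task index $index")  // first attempt only
  }

  def main(args: Array[String]): Unit = {
    launch(0, "attempt-0")  // prints: reporting start for task index 0
    launch(0, "attempt-1")  // silent: a re-launch of the same index
  }
}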
diff --git a/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala b/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
index e662ad6709..b500451990 100644
--- a/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
@@ -117,6 +117,7 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
val taskName = "task %s:%d".format(taskSet.id, index)
copiesRunning(index) += 1
increaseRunningTasks(1)
+ taskStarted(task, info)
return Some(new TaskDescription(taskId, null, taskName, bytes))
case None => {}
}
@@ -146,6 +147,10 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
}
}
+ def taskStarted(task: Task[_], info: TaskInfo) {
+ sched.listener.taskStarted(task, info)
+ }
+
def taskEnded(tid: Long, state: TaskState, serializedData: ByteBuffer) {
val info = taskInfos(tid)
val index = info.index
diff --git a/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
index 20ea54d6a6..69fb306074 100644
--- a/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
+++ b/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
@@ -5,7 +5,7 @@ import javax.servlet.http.HttpServletRequest
import org.eclipse.jetty.server.Handler
-import scala.collection.mutable.{ArrayBuffer, HashMap}
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.util.Properties
import spark.{ExceptionFailure, Logging, SparkContext, Success, Utils}
@@ -18,7 +18,6 @@ import spark.ui.JettyUtils._
import spark.ui.Page.Executors
import spark.ui.UIUtils.headerSparkPage
import spark.ui.UIUtils
-import spark.Utils
import scala.xml.{Node, XML}
@@ -45,7 +44,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
.reduceOption(_+_).getOrElse(0L)
val execHead = Seq("Executor ID", "Address", "RDD blocks", "Memory used", "Disk used",
- "Failed tasks", "Complete tasks", "Total tasks")
+ "Active tasks", "Failed tasks", "Complete tasks", "Total tasks")
def execRow(kv: Seq[String]) =
<tr>
<td>{kv(0)}</td>
@@ -60,6 +59,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
<td>{kv(6)}</td>
<td>{kv(7)}</td>
<td>{kv(8)}</td>
+ <td>{kv(9)}</td>
</tr>
val execInfo =
for (b <- 0 until storageStatusList.size)
@@ -93,6 +93,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
val memUsed = sc.getExecutorStorageStatus(a).memUsed().toString
val maxMem = sc.getExecutorStorageStatus(a).maxMem.toString
val diskUsed = sc.getExecutorStorageStatus(a).diskUsed().toString
+ val activeTasks = listener.executorToTasksActive.getOrElse(a.toString, Seq[Long]()).size.toString
val failedTasks = listener.executorToTasksFailed.getOrElse(a.toString, 0).toString
val completedTasks = listener.executorToTasksComplete.getOrElse(a.toString, 0).toString
val totalTasks = listener.executorToTaskInfos(a.toString).size.toString
@@ -104,6 +105,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
memUsed,
maxMem,
diskUsed,
+ activeTasks,
failedTasks,
completedTasks,
totalTasks
@@ -111,13 +113,24 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
}
private[spark] class ExecutorsListener extends SparkListener with Logging {
+ val executorToTasksActive = HashMap[String, HashSet[Long]]()
val executorToTasksComplete = HashMap[String, Int]()
val executorToTasksFailed = HashMap[String, Int]()
val executorToTaskInfos =
HashMap[String, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]()
+ override def onTaskStart(taskStart: SparkListenerTaskStart) {
+ val eid = taskStart.taskInfo.executorId
+ if (!executorToTasksActive.contains(eid))
+ executorToTasksActive(eid) = HashSet[Long]()
+ executorToTasksActive(eid) += taskStart.taskInfo.taskId
+ }
+
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
val eid = taskEnd.taskInfo.executorId
+ if (!executorToTasksActive.contains(eid))
+ executorToTasksActive(eid) = HashSet[Long]()
+ executorToTasksActive(eid) -= taskEnd.taskInfo.taskId
val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
taskEnd.reason match {
case e: ExceptionFailure =>