From 916270f5f39efc8cd3baf833a1dcf70813b3a163 Mon Sep 17 00:00:00 2001
From: Kay Ousterhout
Date: Wed, 16 Oct 2013 16:00:36 -0700
Subject: Show "GETTING_RESULTS" state in UI.

This commit adds a set of calls using the SparkListener interface
that indicate when a task is remotely fetching results, so that we
can display this (potentially time-consuming) phase of execution to
users through the UI.
---
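Example for reviewers: a minimal sketch of a listener built against the
callbacks this patch adds. It uses only APIs visible in the diff below
(onTaskGettingResult, onTaskEnd, and the new TaskInfo fields); the listener
name and the log messages are invented for illustration:

    class ResultFetchTimingListener extends SparkListener {
      // Called only when a result comes back as an IndirectTaskResult and has
      // to be fetched from the block manager; direct results skip this state.
      override def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult) {
        val info = taskGettingResult.taskInfo
        println("Task %d started fetching its result remotely".format(info.index))
      }

      // gettingResultTime stays 0 for results small enough to be sent directly,
      // so guard on gettingResult before computing the fetch duration.
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
        val info = taskEnd.taskInfo
        if (info.gettingResult) {
          println("Task %d spent %d ms fetching its result".format(
            info.index, info.finishTime - info.gettingResultTime))
        }
      }
    }

Such a listener would be registered with sc.addSparkListener(...), the same
way the new tests below register SaveTaskEvents.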
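Also for reviewers, the dispatch chain this patch wires up, traced from the
diff below (an annotated call sequence, not compilable code). The event fires
only on the IndirectTaskResult path, i.e. when the serialized result was too
large to be sent back directly in an Akka frame:

    // TaskResultGetter, while deserializing an IndirectTaskResult:
    //   scheduler.handleTaskGettingResult(taskSetManager, tid)
    // ClusterScheduler:
    //   taskSetManager.handleTaskGettingResult(tid)
    // ClusterTaskSetManager:
    //   info.markGettingResult()          // sets TaskInfo.gettingResultTime
    //   sched.dagScheduler.taskGettingResult(tasks(info.index), info)
    // DAGScheduler:
    //   eventQueue.put(GettingResultEvent(task, taskInfo))
    //   ...event loop...
    //   listenerBus.post(SparkListenerTaskGettingResult(task, taskInfo))
    // SparkListenerBus:
    //   sparkListeners.foreach(_.onTaskGettingResult(taskGettingResult))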
 .../org/apache/spark/scheduler/DAGScheduler.scala  |  8 +++
 .../apache/spark/scheduler/DAGSchedulerEvent.scala |  3 +
 .../org/apache/spark/scheduler/SparkListener.scala |  9 +++
 .../apache/spark/scheduler/SparkListenerBus.scala  |  2 +
 .../org/apache/spark/scheduler/TaskInfo.scala      | 20 +++++++
 .../spark/scheduler/cluster/ClusterScheduler.scala |  4 ++
 .../scheduler/cluster/ClusterTaskSetManager.scala  |  6 ++
 .../spark/scheduler/cluster/TaskResultGetter.scala |  1 +
 .../apache/spark/ui/jobs/JobProgressListener.scala |  8 ++-
 .../spark/scheduler/SparkListenerSuite.scala       | 68 ++++++++++++++++++++--
 10 files changed, 124 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index d84f5968df..9478360c68 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -67,6 +67,11 @@ class DAGScheduler(
     eventQueue.put(BeginEvent(task, taskInfo))
   }
 
+  // Called to report that a task has completed and results are being fetched remotely.
+  def taskGettingResult(task: Task[_], taskInfo: TaskInfo) {
+    eventQueue.put(GettingResultEvent(task, taskInfo))
+  }
+
   // Called by TaskScheduler to report task completions or failures.
   def taskEnded(
       task: Task[_],
@@ -412,6 +417,9 @@ class DAGScheduler(
       case begin: BeginEvent =>
         listenerBus.post(SparkListenerTaskStart(begin.task, begin.taskInfo))
 
+      case gettingResult: GettingResultEvent =>
+        listenerBus.post(SparkListenerTaskGettingResult(gettingResult.task, gettingResult.taskInfo))
+
       case completion: CompletionEvent =>
         listenerBus.post(SparkListenerTaskEnd(
           completion.task, completion.reason, completion.taskInfo, completion.taskMetrics))

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
index a5769c6041..708d221d60 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -53,6 +53,9 @@ private[scheduler] case object AllJobsCancelled extends DAGSchedulerEvent
 private[scheduler]
 case class BeginEvent(task: Task[_], taskInfo: TaskInfo) extends DAGSchedulerEvent
 
+private[scheduler]
+case class GettingResultEvent(task: Task[_], taskInfo: TaskInfo) extends DAGSchedulerEvent
+
 private[scheduler] case class CompletionEvent(
     task: Task[_],
     reason: TaskEndReason,

diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 466baf9913..de24c022f1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -31,6 +31,9 @@ case class StageCompleted(val stageInfo: StageInfo) extends SparkListenerEvents
 
 case class SparkListenerTaskStart(task: Task[_], taskInfo: TaskInfo) extends SparkListenerEvents
 
+case class SparkListenerTaskGettingResult(
+  task: Task[_], taskInfo: TaskInfo) extends SparkListenerEvents
+
 case class SparkListenerTaskEnd(task: Task[_], reason: TaskEndReason, taskInfo: TaskInfo,
      taskMetrics: TaskMetrics) extends SparkListenerEvents
 
@@ -56,6 +59,12 @@ trait SparkListener {
    */
   def onTaskStart(taskStart: SparkListenerTaskStart) { }
 
+  /**
+   * Called when a task begins remotely fetching its result (will not be called for tasks that do
+   * not need to fetch the result remotely).
+   */
+  def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult) { }
+
   /**
    * Called when a task ends
    */

diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index 4d3e4a17ba..d5824e7954 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -49,6 +49,8 @@ private[spark] class SparkListenerBus() extends Logging {
           sparkListeners.foreach(_.onJobEnd(jobEnd))
         case taskStart: SparkListenerTaskStart =>
           sparkListeners.foreach(_.onTaskStart(taskStart))
+        case taskGettingResult: SparkListenerTaskGettingResult =>
+          sparkListeners.foreach(_.onTaskGettingResult(taskGettingResult))
         case taskEnd: SparkListenerTaskEnd =>
           sparkListeners.foreach(_.onTaskEnd(taskEnd))
         case _ =>
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
index 7c2a422aff..3be3f1a9fc 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
@@ -31,9 +31,25 @@ class TaskInfo(
     val host: String,
     val taskLocality: TaskLocality.TaskLocality) {
 
+  /**
+   * The time when the task started remotely getting the result. Will not be set if the
+   * task result was sent immediately when the task finished (as opposed to sending an
+   * IndirectTaskResult and later fetching the result from the block manager).
+   */
+  var gettingResultTime: Long = 0
+
+  /**
+   * The time when the task has completed successfully (including the time to remotely fetch
+   * results, if necessary).
+   */
   var finishTime: Long = 0
+
   var failed = false
 
+  def markGettingResult(time: Long = System.currentTimeMillis) {
+    gettingResultTime = time
+  }
+
   def markSuccessful(time: Long = System.currentTimeMillis) {
     finishTime = time
   }
@@ -43,6 +59,8 @@ class TaskInfo(
     failed = true
   }
 
+  def gettingResult: Boolean = gettingResultTime != 0
+
   def finished: Boolean = finishTime != 0
 
   def successful: Boolean = finished && !failed
@@ -52,6 +70,8 @@ class TaskInfo(
   def status: String = {
     if (running)
       "RUNNING"
+    else if (gettingResult)
+      "GETTING RESULT"
     else if (failed)
       "FAILED"
     else if (successful)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
index 4ea8bf8853..85033958ef 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
@@ -306,6 +306,10 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
     }
   }
 
+  def handleTaskGettingResult(taskSetManager: ClusterTaskSetManager, tid: Long) {
+    taskSetManager.handleTaskGettingResult(tid)
+  }
+
   def handleSuccessfulTask(
     taskSetManager: ClusterTaskSetManager,
     tid: Long,

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
index 29093e3b4f..ee47aaffca 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
@@ -418,6 +418,12 @@ private[spark] class ClusterTaskSetManager(
     sched.dagScheduler.taskStarted(task, info)
   }
 
+  def handleTaskGettingResult(tid: Long) = {
+    val info = taskInfos(tid)
+    info.markGettingResult()
+    sched.dagScheduler.taskGettingResult(tasks(info.index), info)
+  }
+
   /**
    * Marks the task as successful and notifies the DAGScheduler that a task has ended.
    */

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala
index 4312c46cc1..2064d97b49 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala
@@ -50,6 +50,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: ClusterSche
             case directResult: DirectTaskResult[_] => directResult
             case IndirectTaskResult(blockId) =>
               logDebug("Fetching indirect task result for TID %s".format(tid))
+              scheduler.handleTaskGettingResult(taskSetManager, tid)
               val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId)
               if (!serializedTaskResult.isDefined) {
                 /* We won't be able to get the task result if the machine that ran the task failed

diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index eb3b4e8522..913f7ec22f 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -115,7 +115,13 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
     taskList += ((taskStart.taskInfo, None, None))
     stageToTaskInfos(sid) = taskList
   }
-
+
+  override def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult)
+      = synchronized {
+    // Do nothing: because we don't do a deep copy of the TaskInfo, the TaskInfo in
+    // stageToTaskInfos already has the updated status.
+  }
+
   override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
     val sid = taskEnd.task.stageId
     val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())

diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index a549417a47..b2fab254b2 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -17,13 +17,16 @@
 package org.apache.spark.scheduler
 
+import scala.collection.mutable.{Buffer, HashSet}
+
 import org.scalatest.FunSuite
-import org.apache.spark.{SparkContext, LocalSparkContext}
-import scala.collection.mutable
 import org.scalatest.matchers.ShouldMatchers
+
+import org.apache.spark.{SparkContext, LocalSparkContext}
 import org.apache.spark.SparkContext._
 
 class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
+  val WAIT_TIMEOUT_MILLIS = 10000
 
   test("local metrics") {
     sc = new SparkContext("local[4]", "test")
@@ -39,7 +42,6 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
     val d = sc.parallelize(1 to 1e4.toInt, 64).map{i => w(i)}
     d.count()
-    val WAIT_TIMEOUT_MILLIS = 10000
     assert(sc.dagScheduler.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
 
     listener.stageInfos.size should be (1)
@@ -89,6 +91,47 @@
     }
   }
 
+  test("onTaskGettingResult() called when result fetched remotely") {
+    // Need to use local cluster mode here, because results are not ever returned through the
+    // block manager when using the LocalScheduler.
+    sc = new SparkContext("local-cluster[1,1,512]", "test")
+
+    val listener = new SaveTaskEvents
+    sc.addSparkListener(listener)
+
+    // Make a task whose result is larger than the akka frame size
+    System.setProperty("spark.akka.frameSize", "1")
+    val akkaFrameSize =
+      sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size").toInt
+    val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x, y) => x)
+    assert(result === 1.to(akkaFrameSize).toArray)
+
+    assert(sc.dagScheduler.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    val TASK_INDEX = 0
+    assert(listener.startedTasks.contains(TASK_INDEX))
+    assert(listener.startedGettingResultTasks.contains(TASK_INDEX))
+    assert(listener.endedTasks.contains(TASK_INDEX))
+  }
+
+  test("onTaskGettingResult() not called when result sent directly") {
+    // Need to use local cluster mode here, because results are not ever returned through the
+    // block manager when using the LocalScheduler.
+    sc = new SparkContext("local-cluster[1,1,512]", "test")
+
+    val listener = new SaveTaskEvents
+    sc.addSparkListener(listener)
+
+    // Make a task whose result is smaller than the akka frame size, so it is sent directly.
+    val result = sc.parallelize(Seq(1), 1).map(x => 2 * x).reduce((x, y) => x)
+    assert(result === 2)
+
+    assert(sc.dagScheduler.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    val TASK_INDEX = 0
+    assert(listener.startedTasks.contains(TASK_INDEX))
+    assert(listener.startedGettingResultTasks.isEmpty)
+    assert(listener.endedTasks.contains(TASK_INDEX))
+  }
+
   def checkNonZeroAvg(m: Traversable[Long], msg: String) {
     assert(m.sum / m.size.toDouble > 0.0, msg)
   }
@@ -99,10 +142,27 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
 
   class SaveStageInfo extends SparkListener {
-    val stageInfos = mutable.Buffer[StageInfo]()
+    val stageInfos = Buffer[StageInfo]()
     override def onStageCompleted(stage: StageCompleted) {
       stageInfos += stage.stageInfo
     }
   }
 
+  class SaveTaskEvents extends SparkListener {
+    val startedTasks = new HashSet[Int]()
+    val startedGettingResultTasks = new HashSet[Int]()
+    val endedTasks = new HashSet[Int]()
+
+    override def onTaskStart(taskStart: SparkListenerTaskStart) {
+      startedTasks += taskStart.taskInfo.index
+    }
+
+    override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+      endedTasks += taskEnd.taskInfo.index
+    }
+
+    override def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult) {
+      startedGettingResultTasks += taskGettingResult.taskInfo.index
+    }
+  }
 }
-- 
cgit v1.2.3