diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-01-28 22:01:52 -0800 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-01-28 22:01:52 -0800 |
commit | f6eb1f0825155bdcfc11d4c906407db0960eb76b (patch) | |
tree | ae14faaaa1e04c9027ca958d150e06fd7d2ba6f8 /core/src | |
parent | dda2ce017c5de11b2f8fa840dd1f491b2e728b1d (diff) | |
parent | 7ee824e42ebaa1fc0b0248e0a35021108625ed14 (diff) | |
download | spark-f6eb1f0825155bdcfc11d4c906407db0960eb76b.tar.gz spark-f6eb1f0825155bdcfc11d4c906407db0960eb76b.tar.bz2 spark-f6eb1f0825155bdcfc11d4c906407db0960eb76b.zip |
Merge pull request #413 from pwendell/stage-logging
SPARK-658: Adding logging of stage duration
Diffstat (limited to 'core/src')
-rw-r--r-- | core/src/main/scala/spark/scheduler/DAGScheduler.scala | 20 | ||||
-rw-r--r-- | core/src/main/scala/spark/scheduler/Stage.scala | 3 |
2 files changed, 19 insertions, 4 deletions
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala index f10d7cc84e..b130be6a38 100644 --- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala @@ -393,6 +393,9 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with logDebug("New pending tasks: " + myPending) taskSched.submitTasks( new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.priority)) + if (!stage.submissionTime.isDefined) { + stage.submissionTime = Some(System.currentTimeMillis()) + } } else { logDebug("Stage " + stage + " is actually done; %b %d %d".format( stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions)) @@ -407,6 +410,15 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with def handleTaskCompletion(event: CompletionEvent) { val task = event.task val stage = idToStage(task.stageId) + + def markStageAsFinished(stage: Stage) = { + val serviceTime = stage.submissionTime match { + case Some(t) => "%.03f".format((System.currentTimeMillis() - t) / 1000.0) + case _ => "Unkown" + } + logInfo("%s (%s) finished in %s s".format(stage, stage.origin, serviceTime)) + running -= stage + } event.reason match { case Success => logInfo("Completed " + task) @@ -421,13 +433,13 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with if (!job.finished(rt.outputId)) { job.finished(rt.outputId) = true job.numFinished += 1 - job.listener.taskSucceeded(rt.outputId, event.result) // If the whole job has finished, remove it if (job.numFinished == job.numPartitions) { activeJobs -= job resultStageToJob -= stage - running -= stage + markStageAsFinished(stage) } + job.listener.taskSucceeded(rt.outputId, event.result) } case None => logInfo("Ignoring result from " + rt + " because its job has finished") @@ -444,8 +456,8 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with stage.addOutputLoc(smt.partition, status) } if (running.contains(stage) && pendingTasks(stage).isEmpty) { - logInfo(stage + " (" + stage.origin + ") finished; looking for newly runnable stages") - running -= stage + markStageAsFinished(stage) + logInfo("looking for newly runnable stages") logInfo("running: " + running) logInfo("waiting: " + waiting) logInfo("failed: " + failed) diff --git a/core/src/main/scala/spark/scheduler/Stage.scala b/core/src/main/scala/spark/scheduler/Stage.scala index e9419728e3..374114d870 100644 --- a/core/src/main/scala/spark/scheduler/Stage.scala +++ b/core/src/main/scala/spark/scheduler/Stage.scala @@ -32,6 +32,9 @@ private[spark] class Stage( val outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil) var numAvailableOutputs = 0 + /** When first task was submitted to scheduler. */ + var submissionTime: Option[Long] = None + private var nextAttemptId = 0 def isAvailable: Boolean = { |