aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2013-01-24 15:27:29 -0800
committerPatrick Wendell <pwendell@gmail.com>2013-01-28 10:45:57 -0800
commit07f568e1bfc67eead88e2c5dbfb9cac23e1ac8bc (patch)
tree02683918bc9668ebf2bb2b1badfb5028c20ceb46
parentf03d9760fd8ac67fd0865cb355ba75d2eff507fe (diff)
downloadspark-07f568e1bfc67eead88e2c5dbfb9cac23e1ac8bc.tar.gz
spark-07f568e1bfc67eead88e2c5dbfb9cac23e1ac8bc.tar.bz2
spark-07f568e1bfc67eead88e2c5dbfb9cac23e1ac8bc.zip
SPARK-658: Adding logging of stage duration
-rw-r--r--core/src/main/scala/spark/scheduler/DAGScheduler.scala21
1 files changed, 17 insertions, 4 deletions
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index bd541d4207..8aad667182 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -86,6 +86,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
val activeJobs = new HashSet[ActiveJob]
val resultStageToJob = new HashMap[Stage, ActiveJob]
+ val stageSubmissionTimes = new HashMap[Stage, Long]
val metadataCleaner = new MetadataCleaner("DAGScheduler", this.cleanup)
@@ -393,6 +394,9 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
logDebug("New pending tasks: " + myPending)
taskSched.submitTasks(
new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.priority))
+ if (!stageSubmissionTimes.contains(stage)) {
+ stageSubmissionTimes.put(stage, System.currentTimeMillis())
+ }
} else {
logDebug("Stage " + stage + " is actually done; %b %d %d".format(
stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))
@@ -407,6 +411,15 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
def handleTaskCompletion(event: CompletionEvent) {
val task = event.task
val stage = idToStage(task.stageId)
+
+ def stageFinished(stage: Stage) = { // Logs how long `stage` took since its recorded submission time, then drops it from `running`.
+ val serviceTime = stageSubmissionTimes.remove(stage) match { // remove() hands back the recorded time (if any) and deletes the entry
+ case Some(t) => (System.currentTimeMillis() - t).toString // elapsed wall-clock milliseconds
+ case _ => "Unknown" // no submission time was recorded for this stage
+ }
+ logInfo("%s (%s) finished in %s ms".format(stage, stage.origin, serviceTime))
+ running -= stage
+ }
event.reason match {
case Success =>
logInfo("Completed " + task)
@@ -421,13 +434,13 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
if (!job.finished(rt.outputId)) {
job.finished(rt.outputId) = true
job.numFinished += 1
- job.listener.taskSucceeded(rt.outputId, event.result)
// If the whole job has finished, remove it
if (job.numFinished == job.numPartitions) {
activeJobs -= job
resultStageToJob -= stage
- running -= stage
+ stageFinished(stage)
}
+ job.listener.taskSucceeded(rt.outputId, event.result)
}
case None =>
logInfo("Ignoring result from " + rt + " because its job has finished")
@@ -444,8 +457,8 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
stage.addOutputLoc(smt.partition, status)
}
if (running.contains(stage) && pendingTasks(stage).isEmpty) {
- logInfo(stage + " (" + stage.origin + ") finished; looking for newly runnable stages")
- running -= stage
+ stageFinished(stage)
+ logInfo("looking for newly runnable stages")
logInfo("running: " + running)
logInfo("waiting: " + waiting)
logInfo("failed: " + failed)