diff options
author | Patrick Wendell <pwendell@gmail.com> | 2013-01-28 10:17:35 -0800 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2013-01-28 10:45:57 -0800 |
commit | 501433f1d59b1b326c0a7169fa1fd6136f7628e3 (patch) | |
tree | 70e8914ddd4df7fa9388b470a90838d2c05bda55 /core | |
parent | c423be7d8e1349fc00431328b76b52f4eee8a975 (diff) | |
download | spark-501433f1d59b1b326c0a7169fa1fd6136f7628e3.tar.gz spark-501433f1d59b1b326c0a7169fa1fd6136f7628e3.tar.bz2 spark-501433f1d59b1b326c0a7169fa1fd6136f7628e3.zip |
Making submission time a field
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/spark/scheduler/DAGScheduler.scala | 7 | ||||
-rw-r--r-- | core/src/main/scala/spark/scheduler/Stage.scala | 3 |
2 files changed, 6 insertions, 4 deletions
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala index bce7418e87..7ba1f3430a 100644 --- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala @@ -86,7 +86,6 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with val activeJobs = new HashSet[ActiveJob] val resultStageToJob = new HashMap[Stage, ActiveJob] - val stageSubmissionTimes = new HashMap[Stage, Long] val metadataCleaner = new MetadataCleaner("DAGScheduler", this.cleanup) @@ -394,8 +393,8 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with logDebug("New pending tasks: " + myPending) taskSched.submitTasks( new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.priority)) - if (!stageSubmissionTimes.contains(stage)) { - stageSubmissionTimes.put(stage, System.currentTimeMillis()) + if (!stage.submissionTime.isDefined) { + stage.submissionTime = Some(System.currentTimeMillis()) } } else { logDebug("Stage " + stage + " is actually done; %b %d %d".format( @@ -413,7 +412,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with val stage = idToStage(task.stageId) def markStageAsFinished(stage: Stage) = { - val serviceTime = stageSubmissionTimes.remove(stage) match { + val serviceTime = stage.submissionTime match { case Some(t) => (System.currentTimeMillis() - t).toString case _ => "Unkown" } diff --git a/core/src/main/scala/spark/scheduler/Stage.scala b/core/src/main/scala/spark/scheduler/Stage.scala index e9419728e3..374114d870 100644 --- a/core/src/main/scala/spark/scheduler/Stage.scala +++ b/core/src/main/scala/spark/scheduler/Stage.scala @@ -32,6 +32,9 @@ private[spark] class Stage( val outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil) var numAvailableOutputs = 0 + /** When first task was submitted to scheduler. */ + var submissionTime: Option[Long] = None + private var nextAttemptId = 0 def isAvailable: Boolean = { |