aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2013-01-28 10:17:35 -0800
committerPatrick Wendell <pwendell@gmail.com>2013-01-28 10:45:57 -0800
commit501433f1d59b1b326c0a7169fa1fd6136f7628e3 (patch)
tree70e8914ddd4df7fa9388b470a90838d2c05bda55
parentc423be7d8e1349fc00431328b76b52f4eee8a975 (diff)
downloadspark-501433f1d59b1b326c0a7169fa1fd6136f7628e3.tar.gz
spark-501433f1d59b1b326c0a7169fa1fd6136f7628e3.tar.bz2
spark-501433f1d59b1b326c0a7169fa1fd6136f7628e3.zip
Making submission time a field
-rw-r--r--core/src/main/scala/spark/scheduler/DAGScheduler.scala7
-rw-r--r--core/src/main/scala/spark/scheduler/Stage.scala3
2 files changed, 6 insertions, 4 deletions
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index bce7418e87..7ba1f3430a 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -86,7 +86,6 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
val activeJobs = new HashSet[ActiveJob]
val resultStageToJob = new HashMap[Stage, ActiveJob]
- val stageSubmissionTimes = new HashMap[Stage, Long]
val metadataCleaner = new MetadataCleaner("DAGScheduler", this.cleanup)
@@ -394,8 +393,8 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
logDebug("New pending tasks: " + myPending)
taskSched.submitTasks(
new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.priority))
- if (!stageSubmissionTimes.contains(stage)) {
- stageSubmissionTimes.put(stage, System.currentTimeMillis())
+ if (!stage.submissionTime.isDefined) {
+ stage.submissionTime = Some(System.currentTimeMillis())
}
} else {
logDebug("Stage " + stage + " is actually done; %b %d %d".format(
@@ -413,7 +412,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
val stage = idToStage(task.stageId)
def markStageAsFinished(stage: Stage) = {
- val serviceTime = stageSubmissionTimes.remove(stage) match {
+ val serviceTime = stage.submissionTime match {
case Some(t) => (System.currentTimeMillis() - t).toString
case _ => "Unkown"
}
diff --git a/core/src/main/scala/spark/scheduler/Stage.scala b/core/src/main/scala/spark/scheduler/Stage.scala
index e9419728e3..374114d870 100644
--- a/core/src/main/scala/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/spark/scheduler/Stage.scala
@@ -32,6 +32,9 @@ private[spark] class Stage(
val outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil)
var numAvailableOutputs = 0
+ /** When first task was submitted to scheduler. */
+ var submissionTime: Option[Long] = None
+
private var nextAttemptId = 0
def isAvailable: Boolean = {