diff options
author | CrazyJvm <crazyjvm@gmail.com> | 2014-06-05 17:44:46 -0700 |
---|---|---|
committer | Reynold Xin <rxin@apache.org> | 2014-06-05 17:44:46 -0700 |
commit | 3d3f8c8004da110ca97973119e9d9f04f878ee81 (patch) | |
tree | 10afd83dd5f9deb4348369b52f0203840b816b2a | |
parent | c7a183b2c2bca13565496495b4ae3a3a9f63f9ab (diff) | |
download | spark-3d3f8c8004da110ca97973119e9d9f04f878ee81.tar.gz spark-3d3f8c8004da110ca97973119e9d9f04f878ee81.tar.bz2 spark-3d3f8c8004da110ca97973119e9d9f04f878ee81.zip |
Use pluggable clock in DAGSheduler #SPARK-2031
DAGScheduler supports pluggable clock like what TaskSetManager does.
Author: CrazyJvm <crazyjvm@gmail.com>
Closes #976 from CrazyJvm/clock and squashes the following commits:
6779a4c [CrazyJvm] Use pluggable clock in DAGSheduler
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 13 |
1 files changed, 7 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index ccff6a3d1a..e09a4221e8 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -38,7 +38,7 @@ import org.apache.spark.executor.TaskMetrics import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult} import org.apache.spark.rdd.RDD import org.apache.spark.storage.{BlockId, BlockManager, BlockManagerMaster, RDDBlockId} -import org.apache.spark.util.Utils +import org.apache.spark.util.{SystemClock, Clock, Utils} /** * The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of @@ -61,7 +61,8 @@ class DAGScheduler( listenerBus: LiveListenerBus, mapOutputTracker: MapOutputTrackerMaster, blockManagerMaster: BlockManagerMaster, - env: SparkEnv) + env: SparkEnv, + clock: Clock = SystemClock) extends Logging { import DAGScheduler._ @@ -781,7 +782,7 @@ class DAGScheduler( logDebug("New pending tasks: " + myPending) taskScheduler.submitTasks( new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties)) - stageToInfos(stage).submissionTime = Some(System.currentTimeMillis()) + stageToInfos(stage).submissionTime = Some(clock.getTime()) } else { logDebug("Stage " + stage + " is actually done; %b %d %d".format( stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions)) @@ -807,11 +808,11 @@ class DAGScheduler( def markStageAsFinished(stage: Stage) = { val serviceTime = stageToInfos(stage).submissionTime match { - case Some(t) => "%.03f".format((System.currentTimeMillis() - t) / 1000.0) + case Some(t) => "%.03f".format((clock.getTime() - t) / 1000.0) case _ => "Unknown" } logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime)) - stageToInfos(stage).completionTime = Some(System.currentTimeMillis()) + stageToInfos(stage).completionTime = Some(clock.getTime()) listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage))) runningStages -= stage } @@ -1015,7 +1016,7 @@ class DAGScheduler( return } val dependentStages = resultStageToJob.keys.filter(x => stageDependsOn(x, failedStage)).toSeq - stageToInfos(failedStage).completionTime = Some(System.currentTimeMillis()) + stageToInfos(failedStage).completionTime = Some(clock.getTime()) for (resultStage <- dependentStages) { val job = resultStageToJob(resultStage) failJobAndIndependentStages(job, s"Job aborted due to stage failure: $reason", |