diff options
author | Mark Hamstra <markhamstra@gmail.com> | 2013-11-22 13:14:26 -0800 |
---|---|---|
committer | Mark Hamstra <markhamstra@gmail.com> | 2013-12-03 09:57:32 -0800 |
commit | 9ae2d094a967782e3f5a624dd854059a40430ee6 (patch) | |
tree | 6b0db84f7356257d0c5eb0c32b858d1fe8d11e91 /core/src/main/scala/org | |
parent | 27c45e523620d801d547f167a5a33d71ee3af7b5 (diff) | |
download | spark-9ae2d094a967782e3f5a624dd854059a40430ee6.tar.gz spark-9ae2d094a967782e3f5a624dd854059a40430ee6.tar.bz2 spark-9ae2d094a967782e3f5a624dd854059a40430ee6.zip |
Tightly couple stageIdToJobIds and jobIdToStageIds
Diffstat (limited to 'core/src/main/scala/org')
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 29 |
1 files changed, 12 insertions, 17 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index aeac14ad7b..01c5133e6e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -259,7 +259,7 @@ class DAGScheduler( val stage = new Stage(id, rdd, numTasks, shuffleDep, getParentStages(rdd, jobId), jobId, callSite) stageIdToStage(id) = stage - registerJobIdWithStages(jobId, stage) + updateJobIdStageIdMaps(jobId, stage) stageToInfos(stage) = new StageInfo(stage) stage } @@ -348,30 +348,24 @@ class DAGScheduler( * Registers the given jobId among the jobs that need the given stage and * all of that stage's ancestors. */ - private def registerJobIdWithStages(jobId: Int, stage: Stage) { - def registerJobIdWithStageList(stages: List[Stage]) { + private def updateJobIdStageIdMaps(jobId: Int, stage: Stage) { + def updateJobIdStageIdMapsList(stages: List[Stage]) { if (!stages.isEmpty) { val s = stages.head stageIdToJobIds.getOrElseUpdate(s.id, new HashSet[Int]()) += jobId + jobIdToStageIds.getOrElseUpdate(jobId, new HashSet[Int]()) += s.id val parents = getParentStages(s.rdd, jobId) val parentsWithoutThisJobId = parents.filter(p => !stageIdToJobIds.get(p.id).exists(_.contains(jobId))) - registerJobIdWithStageList(parentsWithoutThisJobId ++ stages.tail) + updateJobIdStageIdMapsList(parentsWithoutThisJobId ++ stages.tail) } } - registerJobIdWithStageList(List(stage)) + updateJobIdStageIdMapsList(List(stage)) } - private def jobIdToStageIdsAdd(jobId: Int) { - val stageSet = jobIdToStageIds.getOrElseUpdate(jobId, new HashSet[Int]()) - stageIdToJobIds.foreach { case (stageId, jobSet) => - if (jobSet.contains(jobId)) { - stageSet += stageId - } - } - } - - // Removes job and any stages that are not needed by any other job. Returns the set of ids for stages that - // were removed. The associated tasks for those stages need to be cancelled if we got here via job cancellation. + /** + * Removes job and any stages that are not needed by any other job. Returns the set of ids for stages that + * were removed. The associated tasks for those stages need to be cancelled if we got here via job cancellation. + */ private def removeJobAndIndependentStages(jobId: Int): Set[Int] = { val registeredStages = jobIdToStageIds(jobId) val independentStages = new HashSet[Int]() @@ -555,7 +549,6 @@ class DAGScheduler( idToActiveJob(jobId) = job activeJobs += job resultStageToJob(finalStage) = job - jobIdToStageIdsAdd(jobId) listenerBus.post(SparkListenerJobStart(job, jobIdToStageIds(jobId).toArray, properties)) submitStage(finalStage) } @@ -605,9 +598,11 @@ class DAGScheduler( handleTaskCompletion(completion) case LocalJobCompleted(stage) => + val jobId = stageIdToJobIds(stage.id).head stageIdToJobIds -= stage.id // clean up data structures that were populated for a local job, stageIdToStage -= stage.id // but that won't get cleaned up via the normal paths through stageToInfos -= stage // completion events or stage abort + jobIdToStageIds -= jobId case TaskSetFailed(taskSet, reason) => stageIdToStage.get(taskSet.stageId).foreach { abortStage(_, reason) } |