aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
author: Mark Hamstra <markhamstra@gmail.com> 2013-11-22 13:14:26 -0800
committer: Mark Hamstra <markhamstra@gmail.com> 2013-12-03 09:57:32 -0800
commit: 9ae2d094a967782e3f5a624dd854059a40430ee6 (patch)
tree: 6b0db84f7356257d0c5eb0c32b858d1fe8d11e91 /core
parent: 27c45e523620d801d547f167a5a33d71ee3af7b5 (diff)
download: spark-9ae2d094a967782e3f5a624dd854059a40430ee6.tar.gz
spark-9ae2d094a967782e3f5a624dd854059a40430ee6.tar.bz2
spark-9ae2d094a967782e3f5a624dd854059a40430ee6.zip
Tightly couple stageIdToJobIds and jobIdToStageIds
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 29
1 files changed, 12 insertions, 17 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index aeac14ad7b..01c5133e6e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -259,7 +259,7 @@ class DAGScheduler(
val stage =
new Stage(id, rdd, numTasks, shuffleDep, getParentStages(rdd, jobId), jobId, callSite)
stageIdToStage(id) = stage
- registerJobIdWithStages(jobId, stage)
+ updateJobIdStageIdMaps(jobId, stage)
stageToInfos(stage) = new StageInfo(stage)
stage
}
@@ -348,30 +348,24 @@ class DAGScheduler(
* Registers the given jobId among the jobs that need the given stage and
* all of that stage's ancestors.
*/
- private def registerJobIdWithStages(jobId: Int, stage: Stage) {
- def registerJobIdWithStageList(stages: List[Stage]) {
+ private def updateJobIdStageIdMaps(jobId: Int, stage: Stage) {
+ def updateJobIdStageIdMapsList(stages: List[Stage]) {
if (!stages.isEmpty) {
val s = stages.head
stageIdToJobIds.getOrElseUpdate(s.id, new HashSet[Int]()) += jobId
+ jobIdToStageIds.getOrElseUpdate(jobId, new HashSet[Int]()) += s.id
val parents = getParentStages(s.rdd, jobId)
val parentsWithoutThisJobId = parents.filter(p => !stageIdToJobIds.get(p.id).exists(_.contains(jobId)))
- registerJobIdWithStageList(parentsWithoutThisJobId ++ stages.tail)
+ updateJobIdStageIdMapsList(parentsWithoutThisJobId ++ stages.tail)
}
}
- registerJobIdWithStageList(List(stage))
+ updateJobIdStageIdMapsList(List(stage))
}
- private def jobIdToStageIdsAdd(jobId: Int) {
- val stageSet = jobIdToStageIds.getOrElseUpdate(jobId, new HashSet[Int]())
- stageIdToJobIds.foreach { case (stageId, jobSet) =>
- if (jobSet.contains(jobId)) {
- stageSet += stageId
- }
- }
- }
-
- // Removes job and any stages that are not needed by any other job. Returns the set of ids for stages that
- // were removed. The associated tasks for those stages need to be cancelled if we got here via job cancellation.
+ /**
+ * Removes job and any stages that are not needed by any other job. Returns the set of ids for stages that
+ * were removed. The associated tasks for those stages need to be cancelled if we got here via job cancellation.
+ */
private def removeJobAndIndependentStages(jobId: Int): Set[Int] = {
val registeredStages = jobIdToStageIds(jobId)
val independentStages = new HashSet[Int]()
@@ -555,7 +549,6 @@ class DAGScheduler(
idToActiveJob(jobId) = job
activeJobs += job
resultStageToJob(finalStage) = job
- jobIdToStageIdsAdd(jobId)
listenerBus.post(SparkListenerJobStart(job, jobIdToStageIds(jobId).toArray, properties))
submitStage(finalStage)
}
@@ -605,9 +598,11 @@ class DAGScheduler(
handleTaskCompletion(completion)
case LocalJobCompleted(stage) =>
+ val jobId = stageIdToJobIds(stage.id).head
stageIdToJobIds -= stage.id // clean up data structures that were populated for a local job,
stageIdToStage -= stage.id // but that won't get cleaned up via the normal paths through
stageToInfos -= stage // completion events or stage abort
+ jobIdToStageIds -= jobId
case TaskSetFailed(taskSet, reason) =>
stageIdToStage.get(taskSet.stageId).foreach { abortStage(_, reason) }