aboutsummaryrefslogtreecommitdiff
path: root/core/src
diff options
context:
space:
mode:
authorAndrew Or <andrew@databricks.com>2015-09-03 17:55:10 -0700
committerKay Ousterhout <kayousterhout@gmail.com>2015-09-03 17:55:10 -0700
commitcf42138643d1d4bf464f1d700457309d9e537721 (patch)
tree3b3e6f981742a141e4847ad1c4061a47caede964 /core/src
parent208fbca102c269c52eaf84bdde9838474ded276b (diff)
downloadspark-cf42138643d1d4bf464f1d700457309d9e537721.tar.gz
spark-cf42138643d1d4bf464f1d700457309d9e537721.tar.bz2
spark-cf42138643d1d4bf464f1d700457309d9e537721.zip
[SPARK-10003] Improve readability of DAGScheduler
Note: this is not intended to be in Spark 1.5! This patch rewrites some code in the `DAGScheduler` to make it more readable. In particular - there were blocks of code that are unnecessary and removed for simplicity - there were abstractions that are unnecessary and made the code hard to navigate - other minor changes Author: Andrew Or <andrew@databricks.com> Closes #8217 from andrewor14/dag-scheduler-readability and squashes the following commits: 57abca3 [Andrew Or] Move comment back into if case 574fb1e [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-scheduler-readability 64a9ed2 [Andrew Or] Remove unnecessary code + minor code rewrites
Diffstat (limited to 'core/src')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala46
1 files changed, 9 insertions, 37 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index d673cb0946..09e963f5cd 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -250,11 +250,12 @@ class DAGScheduler(
case Some(stage) => stage
case None =>
// We are going to register ancestor shuffle dependencies
- registerShuffleDependencies(shuffleDep, firstJobId)
+ getAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
+ shuffleToMapStage(dep.shuffleId) = newOrUsedShuffleStage(dep, firstJobId)
+ }
// Then register current shuffleDep
val stage = newOrUsedShuffleStage(shuffleDep, firstJobId)
shuffleToMapStage(shuffleDep.shuffleId) = stage
-
stage
}
}
@@ -365,16 +366,6 @@ class DAGScheduler(
parents.toList
}
- /** Find ancestor missing shuffle dependencies and register into shuffleToMapStage */
- private def registerShuffleDependencies(shuffleDep: ShuffleDependency[_, _, _], firstJobId: Int) {
- val parentsWithNoMapStage = getAncestorShuffleDependencies(shuffleDep.rdd)
- while (parentsWithNoMapStage.nonEmpty) {
- val currentShufDep = parentsWithNoMapStage.pop()
- val stage = newOrUsedShuffleStage(currentShufDep, firstJobId)
- shuffleToMapStage(currentShufDep.shuffleId) = stage
- }
- }
-
/** Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet */
private def getAncestorShuffleDependencies(rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]] = {
val parents = new Stack[ShuffleDependency[_, _, _]]
@@ -391,11 +382,9 @@ class DAGScheduler(
if (!shuffleToMapStage.contains(shufDep.shuffleId)) {
parents.push(shufDep)
}
-
- waitingForVisit.push(shufDep.rdd)
case _ =>
- waitingForVisit.push(dep.rdd)
}
+ waitingForVisit.push(dep.rdd)
}
}
}
@@ -1052,10 +1041,11 @@ class DAGScheduler(
// we registered these map outputs.
mapOutputTracker.registerMapOutputs(
shuffleStage.shuffleDep.shuffleId,
- shuffleStage.outputLocs.map(list => if (list.isEmpty) null else list.head),
+ shuffleStage.outputLocs.map(_.headOption.orNull),
changeEpoch = true)
clearCacheLocs()
+
if (shuffleStage.outputLocs.contains(Nil)) {
// Some tasks had failed; let's resubmit this shuffleStage
// TODO: Lower-level scheduler should also deal with this
@@ -1064,27 +1054,9 @@ class DAGScheduler(
shuffleStage.outputLocs.zipWithIndex.filter(_._1.isEmpty)
.map(_._2).mkString(", "))
submitStage(shuffleStage)
- } else {
- val newlyRunnable = new ArrayBuffer[Stage]
- for (shuffleStage <- waitingStages) {
- logInfo("Missing parents for " + shuffleStage + ": " +
- getMissingParentStages(shuffleStage))
- }
- for (shuffleStage <- waitingStages if getMissingParentStages(shuffleStage).isEmpty)
- {
- newlyRunnable += shuffleStage
- }
- waitingStages --= newlyRunnable
- runningStages ++= newlyRunnable
- for {
- shuffleStage <- newlyRunnable.sortBy(_.id)
- jobId <- activeJobForStage(shuffleStage)
- } {
- logInfo("Submitting " + shuffleStage + " (" +
- shuffleStage.rdd + "), which is now runnable")
- submitMissingTasks(shuffleStage, jobId)
- }
}
+
+ // Note: newly runnable stages will be submitted below when we submit waiting stages
}
}
@@ -1186,7 +1158,7 @@ class DAGScheduler(
// TODO: This will be really slow if we keep accumulating shuffle map stages
for ((shuffleId, stage) <- shuffleToMapStage) {
stage.removeOutputsOnExecutor(execId)
- val locs = stage.outputLocs.map(list => if (list.isEmpty) null else list.head)
+ val locs = stage.outputLocs.map(_.headOption.orNull)
mapOutputTracker.registerMapOutputs(shuffleId, locs, changeEpoch = true)
}
if (shuffleToMapStage.isEmpty) {