aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorStephen Haberman <stephen@exigencecorp.com>2013-01-31 18:27:25 -0600
committerStephen Haberman <stephen@exigencecorp.com>2013-01-31 18:27:25 -0600
commit782187c21047ee31728bdb173a2b7ee708cef77b (patch)
treec3d09c8f0ca051328b55417c4910adf267f8cab1 /core
parent55327a283e962652a126d3f8ac7e9a19c76f1f19 (diff)
downloadspark-782187c21047ee31728bdb173a2b7ee708cef77b.tar.gz
spark-782187c21047ee31728bdb173a2b7ee708cef77b.tar.bz2
spark-782187c21047ee31728bdb173a2b7ee708cef77b.zip
Once we find a split with no block, we don't have to look for more.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/scheduler/DAGScheduler.scala23
1 files changed, 11 insertions, 12 deletions
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index b130be6a38..b62b25f688 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -177,18 +177,17 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
if (!visited(rdd)) {
visited += rdd
val locs = getCacheLocs(rdd)
- for (p <- 0 until rdd.splits.size) {
- if (locs(p) == Nil) {
- for (dep <- rdd.dependencies) {
- dep match {
- case shufDep: ShuffleDependency[_,_] =>
- val mapStage = getShuffleMapStage(shufDep, stage.priority)
- if (!mapStage.isAvailable) {
- missing += mapStage
- }
- case narrowDep: NarrowDependency[_] =>
- visit(narrowDep.rdd)
- }
+ val atLeastOneMissing = (0 until rdd.splits.size).exists(locs(_) == Nil)
+ if (atLeastOneMissing) {
+ for (dep <- rdd.dependencies) {
+ dep match {
+ case shufDep: ShuffleDependency[_,_] =>
+ val mapStage = getShuffleMapStage(shufDep, stage.priority)
+ if (!mapStage.isAvailable) {
+ missing += mapStage
+ }
+ case narrowDep: NarrowDependency[_] =>
+ visit(narrowDep.rdd)
}
}
}