author    Matei Zaharia <matei@databricks.com>  2015-08-16 00:34:58 -0700
committer Matei Zaharia <matei@databricks.com>  2015-08-16 00:34:58 -0700
commit    cf016075a006034c24c5b758edb279f3e151d25d (patch)
tree      2bb6f57a1ad39c3633fc40c35909edc2b191cab5 /core
parent    5f9ce738fe6bab3f0caffad0df1d3876178cf469 (diff)
download  spark-cf016075a006034c24c5b758edb279f3e151d25d.tar.gz
          spark-cf016075a006034c24c5b758edb279f3e151d25d.tar.bz2
          spark-cf016075a006034c24c5b758edb279f3e151d25d.zip
[SPARK-10008] Ensure shuffle locality doesn't take precedence over narrow deps
The shuffle locality patch made the DAGScheduler aware of shuffle data, but for RDDs that have both narrow and shuffle dependencies, it can cause them to place tasks based on the shuffle dependency instead of the narrow one. This case is common in iterative join-based algorithms like PageRank and ALS, where one RDD is hash-partitioned and one isn't.

Author: Matei Zaharia <matei@databricks.com>

Closes #8220 from mateiz/shuffle-loc-fix.
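To make the scenario concrete, the following is a minimal, self-contained Scala sketch (the object name and data are illustrative, not part of the patch) of the iterative join pattern the commit message describes: links is hash-partitioned and cached, ranks is rebuilt each iteration, so the joined RDD carries a narrow dependency on links and a shuffle dependency on ranks. With this fix, task placement for the join follows the narrow dependency on the cached links partitions.

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

// Illustrative only: a tiny PageRank-style loop whose join has both a narrow
// dependency (on the pre-partitioned links) and a shuffle dependency (on the
// unpartitioned ranks).
object NarrowPlusShuffleLocality {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("narrow-plus-shuffle").setMaster("local[2]"))
    try {
      // Hash-partitioned and cached: these partitions stay on fixed executors.
      val links = sc.parallelize(Seq(1 -> Seq(2, 3), 2 -> Seq(1), 3 -> Seq(1, 2)))
        .partitionBy(new HashPartitioner(4))
        .cache()

      // Not partitioned: recomputed (and reshuffled) on every iteration.
      var ranks = sc.parallelize(Seq(1 -> 1.0, 2 -> 1.0, 3 -> 1.0))

      for (_ <- 1 to 3) {
        // join reuses links' partitioner, so the joined RDD depends narrowly on
        // links and via a shuffle on ranks. With this patch the scheduler
        // places these tasks next to the cached links partitions instead of
        // the map outputs of the ranks shuffle.
        val contribs = links.join(ranks).flatMap { case (_, (outEdges, rank)) =>
          outEdges.map(dest => dest -> (rank / outEdges.size))
        }
        ranks = contribs.reduceByKey(_ + _).mapValues(r => 0.15 + 0.85 * r)
      }

      ranks.collect().foreach(println)
    } finally {
      sc.stop()
    }
  }
}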
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala      | 37
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala  | 26
2 files changed, 44 insertions(+), 19 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index f1c63d0876..dadf83a382 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1383,33 +1383,36 @@ class DAGScheduler(
return rddPrefs.map(TaskLocation(_))
}
+ // If the RDD has narrow dependencies, pick the first partition of the first narrow dependency
+ // that has any placement preferences. Ideally we would choose based on transfer sizes,
+ // but this will do for now.
rdd.dependencies.foreach {
case n: NarrowDependency[_] =>
- // If the RDD has narrow dependencies, pick the first partition of the first narrow dep
- // that has any placement preferences. Ideally we would choose based on transfer sizes,
- // but this will do for now.
for (inPart <- n.getParents(partition)) {
val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
if (locs != Nil) {
return locs
}
}
- case s: ShuffleDependency[_, _, _] =>
- // For shuffle dependencies, pick locations which have at least REDUCER_PREF_LOCS_FRACTION
- // of data as preferred locations
- if (shuffleLocalityEnabled &&
- rdd.partitions.size < SHUFFLE_PREF_REDUCE_THRESHOLD &&
- s.rdd.partitions.size < SHUFFLE_PREF_MAP_THRESHOLD) {
- // Get the preferred map output locations for this reducer
- val topLocsForReducer = mapOutputTracker.getLocationsWithLargestOutputs(s.shuffleId,
- partition, rdd.partitions.size, REDUCER_PREF_LOCS_FRACTION)
- if (topLocsForReducer.nonEmpty) {
- return topLocsForReducer.get.map(loc => TaskLocation(loc.host, loc.executorId))
- }
- }
-
case _ =>
}
+
+ // If the RDD has shuffle dependencies and shuffle locality is enabled, pick locations that
+ // have at least REDUCER_PREF_LOCS_FRACTION of data as preferred locations
+ if (shuffleLocalityEnabled && rdd.partitions.length < SHUFFLE_PREF_REDUCE_THRESHOLD) {
+ rdd.dependencies.foreach {
+ case s: ShuffleDependency[_, _, _] =>
+ if (s.rdd.partitions.length < SHUFFLE_PREF_MAP_THRESHOLD) {
+ // Get the preferred map output locations for this reducer
+ val topLocsForReducer = mapOutputTracker.getLocationsWithLargestOutputs(s.shuffleId,
+ partition, rdd.partitions.length, REDUCER_PREF_LOCS_FRACTION)
+ if (topLocsForReducer.nonEmpty) {
+ return topLocsForReducer.get.map(loc => TaskLocation(loc.host, loc.executorId))
+ }
+ }
+ case _ =>
+ }
+ }
Nil
}
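Taken together, the rewritten method now exhausts every narrow dependency's placement preferences before it ever consults shuffle map-output locations. Below is a condensed, hypothetical sketch of that ordering: NarrowDep, ShuffleDep and preferredLocs are illustrative stand-ins, not Spark's real classes, with the recursive parent lookup and the MapOutputTracker query replaced by plain data.

// Hypothetical, simplified model of the placement order this patch enforces.
object LocalityOrderSketch {
  sealed trait Dep
  // Placement preferences of each parent partition reachable through a narrow dependency.
  final case class NarrowDep(parentPrefs: Seq[Seq[String]]) extends Dep
  // Hosts holding at least REDUCER_PREF_LOCS_FRACTION of this reducer's map output.
  final case class ShuffleDep(topMapHosts: Seq[String]) extends Dep

  def preferredLocs(deps: Seq[Dep], shuffleLocalityEnabled: Boolean): Seq[String] = {
    // 1. Pick the first narrow-dependency parent partition that has any preference.
    deps.foreach {
      case NarrowDep(parentPrefs) =>
        for (prefs <- parentPrefs if prefs.nonEmpty) {
          return prefs
        }
      case _ =>
    }
    // 2. Only if no narrow dependency helped, fall back to shuffle map-output hosts.
    if (shuffleLocalityEnabled) {
      deps.foreach {
        case ShuffleDep(hosts) if hosts.nonEmpty => return hosts
        case _ =>
      }
    }
    Nil
  }

  def main(args: Array[String]): Unit = {
    // Mirrors the new test below: a shuffle dep whose map output sits on hostA
    // and a narrow dep whose parent prefers hostB. The narrow preference wins.
    val deps = Seq(ShuffleDep(Seq("hostA")), NarrowDep(Seq(Seq("hostB"))))
    println(preferredLocs(deps, shuffleLocalityEnabled = true)) // List(hostB)
  }
}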
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index b0ca49cbea..a063596d3e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -926,7 +926,7 @@ class DAGSchedulerSuite
assertLocations(reduceTaskSet, Seq(Seq("hostA")))
complete(reduceTaskSet, Seq((Success, 42)))
assert(results === Map(0 -> 42))
- assertDataStructuresEmpty
+ assertDataStructuresEmpty()
}
test("reduce task locality preferences should only include machines with largest map outputs") {
@@ -950,7 +950,29 @@ class DAGSchedulerSuite
assertLocations(reduceTaskSet, Seq(hosts))
complete(reduceTaskSet, Seq((Success, 42)))
assert(results === Map(0 -> 42))
- assertDataStructuresEmpty
+ assertDataStructuresEmpty()
+ }
+
+ test("stages with both narrow and shuffle dependencies use narrow ones for locality") {
+ // Create an RDD that has both a shuffle dependency and a narrow dependency (e.g. for a join)
+ val rdd1 = new MyRDD(sc, 1, Nil)
+ val rdd2 = new MyRDD(sc, 1, Nil, locations = Seq(Seq("hostB")))
+ val shuffleDep = new ShuffleDependency(rdd1, null)
+ val narrowDep = new OneToOneDependency(rdd2)
+ val shuffleId = shuffleDep.shuffleId
+ val reduceRdd = new MyRDD(sc, 1, List(shuffleDep, narrowDep))
+ submit(reduceRdd, Array(0))
+ complete(taskSets(0), Seq(
+ (Success, makeMapStatus("hostA", 1))))
+ assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
+ HashSet(makeBlockManagerId("hostA")))
+
+ // Reducer should run where RDD 2 has preferences, even though it also has a shuffle dep
+ val reduceTaskSet = taskSets(1)
+ assertLocations(reduceTaskSet, Seq(Seq("hostB")))
+ complete(reduceTaskSet, Seq((Success, 42)))
+ assert(results === Map(0 -> 42))
+ assertDataStructuresEmpty()
}
test("Spark exceptions should include call site in stack trace") {