author     Matei Zaharia <matei@databricks.com>  2015-08-16 00:34:58 -0700
committer  Matei Zaharia <matei@databricks.com>  2015-08-16 00:34:58 -0700
commit     cf016075a006034c24c5b758edb279f3e151d25d
tree       2bb6f57a1ad39c3633fc40c35909edc2b191cab5 /core/src/main
parent     5f9ce738fe6bab3f0caffad0df1d3876178cf469
[SPARK-10008] Ensure shuffle locality doesn't take precedence over narrow deps
The shuffle locality patch made the DAGScheduler aware of shuffle data, but for RDDs that have both narrow and shuffle dependencies, it can cause the scheduler to place tasks based on the shuffle dependency instead of the narrow one. This case is common in iterative join-based algorithms like PageRank and ALS, where one RDD is hash-partitioned and the other isn't.

Author: Matei Zaharia <matei@databricks.com>

Closes #8220 from mateiz/shuffle-loc-fix.
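For context, a minimal sketch of the kind of job the message describes: an iterative join where one side is hash-partitioned and cached while the other is not, so the joined RDD carries both a narrow dependency (on the partitioned side) and a shuffle dependency (on the other). The object and variable names below are illustrative, not taken from Spark's examples; only the standard RDD API is assumed.

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object IterativeJoinSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("iterative-join-sketch"))

    // Hash-partitioned and cached side: downstream joins reach it through a narrow
    // dependency, so its preferred locations are the executors holding the cached blocks.
    val links = sc.parallelize(0 until 1000000)
      .map(i => (i % 10000, i))
      .partitionBy(new HashPartitioner(100))
      .cache()

    // Unpartitioned side: joining it introduces a shuffle dependency.
    var ranks = sc.parallelize(0 until 10000).map(i => (i, 1.0))

    for (_ <- 1 to 5) {
      // The joined RDD depends narrowly on `links` and through a shuffle on `ranks`;
      // this patch makes the scheduler prefer the narrow dependency's locations.
      ranks = links.join(ranks)
        .map { case (id, (_, rank)) => (id, rank * 0.85 + 0.15) } // map() drops the partitioner
    }

    ranks.count()
    sc.stop()
  }
}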
Diffstat (limited to 'core/src/main')
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala  |  37
1 file changed, 20 insertions(+), 17 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index f1c63d0876..dadf83a382 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1383,33 +1383,36 @@ class DAGScheduler(
       return rddPrefs.map(TaskLocation(_))
     }
 
+    // If the RDD has narrow dependencies, pick the first partition of the first narrow dependency
+    // that has any placement preferences. Ideally we would choose based on transfer sizes,
+    // but this will do for now.
     rdd.dependencies.foreach {
       case n: NarrowDependency[_] =>
-        // If the RDD has narrow dependencies, pick the first partition of the first narrow dep
-        // that has any placement preferences. Ideally we would choose based on transfer sizes,
-        // but this will do for now.
         for (inPart <- n.getParents(partition)) {
           val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
           if (locs != Nil) {
             return locs
           }
         }
-      case s: ShuffleDependency[_, _, _] =>
-        // For shuffle dependencies, pick locations which have at least REDUCER_PREF_LOCS_FRACTION
-        // of data as preferred locations
-        if (shuffleLocalityEnabled &&
-            rdd.partitions.size < SHUFFLE_PREF_REDUCE_THRESHOLD &&
-            s.rdd.partitions.size < SHUFFLE_PREF_MAP_THRESHOLD) {
-          // Get the preferred map output locations for this reducer
-          val topLocsForReducer = mapOutputTracker.getLocationsWithLargestOutputs(s.shuffleId,
-            partition, rdd.partitions.size, REDUCER_PREF_LOCS_FRACTION)
-          if (topLocsForReducer.nonEmpty) {
-            return topLocsForReducer.get.map(loc => TaskLocation(loc.host, loc.executorId))
-          }
-        }
-
       case _ =>
     }
+
+    // If the RDD has shuffle dependencies and shuffle locality is enabled, pick locations that
+    // have at least REDUCER_PREF_LOCS_FRACTION of data as preferred locations
+    if (shuffleLocalityEnabled && rdd.partitions.length < SHUFFLE_PREF_REDUCE_THRESHOLD) {
+      rdd.dependencies.foreach {
+        case s: ShuffleDependency[_, _, _] =>
+          if (s.rdd.partitions.length < SHUFFLE_PREF_MAP_THRESHOLD) {
+            // Get the preferred map output locations for this reducer
+            val topLocsForReducer = mapOutputTracker.getLocationsWithLargestOutputs(s.shuffleId,
+              partition, rdd.partitions.length, REDUCER_PREF_LOCS_FRACTION)
+            if (topLocsForReducer.nonEmpty) {
+              return topLocsForReducer.get.map(loc => TaskLocation(loc.host, loc.executorId))
+            }
+          }
+        case _ =>
+      }
+    }
 
     Nil
   }
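A simplified, self-contained restatement of the placement order the patched method enforces (the real logic is DAGScheduler.getPreferredLocsInternal in the diff above): the RDD's own preferences first, then narrow dependencies, and only then shuffle map-output hosts. The Rdd and Dep types below are illustrative stand-ins, not Spark classes, and the size thresholds are omitted.

object PlacementOrderSketch {
  sealed trait Dep
  final case class Narrow(parent: Rdd, parentPartitions: Int => Seq[Int]) extends Dep
  final case class Shuffle(topHostsFor: Int => Seq[String]) extends Dep

  final case class Rdd(ownPrefs: Int => Seq[String], deps: Seq[Dep])

  def preferredHosts(rdd: Rdd, partition: Int, shuffleLocalityEnabled: Boolean): Seq[String] = {
    // 1. The RDD's own preferences (e.g. HDFS blocks or cached partitions) win outright.
    val own = rdd.ownPrefs(partition)
    if (own.nonEmpty) return own

    // 2. Then the first narrow dependency with any preference, searched recursively.
    rdd.deps.foreach {
      case Narrow(parent, parents) =>
        for (p <- parents(partition)) {
          val locs = preferredHosts(parent, p, shuffleLocalityEnabled)
          if (locs.nonEmpty) return locs
        }
      case _ =>
    }

    // 3. Only after that, shuffle map-output hosts (the part this patch moves below
    //    the narrow-dependency check).
    if (shuffleLocalityEnabled) {
      rdd.deps.foreach {
        case Shuffle(topHostsFor) =>
          val top = topHostsFor(partition)
          if (top.nonEmpty) return top
        case _ =>
      }
    }
    Nil
  }
}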