diff options
author | Matei Zaharia <matei@databricks.com> | 2015-09-24 23:39:04 -0400 |
---|---|---|
committer | Matei Zaharia <matei@databricks.com> | 2015-09-24 23:39:04 -0400 |
commit | 21fd12cb17b9e08a0cc49b4fda801af947a4183b (patch) | |
tree | a5aba0d02a65520878e38a93d7be2bb0731f0d05 /sql | |
parent | 8023242e77e4a799de6edc490078285684549b6d (diff) | |
download | spark-21fd12cb17b9e08a0cc49b4fda801af947a4183b.tar.gz spark-21fd12cb17b9e08a0cc49b4fda801af947a4183b.tar.bz2 spark-21fd12cb17b9e08a0cc49b4fda801af947a4183b.zip |
[SPARK-9852] Let reduce tasks fetch multiple map output partitions
This makes two changes:
- Allow reduce tasks to fetch multiple map output partitions -- this is a pretty small change to HashShuffleFetcher
- Move shuffle locality computation out of DAGScheduler and into ShuffledRDD / MapOutputTracker; this was needed because the code in DAGScheduler wouldn't work for RDDs that fetch multiple map output partitions from each reduce task
I also added an AdaptiveSchedulingSuite that creates RDDs depending on multiple map output partitions.
Author: Matei Zaharia <matei@databricks.com>
Closes #8844 from mateiz/spark-9852.
Diffstat (limited to 'sql')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala | 6 |
1 files changed, 6 insertions, 0 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala index 88f5b13c8f..743c99a899 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala @@ -65,6 +65,12 @@ class ShuffledRowRDD( Array.tabulate[Partition](part.numPartitions)(i => new ShuffledRowRDDPartition(i)) } + override def getPreferredLocations(partition: Partition): Seq[String] = { + val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster] + val dep = dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]] + tracker.getPreferredLocationsForShuffle(dep, partition.index) + } + override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { val dep = dependencies.head.asInstanceOf[ShuffleDependency[Int, InternalRow, InternalRow]] SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context) |