path: root/sql
author	Matei Zaharia <matei@databricks.com>	2015-09-24 23:39:04 -0400
committer	Matei Zaharia <matei@databricks.com>	2015-09-24 23:39:04 -0400
commit	21fd12cb17b9e08a0cc49b4fda801af947a4183b (patch)
tree	a5aba0d02a65520878e38a93d7be2bb0731f0d05 /sql
parent	8023242e77e4a799de6edc490078285684549b6d (diff)
[SPARK-9852] Let reduce tasks fetch multiple map output partitions
This makes two changes:

- Allow reduce tasks to fetch multiple map output partitions -- this is a pretty small change to HashShuffleFetcher
- Move shuffle locality computation out of DAGScheduler and into ShuffledRDD / MapOutputTracker; this was needed because the code in DAGScheduler wouldn't work for RDDs whose reduce tasks each fetch multiple map output partitions

I also added an AdaptiveSchedulingSuite that creates RDDs depending on multiple map output partitions.

Author: Matei Zaharia <matei@databricks.com>

Closes #8844 from mateiz/spark-9852.
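The locality change in the second bullet is easier to see with a small, hypothetical sketch (this is not Spark's code; LocalitySketch, preferredHosts, bytesByHost, and the 0.2 threshold are illustrative assumptions): given how many map output bytes each host holds for one reduce partition, prefer the hosts that hold a sizable fraction of the total. This is the kind of computation that moves from DAGScheduler into MapOutputTracker.

// Hypothetical illustration only -- not part of this patch.
object LocalitySketch {
  // Return the hosts that hold at least `fractionThreshold` of the total map
  // output bytes destined for one reduce partition.
  def preferredHosts(
      bytesByHost: Map[String, Long],
      fractionThreshold: Double = 0.2): Seq[String] = {
    val total = bytesByHost.values.sum
    if (total == 0L) Seq.empty
    else bytesByHost.collect {
      case (host, bytes) if bytes.toDouble / total >= fractionThreshold => host
    }.toSeq
  }
}

For example, preferredHosts(Map("host1" -> 800L, "host2" -> 150L, "host3" -> 50L)) returns Seq("host1"), since the other two hosts each hold well under 20% of the bytes.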
Diffstat (limited to 'sql')
-rw-r--r--	sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala	6
1 file changed, 6 insertions(+), 0 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
index 88f5b13c8f..743c99a899 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
@@ -65,6 +65,12 @@ class ShuffledRowRDD(
Array.tabulate[Partition](part.numPartitions)(i => new ShuffledRowRDDPartition(i))
}
+ override def getPreferredLocations(partition: Partition): Seq[String] = {
+ val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
+ val dep = dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]]
+ tracker.getPreferredLocationsForShuffle(dep, partition.index)
+ }
+
override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
val dep = dependencies.head.asInstanceOf[ShuffleDependency[Int, InternalRow, InternalRow]]
SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
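The existing compute() above still reads exactly one reduce partition (the range split.index to split.index + 1 passed to getReader). The first bullet of the commit message is what makes wider ranges usable: a reduce task may now fetch several map output partitions through the same call. The sketch below is a hypothetical illustration of such a caller, not code from this patch; RangePartition, RangeFetchSketch, and fetchRange are assumed names.

import org.apache.spark.{Partition, ShuffleDependency, SparkEnv, TaskContext}

// Hypothetical partition type covering a contiguous range of reduce partitions.
class RangePartition(val index: Int, val startReducer: Int, val endReducer: Int)
  extends Partition

object RangeFetchSketch {
  // Sketch of a compute() body that fetches several map output partitions at
  // once; the only difference from the diff above is the wider
  // [startReducer, endReducer) range handed to getReader.
  def fetchRange(
      dep: ShuffleDependency[Int, AnyRef, AnyRef],
      split: RangePartition,
      context: TaskContext): Iterator[Product2[Int, AnyRef]] = {
    SparkEnv.get.shuffleManager
      .getReader[Int, AnyRef](dep.shuffleHandle, split.startReducer, split.endReducer, context)
      .read()
  }
}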