author     Matei Zaharia <matei@databricks.com>   2015-09-24 23:39:04 -0400
committer  Matei Zaharia <matei@databricks.com>   2015-09-24 23:39:04 -0400
commit     21fd12cb17b9e08a0cc49b4fda801af947a4183b (patch)
tree       a5aba0d02a65520878e38a93d7be2bb0731f0d05
parent     8023242e77e4a799de6edc490078285684549b6d (diff)
[SPARK-9852] Let reduce tasks fetch multiple map output partitions
This makes two changes:

- Allow reduce tasks to fetch multiple map output partitions -- this is a pretty small change to HashShuffleFetcher
- Move shuffle locality computation out of DAGScheduler and into ShuffledRDD / MapOutputTracker; this was needed because the code in DAGScheduler wouldn't work for RDDs that fetch multiple map output partitions from each reduce task

I also added an AdaptiveSchedulingSuite that creates RDDs depending on multiple map output partitions.

Author: Matei Zaharia <matei@databricks.com>

Closes #8844 from mateiz/spark-9852.
-rw-r--r--  core/src/main/scala/org/apache/spark/MapOutputTracker.scala | 79
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 33
-rw-r--r--  core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala | 9
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/AdaptiveSchedulingSuite.scala | 47
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/CustomShuffledRDD.scala | 111
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 137
-rw-r--r--  core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala | 6
9 files changed, 306 insertions(+), 124 deletions(-)
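
To make the first change concrete: before this patch a reduce task asked the map output tracker for the block sizes of exactly one map output partition (its own reduce ID); after it, a single reduce task can cover a half-open range [startPartition, endPartition). The following is a minimal, dependency-free sketch in plain Scala, not code from this patch; the array layout (one row per map task, one column per map output partition) is an illustrative assumption.

    object MultiPartitionFetchSketch {
      // sizes(mapId)(partition) = bytes the given map task wrote for that map output partition.
      // Returns one (mapId, partition, size) entry per block the reduce task would fetch.
      def blocksForRange(
          sizes: Array[Array[Long]],
          startPartition: Int,
          endPartition: Int): Seq[(Int, Int, Long)] = {
        for {
          mapId <- sizes.indices
          part <- startPartition until endPartition
        } yield (mapId, part, sizes(mapId)(part))
      }

      def main(args: Array[String]): Unit = {
        val sizes = Array(Array(10L, 20L, 30L), Array(5L, 15L, 25L)) // 2 map tasks, 3 partitions
        // One reduce task now covers map output partitions 0 and 1 of every map output.
        println(blocksForRange(sizes, 0, 2))
      }
    }
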
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 94eb8daa85..e4cb72e39d 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -134,11 +134,25 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
*/
def getMapSizesByExecutorId(shuffleId: Int, reduceId: Int)
: Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
- logDebug(s"Fetching outputs for shuffle $shuffleId, reduce $reduceId")
+ getMapSizesByExecutorId(shuffleId, reduceId, reduceId + 1)
+ }
+
+ /**
+ * Called from executors to get the server URIs and output sizes for each shuffle block that
+ * needs to be read from a given range of map output partitions (startPartition is included but
+ * endPartition is excluded from the range).
+ *
+ * @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId,
+ * and the second item is a sequence of (shuffle block id, shuffle block size) tuples
+ * describing the shuffle blocks that are stored at that block manager.
+ */
+ def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int)
+ : Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
+ logDebug(s"Fetching outputs for shuffle $shuffleId, partitions $startPartition-$endPartition")
val statuses = getStatuses(shuffleId)
// Synchronize on the returned array because, on the driver, it gets mutated in place
statuses.synchronized {
- return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, statuses)
+ return MapOutputTracker.convertMapStatuses(shuffleId, startPartition, endPartition, statuses)
}
}
@@ -262,6 +276,21 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
/** Cache a serialized version of the output statuses for each shuffle to send them out faster */
private var cacheEpoch = epoch
+ /** Whether to compute locality preferences for reduce tasks */
+ private val shuffleLocalityEnabled = conf.getBoolean("spark.shuffle.reduceLocality.enabled", true)
+
+ // Number of map and reduce tasks above which we do not assign preferred locations based on map
+ // output sizes. We limit the size of jobs for which we assign preferred locations, as computing
+ // the top locations by size becomes expensive.
+ private val SHUFFLE_PREF_MAP_THRESHOLD = 1000
+ // NOTE: This should be less than 2000 as we use HighlyCompressedMapStatus beyond that
+ private val SHUFFLE_PREF_REDUCE_THRESHOLD = 1000
+
+ // Fraction of total map output that must be at a location for it to be considered as a preferred
+ // location for a reduce task. Making this larger will focus on fewer locations where most data
+ // can be read locally, but may lead to more delay in scheduling if those locations are busy.
+ private val REDUCER_PREF_LOCS_FRACTION = 0.2
+
/**
* Timestamp based HashMap for storing mapStatuses and cached serialized statuses in the driver,
* so that statuses are dropped only by explicit de-registering or by TTL-based cleaning (if set).
@@ -323,6 +352,30 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
}
/**
+ * Return the preferred hosts on which to run the given map output partition in a given shuffle,
+ * i.e. the nodes that the most outputs for that partition are on.
+ *
+ * @param dep shuffle dependency object
+ * @param partitionId map output partition that we want to read
+ * @return a sequence of host names
+ */
+ def getPreferredLocationsForShuffle(dep: ShuffleDependency[_, _, _], partitionId: Int)
+ : Seq[String] = {
+ if (shuffleLocalityEnabled && dep.rdd.partitions.length < SHUFFLE_PREF_MAP_THRESHOLD &&
+ dep.partitioner.numPartitions < SHUFFLE_PREF_REDUCE_THRESHOLD) {
+ val blockManagerIds = getLocationsWithLargestOutputs(dep.shuffleId, partitionId,
+ dep.partitioner.numPartitions, REDUCER_PREF_LOCS_FRACTION)
+ if (blockManagerIds.nonEmpty) {
+ blockManagerIds.get.map(_.host)
+ } else {
+ Nil
+ }
+ } else {
+ Nil
+ }
+ }
+
+ /**
* Return a list of locations that each have a fraction of map output greater than the specified
* threshold.
*
@@ -460,23 +513,25 @@ private[spark] object MapOutputTracker extends Logging {
}
/**
- * Converts an array of MapStatuses for a given reduce ID to a sequence that, for each block
- * manager ID, lists the shuffle block ids and corresponding shuffle block sizes stored at that
- * block manager.
+ * Given an array of map statuses and a range of map output partitions, returns a sequence that,
+ * for each block manager ID, lists the shuffle block IDs and corresponding shuffle block sizes
+ * stored at that block manager.
*
* If any of the statuses is null (indicating a missing location due to a failed mapper),
* throws a FetchFailedException.
*
* @param shuffleId Identifier for the shuffle
- * @param reduceId Identifier for the reduce task
+ * @param startPartition Start of map output partition ID range (included in range)
+ * @param endPartition End of map output partition ID range (excluded from range)
* @param statuses List of map statuses, indexed by map ID.
* @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId,
- * and the second item is a sequence of (shuffle block id, shuffle block size) tuples
+ * and the second item is a sequence of (shuffle block ID, shuffle block size) tuples
* describing the shuffle blocks that are stored at that block manager.
*/
private def convertMapStatuses(
shuffleId: Int,
- reduceId: Int,
+ startPartition: Int,
+ endPartition: Int,
statuses: Array[MapStatus]): Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
assert (statuses != null)
val splitsByAddress = new HashMap[BlockManagerId, ArrayBuffer[(BlockId, Long)]]
@@ -484,10 +539,12 @@ private[spark] object MapOutputTracker extends Logging {
if (status == null) {
val errorMessage = s"Missing an output location for shuffle $shuffleId"
logError(errorMessage)
- throw new MetadataFetchFailedException(shuffleId, reduceId, errorMessage)
+ throw new MetadataFetchFailedException(shuffleId, startPartition, errorMessage)
} else {
- splitsByAddress.getOrElseUpdate(status.location, ArrayBuffer()) +=
- ((ShuffleBlockId(shuffleId, mapId, reduceId), status.getSizeForBlock(reduceId)))
+ for (part <- startPartition until endPartition) {
+ splitsByAddress.getOrElseUpdate(status.location, ArrayBuffer()) +=
+ ((ShuffleBlockId(shuffleId, mapId, part), status.getSizeForBlock(part)))
+ }
}
}
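
As a rough, dependency-free model of what the range-based getMapSizesByExecutorId / convertMapStatuses above return: blocks are grouped by the location that holds them, with one (block id, size) pair per map output and per partition in the requested range. Location and Status below are made-up stand-ins for BlockManagerId and MapStatus, and the block id is rendered as a plain string.

    import scala.collection.mutable.{ArrayBuffer, HashMap}

    case class Location(host: String, executorId: String)
    case class Status(location: Location, sizes: Array[Long]) // sizes(part) = block size in bytes

    def convertStatusesSketch(
        shuffleId: Int,
        startPartition: Int,
        endPartition: Int,
        statuses: Array[Status]): Seq[(Location, Seq[(String, Long)])] = {
      val splitsByAddress = new HashMap[Location, ArrayBuffer[(String, Long)]]
      for ((status, mapId) <- statuses.zipWithIndex) {
        // The real code throws a MetadataFetchFailedException when a map output is missing.
        require(status != null, s"Missing an output location for shuffle $shuffleId")
        // One (blockId, size) pair per partition in [startPartition, endPartition), per map output.
        for (part <- startPartition until endPartition) {
          splitsByAddress.getOrElseUpdate(status.location, ArrayBuffer()) +=
            ((s"shuffle_${shuffleId}_${mapId}_$part", status.sizes(part)))
        }
      }
      splitsByAddress.toSeq
    }
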
diff --git a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
index cb15d912bb..a013c3f66a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
@@ -86,6 +86,12 @@ class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](
Array.tabulate[Partition](part.numPartitions)(i => new ShuffledRDDPartition(i))
}
+ override def getPreferredLocations(partition: Partition): Seq[String] = {
+ val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
+ val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
+ tracker.getPreferredLocationsForShuffle(dep, partition.index)
+ }
+
override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
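
The locality computation that ShuffledRDD now delegates to the tracker boils down to a threshold rule: a host counts as a preferred location for a reduce partition only if it holds at least REDUCER_PREF_LOCS_FRACTION (0.2 above) of that partition's total map output, and the computation is skipped entirely for large jobs. A dependency-free sketch of just the threshold rule, with made-up host names and byte counts:

    // Keep only hosts holding at least `fraction` of the total map output for one reduce partition.
    def preferredHosts(bytesByHost: Map[String, Long], fraction: Double = 0.2): Seq[String] = {
      val total = bytesByHost.values.sum.toDouble
      if (total == 0) Nil
      else bytesByHost.collect { case (host, bytes) if bytes >= fraction * total => host }.toSeq
    }

    // hostA holds 70% of this partition's output, hostB 25%, hostC 5%;
    // only hostA and hostB clear the 20% threshold.
    preferredHosts(Map("hostA" -> 700L, "hostB" -> 250L, "hostC" -> 50L))
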
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 394228b272..ade372be09 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -184,22 +184,6 @@ class DAGScheduler(
private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
taskScheduler.setDAGScheduler(this)
- // Flag to control if reduce tasks are assigned preferred locations
- private val shuffleLocalityEnabled =
- sc.getConf.getBoolean("spark.shuffle.reduceLocality.enabled", true)
- // Number of map, reduce tasks above which we do not assign preferred locations
- // based on map output sizes. We limit the size of jobs for which assign preferred locations
- // as computing the top locations by size becomes expensive.
- private[this] val SHUFFLE_PREF_MAP_THRESHOLD = 1000
- // NOTE: This should be less than 2000 as we use HighlyCompressedMapStatus beyond that
- private[this] val SHUFFLE_PREF_REDUCE_THRESHOLD = 1000
-
- // Fraction of total map output that must be at a location for it to considered as a preferred
- // location for a reduce task.
- // Making this larger will focus on fewer locations where most data can be read locally, but
- // may lead to more delay in scheduling if those locations are busy.
- private[scheduler] val REDUCER_PREF_LOCS_FRACTION = 0.2
-
/**
* Called by the TaskSetManager to report task's starting.
*/
@@ -1570,25 +1554,10 @@ class DAGScheduler(
return locs
}
}
+
case _ =>
}
- // If the RDD has shuffle dependencies and shuffle locality is enabled, pick locations that
- // have at least REDUCER_PREF_LOCS_FRACTION of data as preferred locations
- if (shuffleLocalityEnabled && rdd.partitions.length < SHUFFLE_PREF_REDUCE_THRESHOLD) {
- rdd.dependencies.foreach {
- case s: ShuffleDependency[_, _, _] =>
- if (s.rdd.partitions.length < SHUFFLE_PREF_MAP_THRESHOLD) {
- // Get the preferred map output locations for this reducer
- val topLocsForReducer = mapOutputTracker.getLocationsWithLargestOutputs(s.shuffleId,
- partition, rdd.partitions.length, REDUCER_PREF_LOCS_FRACTION)
- if (topLocsForReducer.nonEmpty) {
- return topLocsForReducer.get.map(loc => TaskLocation(loc.host, loc.executorId))
- }
- }
- case _ =>
- }
- }
Nil
}
diff --git a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
index 6dc9a16e58..7c3e2b5a37 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
@@ -23,6 +23,10 @@ import org.apache.spark.storage.{BlockManager, ShuffleBlockFetcherIterator}
import org.apache.spark.util.CompletionIterator
import org.apache.spark.util.collection.ExternalSorter
+/**
+ * Fetches and reads the partitions in range [startPartition, endPartition) from a shuffle by
+ * requesting them from other nodes' block stores.
+ */
private[spark] class BlockStoreShuffleReader[K, C](
handle: BaseShuffleHandle[K, _, C],
startPartition: Int,
@@ -32,9 +36,6 @@ private[spark] class BlockStoreShuffleReader[K, C](
mapOutputTracker: MapOutputTracker = SparkEnv.get.mapOutputTracker)
extends ShuffleReader[K, C] with Logging {
- require(endPartition == startPartition + 1,
- "Hash shuffle currently only supports fetching one partition")
-
private val dep = handle.dependency
/** Read the combined key-values for this reduce task */
@@ -43,7 +44,7 @@ private[spark] class BlockStoreShuffleReader[K, C](
context,
blockManager.shuffleClient,
blockManager,
- mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition),
+ mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition),
// Note: we use getSizeAsMb when no suffix is provided for backwards compatibility
SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/AdaptiveSchedulingSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/AdaptiveSchedulingSuite.scala
index 3fe28027c3..e0f474aa50 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/AdaptiveSchedulingSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/AdaptiveSchedulingSuite.scala
@@ -17,7 +17,6 @@
package org.apache.spark.scheduler
-import org.apache.spark.rdd.{ShuffledRDDPartition, RDD, ShuffledRDD}
import org.apache.spark._
object AdaptiveSchedulingSuiteState {
@@ -28,26 +27,10 @@ object AdaptiveSchedulingSuiteState {
}
}
-/** A special ShuffledRDD where we can pass a ShuffleDependency object to use */
-class CustomShuffledRDD[K, V, C](@transient dep: ShuffleDependency[K, V, C])
- extends RDD[(K, C)](dep.rdd.context, Seq(dep)) {
-
- override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
- val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
- SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
- .read()
- .asInstanceOf[Iterator[(K, C)]]
- }
-
- override def getPartitions: Array[Partition] = {
- Array.tabulate[Partition](dep.partitioner.numPartitions)(i => new ShuffledRDDPartition(i))
- }
-}
-
class AdaptiveSchedulingSuite extends SparkFunSuite with LocalSparkContext {
test("simple use of submitMapStage") {
try {
- sc = new SparkContext("local[1,2]", "test")
+ sc = new SparkContext("local", "test")
val rdd = sc.parallelize(1 to 3, 3).map { x =>
AdaptiveSchedulingSuiteState.tasksRun += 1
(x, x)
@@ -62,4 +45,32 @@ class AdaptiveSchedulingSuite extends SparkFunSuite with LocalSparkContext {
AdaptiveSchedulingSuiteState.clear()
}
}
+
+ test("fetching multiple map output partitions per reduce") {
+ sc = new SparkContext("local", "test")
+ val rdd = sc.parallelize(0 to 2, 3).map(x => (x, x))
+ val dep = new ShuffleDependency[Int, Int, Int](rdd, new HashPartitioner(3))
+ val shuffled = new CustomShuffledRDD[Int, Int, Int](dep, Array(0, 2))
+ assert(shuffled.partitions.length === 2)
+ assert(shuffled.glom().map(_.toSet).collect().toSet == Set(Set((0, 0), (1, 1)), Set((2, 2))))
+ }
+
+ test("fetching all map output partitions in one reduce") {
+ sc = new SparkContext("local", "test")
+ val rdd = sc.parallelize(0 to 2, 3).map(x => (x, x))
+ // Also create lots of hash partitions so that some of them are empty
+ val dep = new ShuffleDependency[Int, Int, Int](rdd, new HashPartitioner(5))
+ val shuffled = new CustomShuffledRDD[Int, Int, Int](dep, Array(0))
+ assert(shuffled.partitions.length === 1)
+ assert(shuffled.collect().toSet == Set((0, 0), (1, 1), (2, 2)))
+ }
+
+ test("more reduce tasks than map output partitions") {
+ sc = new SparkContext("local", "test")
+ val rdd = sc.parallelize(0 to 2, 3).map(x => (x, x))
+ val dep = new ShuffleDependency[Int, Int, Int](rdd, new HashPartitioner(3))
+ val shuffled = new CustomShuffledRDD[Int, Int, Int](dep, Array(0, 0, 0, 1, 1, 1, 2))
+ assert(shuffled.partitions.length === 7)
+ assert(shuffled.collect().toSet == Set((0, 0), (1, 1), (2, 2)))
+ }
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/CustomShuffledRDD.scala b/core/src/test/scala/org/apache/spark/scheduler/CustomShuffledRDD.scala
new file mode 100644
index 0000000000..d8d818ceed
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/CustomShuffledRDD.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util.Arrays
+
+import org.apache.spark._
+import org.apache.spark.rdd.RDD
+
+/**
+ * A Partitioner that might group together one or more partitions from the parent.
+ *
+ * @param parent a parent partitioner
+ * @param partitionStartIndices indices of partitions in parent that should create new partitions
+ * in child (this should be an array of increasing partition IDs). For example, if we have a
+ * parent with 5 partitions, and partitionStartIndices is [0, 2, 4], we get three output
+ * partitions, corresponding to partition ranges [0, 1], [2, 3] and [4] of the parent partitioner.
+ */
+class CoalescedPartitioner(val parent: Partitioner, val partitionStartIndices: Array[Int])
+ extends Partitioner {
+
+ @transient private lazy val parentPartitionMapping: Array[Int] = {
+ val n = parent.numPartitions
+ val result = new Array[Int](n)
+ for (i <- 0 until partitionStartIndices.length) {
+ val start = partitionStartIndices(i)
+ val end = if (i < partitionStartIndices.length - 1) partitionStartIndices(i + 1) else n
+ for (j <- start until end) {
+ result(j) = i
+ }
+ }
+ result
+ }
+
+ override def numPartitions: Int = partitionStartIndices.size
+
+ override def getPartition(key: Any): Int = {
+ parentPartitionMapping(parent.getPartition(key))
+ }
+
+ override def equals(other: Any): Boolean = other match {
+ case c: CoalescedPartitioner =>
+ c.parent == parent && Arrays.equals(c.partitionStartIndices, partitionStartIndices)
+ case _ =>
+ false
+ }
+}
+
+private[spark] class CustomShuffledRDDPartition(
+ val index: Int, val startIndexInParent: Int, val endIndexInParent: Int)
+ extends Partition {
+
+ override def hashCode(): Int = index
+}
+
+/**
+ * A special ShuffledRDD that takes a ShuffleDependency object from outside and can launch reduce
+ * tasks that read multiple map output partitions.
+ */
+class CustomShuffledRDD[K, V, C](
+ var dependency: ShuffleDependency[K, V, C],
+ partitionStartIndices: Array[Int])
+ extends RDD[(K, C)](dependency.rdd.context, Seq(dependency)) {
+
+ def this(dep: ShuffleDependency[K, V, C]) = {
+ this(dep, (0 until dep.partitioner.numPartitions).toArray)
+ }
+
+ override def getDependencies: Seq[Dependency[_]] = List(dependency)
+
+ override val partitioner = {
+ Some(new CoalescedPartitioner(dependency.partitioner, partitionStartIndices))
+ }
+
+ override def getPartitions: Array[Partition] = {
+ val n = dependency.partitioner.numPartitions
+ Array.tabulate[Partition](partitionStartIndices.length) { i =>
+ val startIndex = partitionStartIndices(i)
+ val endIndex = if (i < partitionStartIndices.length - 1) partitionStartIndices(i + 1) else n
+ new CustomShuffledRDDPartition(i, startIndex, endIndex)
+ }
+ }
+
+ override def compute(p: Partition, context: TaskContext): Iterator[(K, C)] = {
+ val part = p.asInstanceOf[CustomShuffledRDDPartition]
+ SparkEnv.get.shuffleManager.getReader(
+ dependency.shuffleHandle, part.startIndexInParent, part.endIndexInParent, context)
+ .read()
+ .asInstanceOf[Iterator[(K, C)]]
+ }
+
+ override def clearDependencies() {
+ super.clearDependencies()
+ dependency = null
+ }
+}
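
To make the CoalescedPartitioner example in the comment above concrete, here is the parent-to-child mapping it builds, written as a standalone function with the same logic as parentPartitionMapping:

    // result(parentPartition) = index of the coalesced (child) partition that covers it.
    def parentToCoalesced(numParentPartitions: Int, startIndices: Array[Int]): Array[Int] = {
      val result = new Array[Int](numParentPartitions)
      for (i <- startIndices.indices) {
        val start = startIndices(i)
        val end = if (i < startIndices.length - 1) startIndices(i + 1) else numParentPartitions
        for (j <- start until end) result(j) = i
      }
      result
    }

    // Five parent partitions coalesced at start indices [0, 2, 4]:
    // parents 0-1 -> child 0, parents 2-3 -> child 1, parent 4 -> child 2.
    parentToCoalesced(5, Array(0, 2, 4)).toSeq // 0, 0, 1, 1, 2
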
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 6b5bcf0574..697c195e4a 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -49,19 +49,39 @@ class DAGSchedulerEventProcessLoopTester(dagScheduler: DAGScheduler)
* An RDD for passing to DAGScheduler. These RDDs will use the dependencies and
* preferredLocations (if any) that are passed to them. They are deliberately not executable
* so we can test that DAGScheduler does not try to execute RDDs locally.
+ *
+ * Optionally, one can pass in a list of locations to use as preferred locations for each task,
+ * and a MapOutputTrackerMaster to enable reduce task locality. We pass the tracker separately
+ * because, in this test suite, it won't be the same as sc.env.mapOutputTracker.
*/
class MyRDD(
sc: SparkContext,
numPartitions: Int,
dependencies: List[Dependency[_]],
- locations: Seq[Seq[String]] = Nil) extends RDD[(Int, Int)](sc, dependencies) with Serializable {
+ locations: Seq[Seq[String]] = Nil,
+ @transient tracker: MapOutputTrackerMaster = null)
+ extends RDD[(Int, Int)](sc, dependencies) with Serializable {
+
override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
throw new RuntimeException("should not be reached")
+
override def getPartitions: Array[Partition] = (0 until numPartitions).map(i => new Partition {
override def index: Int = i
}).toArray
- override def getPreferredLocations(split: Partition): Seq[String] =
- if (locations.isDefinedAt(split.index)) locations(split.index) else Nil
+
+ override def getPreferredLocations(partition: Partition): Seq[String] = {
+ if (locations.isDefinedAt(partition.index)) {
+ locations(partition.index)
+ } else if (tracker != null && dependencies.size == 1 &&
+ dependencies(0).isInstanceOf[ShuffleDependency[_, _, _]]) {
+ // If we have only one shuffle dependency, use the same code path as ShuffledRDD for locality
+ val dep = dependencies(0).asInstanceOf[ShuffleDependency[_, _, _]]
+ tracker.getPreferredLocationsForShuffle(dep, partition.index)
+ } else {
+ Nil
+ }
+ }
+
override def toString: String = "DAGSchedulerSuiteRDD " + id
}
@@ -351,7 +371,8 @@ class DAGSchedulerSuite
*/
test("getMissingParentStages should consider all ancestor RDDs' cache statuses") {
val rddA = new MyRDD(sc, 1, Nil)
- val rddB = new MyRDD(sc, 1, List(new ShuffleDependency(rddA, null)))
+ val rddB = new MyRDD(sc, 1, List(new ShuffleDependency(rddA, new HashPartitioner(1))),
+ tracker = mapOutputTracker)
val rddC = new MyRDD(sc, 1, List(new OneToOneDependency(rddB))).cache()
val rddD = new MyRDD(sc, 1, List(new OneToOneDependency(rddC)))
cacheLocations(rddC.id -> 0) =
@@ -458,9 +479,9 @@ class DAGSchedulerSuite
test("run trivial shuffle") {
val shuffleMapRdd = new MyRDD(sc, 2, Nil)
- val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
+ val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1))
val shuffleId = shuffleDep.shuffleId
- val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
+ val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker)
submit(reduceRdd, Array(0))
complete(taskSets(0), Seq(
(Success, makeMapStatus("hostA", 1)),
@@ -474,9 +495,9 @@ class DAGSchedulerSuite
test("run trivial shuffle with fetch failure") {
val shuffleMapRdd = new MyRDD(sc, 2, Nil)
- val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
+ val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
val shuffleId = shuffleDep.shuffleId
- val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
+ val reduceRdd = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker)
submit(reduceRdd, Array(0, 1))
complete(taskSets(0), Seq(
(Success, makeMapStatus("hostA", reduceRdd.partitions.length)),
@@ -590,9 +611,8 @@ class DAGSchedulerSuite
val parts = 8
val shuffleMapRdd = new MyRDD(sc, parts, Nil)
- val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
- val shuffleId = shuffleDep.shuffleId
- val reduceRdd = new MyRDD(sc, parts, List(shuffleDep))
+ val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts))
+ val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker)
submit(reduceRdd, (0 until parts).toArray)
completeShuffleMapStageSuccessfully(0, 0, numShufflePartitions = parts)
@@ -625,9 +645,8 @@ class DAGSchedulerSuite
setupStageAbortTest(sc)
val shuffleMapRdd = new MyRDD(sc, 2, Nil)
- val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
- val shuffleId = shuffleDep.shuffleId
- val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
+ val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
+ val reduceRdd = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker)
submit(reduceRdd, Array(0, 1))
for (attempt <- 0 until Stage.MAX_CONSECUTIVE_FETCH_FAILURES) {
@@ -668,10 +687,10 @@ class DAGSchedulerSuite
setupStageAbortTest(sc)
val shuffleOneRdd = new MyRDD(sc, 2, Nil).cache()
- val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, null)
- val shuffleTwoRdd = new MyRDD(sc, 2, List(shuffleDepOne)).cache()
- val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, null)
- val finalRdd = new MyRDD(sc, 1, List(shuffleDepTwo))
+ val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, new HashPartitioner(2))
+ val shuffleTwoRdd = new MyRDD(sc, 2, List(shuffleDepOne), tracker = mapOutputTracker).cache()
+ val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, new HashPartitioner(1))
+ val finalRdd = new MyRDD(sc, 1, List(shuffleDepTwo), tracker = mapOutputTracker)
submit(finalRdd, Array(0))
// In the first two iterations, Stage 0 succeeds and stage 1 fails. In the next two iterations,
@@ -717,10 +736,10 @@ class DAGSchedulerSuite
setupStageAbortTest(sc)
val shuffleOneRdd = new MyRDD(sc, 2, Nil).cache()
- val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, null)
- val shuffleTwoRdd = new MyRDD(sc, 2, List(shuffleDepOne)).cache()
- val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, null)
- val finalRdd = new MyRDD(sc, 1, List(shuffleDepTwo))
+ val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, new HashPartitioner(2))
+ val shuffleTwoRdd = new MyRDD(sc, 2, List(shuffleDepOne), tracker = mapOutputTracker).cache()
+ val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, new HashPartitioner(1))
+ val finalRdd = new MyRDD(sc, 1, List(shuffleDepTwo), tracker = mapOutputTracker)
submit(finalRdd, Array(0))
// First, execute stages 0 and 1, failing stage 1 up to MAX-1 times.
@@ -777,9 +796,9 @@ class DAGSchedulerSuite
test("trivial shuffle with multiple fetch failures") {
val shuffleMapRdd = new MyRDD(sc, 2, Nil)
- val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
+ val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
val shuffleId = shuffleDep.shuffleId
- val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
+ val reduceRdd = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker)
submit(reduceRdd, Array(0, 1))
complete(taskSets(0), Seq(
(Success, makeMapStatus("hostA", reduceRdd.partitions.length)),
@@ -818,9 +837,9 @@ class DAGSchedulerSuite
*/
test("late fetch failures don't cause multiple concurrent attempts for the same map stage") {
val shuffleMapRdd = new MyRDD(sc, 2, Nil)
- val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
+ val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
val shuffleId = shuffleDep.shuffleId
- val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
+ val reduceRdd = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker)
submit(reduceRdd, Array(0, 1))
val mapStageId = 0
@@ -886,9 +905,9 @@ class DAGSchedulerSuite
test("extremely late fetch failures don't cause multiple concurrent attempts for " +
"the same stage") {
val shuffleMapRdd = new MyRDD(sc, 2, Nil)
- val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
+ val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
val shuffleId = shuffleDep.shuffleId
- val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
+ val reduceRdd = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker)
submit(reduceRdd, Array(0, 1))
def countSubmittedReduceStageAttempts(): Int = {
@@ -949,9 +968,9 @@ class DAGSchedulerSuite
test("ignore late map task completions") {
val shuffleMapRdd = new MyRDD(sc, 2, Nil)
- val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
+ val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
val shuffleId = shuffleDep.shuffleId
- val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
+ val reduceRdd = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker)
submit(reduceRdd, Array(0, 1))
// pretend we were told hostA went away
@@ -1018,8 +1037,8 @@ class DAGSchedulerSuite
test("run shuffle with map stage failure") {
val shuffleMapRdd = new MyRDD(sc, 2, Nil)
- val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
- val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
+ val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
+ val reduceRdd = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker)
submit(reduceRdd, Array(0, 1))
// Fail the map stage. This should cause the entire job to fail.
@@ -1221,12 +1240,12 @@ class DAGSchedulerSuite
*/
test("failure of stage used by two jobs") {
val shuffleMapRdd1 = new MyRDD(sc, 2, Nil)
- val shuffleDep1 = new ShuffleDependency(shuffleMapRdd1, null)
+ val shuffleDep1 = new ShuffleDependency(shuffleMapRdd1, new HashPartitioner(2))
val shuffleMapRdd2 = new MyRDD(sc, 2, Nil)
- val shuffleDep2 = new ShuffleDependency(shuffleMapRdd2, null)
+ val shuffleDep2 = new ShuffleDependency(shuffleMapRdd2, new HashPartitioner(2))
- val reduceRdd1 = new MyRDD(sc, 2, List(shuffleDep1))
- val reduceRdd2 = new MyRDD(sc, 2, List(shuffleDep1, shuffleDep2))
+ val reduceRdd1 = new MyRDD(sc, 2, List(shuffleDep1), tracker = mapOutputTracker)
+ val reduceRdd2 = new MyRDD(sc, 2, List(shuffleDep1, shuffleDep2), tracker = mapOutputTracker)
// We need to make our own listeners for this test, since by default submit uses the same
// listener for all jobs, and here we want to capture the failure for each job separately.
@@ -1258,9 +1277,9 @@ class DAGSchedulerSuite
test("run trivial shuffle with out-of-band failure and retry") {
val shuffleMapRdd = new MyRDD(sc, 2, Nil)
- val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
+ val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
val shuffleId = shuffleDep.shuffleId
- val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
+ val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker)
submit(reduceRdd, Array(0))
// blockManagerMaster.removeExecutor("exec-hostA")
// pretend we were told hostA went away
@@ -1281,10 +1300,10 @@ class DAGSchedulerSuite
test("recursive shuffle failures") {
val shuffleOneRdd = new MyRDD(sc, 2, Nil)
- val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, null)
- val shuffleTwoRdd = new MyRDD(sc, 2, List(shuffleDepOne))
- val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, null)
- val finalRdd = new MyRDD(sc, 1, List(shuffleDepTwo))
+ val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, new HashPartitioner(2))
+ val shuffleTwoRdd = new MyRDD(sc, 2, List(shuffleDepOne), tracker = mapOutputTracker)
+ val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, new HashPartitioner(1))
+ val finalRdd = new MyRDD(sc, 1, List(shuffleDepTwo), tracker = mapOutputTracker)
submit(finalRdd, Array(0))
// have the first stage complete normally
complete(taskSets(0), Seq(
@@ -1310,10 +1329,10 @@ class DAGSchedulerSuite
test("cached post-shuffle") {
val shuffleOneRdd = new MyRDD(sc, 2, Nil).cache()
- val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, null)
- val shuffleTwoRdd = new MyRDD(sc, 2, List(shuffleDepOne)).cache()
- val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, null)
- val finalRdd = new MyRDD(sc, 1, List(shuffleDepTwo))
+ val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, new HashPartitioner(2))
+ val shuffleTwoRdd = new MyRDD(sc, 2, List(shuffleDepOne), tracker = mapOutputTracker).cache()
+ val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, new HashPartitioner(1))
+ val finalRdd = new MyRDD(sc, 1, List(shuffleDepTwo), tracker = mapOutputTracker)
submit(finalRdd, Array(0))
cacheLocations(shuffleTwoRdd.id -> 0) = Seq(makeBlockManagerId("hostD"))
cacheLocations(shuffleTwoRdd.id -> 1) = Seq(makeBlockManagerId("hostC"))
@@ -1419,9 +1438,9 @@ class DAGSchedulerSuite
test("reduce tasks should be placed locally with map output") {
// Create an shuffleMapRdd with 1 partition
val shuffleMapRdd = new MyRDD(sc, 1, Nil)
- val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
+ val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
val shuffleId = shuffleDep.shuffleId
- val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
+ val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker)
submit(reduceRdd, Array(0))
complete(taskSets(0), Seq(
(Success, makeMapStatus("hostA", 1))))
@@ -1440,9 +1459,9 @@ class DAGSchedulerSuite
val numMapTasks = 4
// Create an shuffleMapRdd with more partitions
val shuffleMapRdd = new MyRDD(sc, numMapTasks, Nil)
- val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
+ val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1))
val shuffleId = shuffleDep.shuffleId
- val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
+ val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker)
submit(reduceRdd, Array(0))
val statuses = (1 to numMapTasks).map { i =>
@@ -1464,10 +1483,10 @@ class DAGSchedulerSuite
// Create an RDD that has both a shuffle dependency and a narrow dependency (e.g. for a join)
val rdd1 = new MyRDD(sc, 1, Nil)
val rdd2 = new MyRDD(sc, 1, Nil, locations = Seq(Seq("hostB")))
- val shuffleDep = new ShuffleDependency(rdd1, null)
+ val shuffleDep = new ShuffleDependency(rdd1, new HashPartitioner(1))
val narrowDep = new OneToOneDependency(rdd2)
val shuffleId = shuffleDep.shuffleId
- val reduceRdd = new MyRDD(sc, 1, List(shuffleDep, narrowDep))
+ val reduceRdd = new MyRDD(sc, 1, List(shuffleDep, narrowDep), tracker = mapOutputTracker)
submit(reduceRdd, Array(0))
complete(taskSets(0), Seq(
(Success, makeMapStatus("hostA", 1))))
@@ -1500,7 +1519,8 @@ class DAGSchedulerSuite
test("simple map stage submission") {
val shuffleMapRdd = new MyRDD(sc, 2, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1))
- val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
+ val shuffleId = shuffleDep.shuffleId
+ val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker)
// Submit a map stage by itself
submitMapStage(shuffleDep)
@@ -1526,7 +1546,8 @@ class DAGSchedulerSuite
test("map stage submission with reduce stage also depending on the data") {
val shuffleMapRdd = new MyRDD(sc, 2, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1))
- val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
+ val shuffleId = shuffleDep.shuffleId
+ val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker)
// Submit the map stage by itself
submitMapStage(shuffleDep)
@@ -1555,7 +1576,7 @@ class DAGSchedulerSuite
val shuffleMapRdd = new MyRDD(sc, 2, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
val shuffleId = shuffleDep.shuffleId
- val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
+ val reduceRdd = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker)
// Submit a map stage by itself
submitMapStage(shuffleDep)
@@ -1604,9 +1625,9 @@ class DAGSchedulerSuite
test("map stage submission with multiple shared stages and failures") {
val rdd1 = new MyRDD(sc, 2, Nil)
val dep1 = new ShuffleDependency(rdd1, new HashPartitioner(2))
- val rdd2 = new MyRDD(sc, 2, List(dep1))
+ val rdd2 = new MyRDD(sc, 2, List(dep1), tracker = mapOutputTracker)
val dep2 = new ShuffleDependency(rdd2, new HashPartitioner(2))
- val rdd3 = new MyRDD(sc, 2, List(dep2))
+ val rdd3 = new MyRDD(sc, 2, List(dep2), tracker = mapOutputTracker)
val listener1 = new SimpleListener
val listener2 = new SimpleListener
@@ -1712,7 +1733,7 @@ class DAGSchedulerSuite
assertDataStructuresEmpty()
// Also test that a reduce stage using this shuffled data can immediately run
- val reduceRDD = new MyRDD(sc, 2, List(shuffleDep))
+ val reduceRDD = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker)
results.clear()
submit(reduceRDD, Array(0, 1))
complete(taskSets(2), Seq((Success, 42), (Success, 43)))
diff --git a/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala
index a5eafb1b55..26a372d6a9 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala
@@ -114,7 +114,7 @@ class BlockStoreShuffleReaderSuite extends SparkFunSuite with LocalSparkContext
// Make a mocked MapOutputTracker for the shuffle reader to use to determine what
// shuffle data to read.
val mapOutputTracker = mock(classOf[MapOutputTracker])
- when(mapOutputTracker.getMapSizesByExecutorId(shuffleId, reduceId)).thenReturn {
+ when(mapOutputTracker.getMapSizesByExecutorId(shuffleId, reduceId, reduceId + 1)).thenReturn {
// Test a scenario where all data is local, to avoid creating a bunch of additional mocks
// for the code to read data over the network.
val shuffleBlockIdsAndSizes = (0 until numMaps).map { mapId =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
index 88f5b13c8f..743c99a899 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
@@ -65,6 +65,12 @@ class ShuffledRowRDD(
Array.tabulate[Partition](part.numPartitions)(i => new ShuffledRowRDDPartition(i))
}
+ override def getPreferredLocations(partition: Partition): Seq[String] = {
+ val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
+ val dep = dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]]
+ tracker.getPreferredLocationsForShuffle(dep, partition.index)
+ }
+
override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
val dep = dependencies.head.asInstanceOf[ShuffleDependency[Int, InternalRow, InternalRow]]
SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)