author     Shivaram Venkataraman <shivaram@cs.berkeley.edu>   2015-06-10 15:03:40 -0700
committer  Kay Ousterhout <kayousterhout@gmail.com>            2015-06-10 15:04:38 -0700
commit     96a7c888d806adfdb2c722025a1079ed7eaa2052 (patch)
tree       95837a4607231ea5603fe947926ba2f67fa59a52
parent     5014d0ed7e2f69810654003f8dd38078b945cf05 (diff)
[SPARK-2774] Set preferred locations for reduce tasks
Set preferred locations for reduce tasks. The basic design is that we maintain a map from reducerId to a list of (sizes, locations) for each shuffle. We then set the preferred locations to be any machines that have 20% or more of the output that needs to be read by the reduce task. This will result in at most 5 preferred locations for each reduce task.

Selecting the preferred locations involves O(# map tasks * # reduce tasks) computation, so we restrict this feature to cases where we have fewer than 1000 map tasks and 1000 reduce tasks.

Author: Shivaram Venkataraman <shivaram@cs.berkeley.edu>

Closes #6652 from shivaram/reduce-locations and squashes the following commits:

492e25e [Shivaram Venkataraman] Remove unused import
2ef2d39 [Shivaram Venkataraman] Address code review comments
897a914 [Shivaram Venkataraman] Remove unused hash map
f5be578 [Shivaram Venkataraman] Use fraction of map outputs to determine locations. Also removes caching of preferred locations to make the API cleaner
68bc29e [Shivaram Venkataraman] Fix line length
1090b58 [Shivaram Venkataraman] Change flag name
77ce7d8 [Shivaram Venkataraman] Merge branch 'master' of https://github.com/apache/spark into reduce-locations
e5d56bd [Shivaram Venkataraman] Add flag to turn off locality for shuffle deps
6cfae98 [Shivaram Venkataraman] Filter out zero blocks, rename variables
9d5831a [Shivaram Venkataraman] Address some more comments
8e31266 [Shivaram Venkataraman] Fix style
0df3180 [Shivaram Venkataraman] Address code review comments
e7d5449 [Shivaram Venkataraman] Fix merge issues
ad7cb53 [Shivaram Venkataraman] Merge branch 'master' of https://github.com/apache/spark into reduce-locations
df14cee [Shivaram Venkataraman] Merge branch 'master' of https://github.com/apache/spark into reduce-locations
5093aea [Shivaram Venkataraman] Merge branch 'master' of https://github.com/apache/spark into reduce-locations
0171d3c [Shivaram Venkataraman] Merge branch 'master' of https://github.com/apache/spark into reduce-locations
bc4dfd6 [Shivaram Venkataraman] Merge branch 'master' of https://github.com/apache/spark into reduce-locations
774751b [Shivaram Venkataraman] Fix bug introduced by line length adjustment
34d0283 [Shivaram Venkataraman] Fix style issues
3b464b7 [Shivaram Venkataraman] Set preferred locations for reduce tasks. This is another attempt at #1697 addressing some of the earlier concerns. This adds a couple of thresholds based on the number of map and reduce tasks beyond which we don't use preferred locations for reduce tasks.
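The selection rule described in the message is easy to state in isolation. Below is a minimal, self-contained Scala sketch of it; the object and method names are illustrative only and are not Spark's internal API. It sums per-host block sizes for one reducer and keeps every host that holds at least the given fraction of the total, so with a 0.2 threshold a reducer ends up with at most 5 preferred locations.

    object PreferredLocationsSketch {
      // blockSizesByLocation: (host, bytes of this reducer's input held on that host)
      def locationsAboveThreshold(
          blockSizesByLocation: Seq[(String, Long)],
          fractionThreshold: Double): Seq[String] = {
        // Add up the sizes of all blocks that live on the same host.
        val totals = blockSizesByLocation
          .groupBy(_._1)
          .map { case (host, pairs) => host -> pairs.map(_._2).sum }
        val totalOutput = totals.values.sum.toDouble
        if (totalOutput <= 0) Seq.empty
        else totals.collect {
          case (host, bytes) if bytes / totalOutput >= fractionThreshold => host
        }.toSeq
      }

      def main(args: Array[String]): Unit = {
        // hostA holds 4 of 7 bytes, hostB holds 3 of 7.
        val sizes = Seq(("hostA", 2L), ("hostA", 2L), ("hostB", 3L))
        println(locationsAboveThreshold(sizes, 0.5)) // only hostA crosses 50%
        println(locationsAboveThreshold(sizes, 0.2)) // both hosts cross 20%
      }
    }

Raising the threshold concentrates scheduling on fewer hosts that each hold more of the data locally; lowering it spreads the preference across more hosts.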
-rw-r--r--  core/src/main/scala/org/apache/spark/MapOutputTracker.scala              49
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala        37
-rw-r--r--  core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala         35
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala   76
4 files changed, 177 insertions, 20 deletions
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 018422827e..862ffe868f 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -21,7 +21,7 @@ import java.io._
import java.util.concurrent.ConcurrentHashMap
import java.util.zip.{GZIPInputStream, GZIPOutputStream}
-import scala.collection.mutable.{HashSet, Map}
+import scala.collection.mutable.{HashMap, HashSet, Map}
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
@@ -284,6 +284,53 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
cachedSerializedStatuses.contains(shuffleId) || mapStatuses.contains(shuffleId)
}
+ /**
+ * Return a list of locations that each have a fraction of the map output greater than the
+ * specified threshold.
+ *
+ * @param shuffleId id of the shuffle
+ * @param reducerId id of the reduce task
+ * @param numReducers total number of reducers in the shuffle
+ * @param fractionThreshold fraction of total map output size that a location must have
+ * for it to be considered large.
+ *
+ * This method is not thread-safe.
+ */
+ def getLocationsWithLargestOutputs(
+ shuffleId: Int,
+ reducerId: Int,
+ numReducers: Int,
+ fractionThreshold: Double)
+ : Option[Array[BlockManagerId]] = {
+
+ if (mapStatuses.contains(shuffleId)) {
+ val statuses = mapStatuses(shuffleId)
+ if (statuses.nonEmpty) {
+ // HashMap to add up sizes of all blocks at the same location
+ val locs = new HashMap[BlockManagerId, Long]
+ var totalOutputSize = 0L
+ var mapIdx = 0
+ while (mapIdx < statuses.length) {
+ val status = statuses(mapIdx)
+ val blockSize = status.getSizeForBlock(reducerId)
+ if (blockSize > 0) {
+ locs(status.location) = locs.getOrElse(status.location, 0L) + blockSize
+ totalOutputSize += blockSize
+ }
+ mapIdx = mapIdx + 1
+ }
+ val topLocs = locs.filter { case (loc, size) =>
+ size.toDouble / totalOutputSize >= fractionThreshold
+ }
+ // Return if we have any locations which satisfy the required threshold
+ if (topLocs.nonEmpty) {
+ return Some(topLocs.map(_._1).toArray)
+ }
+ }
+ }
+ None
+ }
+
def incrementEpoch() {
epochLock.synchronized {
epoch += 1
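The method added above can be exercised directly on a MapOutputTrackerMaster once map outputs are registered. A hedged usage sketch follows; the `tracker` value and the shuffle/reducer ids are assumptions here, mirroring the unit test further down.

    val preferred: Option[Array[BlockManagerId]] =
      tracker.getLocationsWithLargestOutputs(
        shuffleId = 10,
        reducerId = 0,
        numReducers = 1,
        fractionThreshold = 0.2)
    // Each returned BlockManagerId identifies an executor/host holding at least
    // 20% of this reducer's input; None means no location crossed the threshold.
    preferred.foreach(_.foreach(loc => println(s"${loc.executorId} @ ${loc.host}")))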
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 75a567fb31..aea6674ed2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -137,6 +137,22 @@ class DAGScheduler(
private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
taskScheduler.setDAGScheduler(this)
+ // Flag to control if reduce tasks are assigned preferred locations
+ private val shuffleLocalityEnabled =
+ sc.getConf.getBoolean("spark.shuffle.reduceLocality.enabled", true)
+ // Number of map and reduce tasks above which we do not assign preferred locations
+ // based on map output sizes. We limit the size of jobs for which we assign preferred
+ // locations, as computing the top locations by size becomes expensive.
+ private[this] val SHUFFLE_PREF_MAP_THRESHOLD = 1000
+ // NOTE: This should be less than 2000 as we use HighlyCompressedMapStatus beyond that
+ private[this] val SHUFFLE_PREF_REDUCE_THRESHOLD = 1000
+
+ // Fraction of total map output that must be at a location for it to be considered a
+ // preferred location for a reduce task.
+ // Making this larger will focus on fewer locations where most data can be read locally, but
+ // may lead to more delay in scheduling if those locations are busy.
+ private[scheduler] val REDUCER_PREF_LOCS_FRACTION = 0.2
+
// Called by TaskScheduler to report task's starting.
def taskStarted(task: Task[_], taskInfo: TaskInfo) {
eventProcessLoop.post(BeginEvent(task, taskInfo))
@@ -1384,17 +1400,32 @@ class DAGScheduler(
if (rddPrefs.nonEmpty) {
return rddPrefs.map(TaskLocation(_))
}
- // If the RDD has narrow dependencies, pick the first partition of the first narrow dep
- // that has any placement preferences. Ideally we would choose based on transfer sizes,
- // but this will do for now.
+
rdd.dependencies.foreach {
case n: NarrowDependency[_] =>
+ // If the RDD has narrow dependencies, pick the first partition of the first narrow dep
+ // that has any placement preferences. Ideally we would choose based on transfer sizes,
+ // but this will do for now.
for (inPart <- n.getParents(partition)) {
val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
if (locs != Nil) {
return locs
}
}
+ case s: ShuffleDependency[_, _, _] =>
+ // For shuffle dependencies, pick locations which have at least REDUCER_PREF_LOCS_FRACTION
+ // of data as preferred locations
+ if (shuffleLocalityEnabled &&
+ rdd.partitions.size < SHUFFLE_PREF_REDUCE_THRESHOLD &&
+ s.rdd.partitions.size < SHUFFLE_PREF_MAP_THRESHOLD) {
+ // Get the preferred map output locations for this reducer
+ val topLocsForReducer = mapOutputTracker.getLocationsWithLargestOutputs(s.shuffleId,
+ partition, rdd.partitions.size, REDUCER_PREF_LOCS_FRACTION)
+ if (topLocsForReducer.nonEmpty) {
+ return topLocsForReducer.get.map(loc => TaskLocation(loc.host, loc.executorId))
+ }
+ }
+
case _ =>
}
Nil
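The behaviour introduced in this hunk is gated by the new spark.shuffle.reduceLocality.enabled flag (default true). A minimal sketch of turning it off from user code, assuming an otherwise ordinary local SparkContext setup (the app name and master value are illustrative):

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("reduce-locality-demo")                      // illustrative app name
      .setMaster("local[4]")                                   // illustrative master
      .set("spark.shuffle.reduceLocality.enabled", "false")    // disable reduce-task locality
    val sc = new SparkContext(conf)

Even with the flag left on, the preference only applies when the shuffle has fewer than 1000 map tasks and fewer than 1000 reduce partitions, per the thresholds above.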
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index 1fab69678d..7a1961137c 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -205,4 +205,39 @@ class MapOutputTrackerSuite extends SparkFunSuite {
// masterTracker.stop() // this throws an exception
rpcEnv.shutdown()
}
+
+ test("getLocationsWithLargestOutputs with multiple outputs in same machine") {
+ val rpcEnv = createRpcEnv("test")
+ val tracker = new MapOutputTrackerMaster(conf)
+ tracker.trackerEndpoint = rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME,
+ new MapOutputTrackerMasterEndpoint(rpcEnv, tracker, conf))
+ // Setup 3 map tasks
+ // on hostA with output size 2
+ // on hostA with output size 2
+ // on hostB with output size 3
+ tracker.registerShuffle(10, 3)
+ tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000),
+ Array(2L)))
+ tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("a", "hostA", 1000),
+ Array(2L)))
+ tracker.registerMapOutput(10, 2, MapStatus(BlockManagerId("b", "hostB", 1000),
+ Array(3L)))
+
+ // When the threshold is 50%, only host A should be returned as a preferred location
+ // as it has 4 out of 7 bytes of output.
+ val topLocs50 = tracker.getLocationsWithLargestOutputs(10, 0, 1, 0.5)
+ assert(topLocs50.nonEmpty)
+ assert(topLocs50.get.size === 1)
+ assert(topLocs50.get.head === BlockManagerId("a", "hostA", 1000))
+
+ // When the threshold is 20%, both hosts should be returned as preferred locations.
+ val topLocs20 = tracker.getLocationsWithLargestOutputs(10, 0, 1, 0.2)
+ assert(topLocs20.nonEmpty)
+ assert(topLocs20.get.size === 2)
+ assert(topLocs20.get.toSet ===
+ Seq(BlockManagerId("a", "hostA", 1000), BlockManagerId("b", "hostB", 1000)).toSet)
+
+ tracker.stop()
+ rpcEnv.shutdown()
+ }
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 47b2868753..833b600746 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -490,8 +490,8 @@ class DAGSchedulerSuite
val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
submit(reduceRdd, Array(0, 1))
complete(taskSets(0), Seq(
- (Success, makeMapStatus("hostA", 1)),
- (Success, makeMapStatus("hostB", 1))))
+ (Success, makeMapStatus("hostA", reduceRdd.partitions.size)),
+ (Success, makeMapStatus("hostB", reduceRdd.partitions.size))))
// the 2nd ResultTask failed
complete(taskSets(1), Seq(
(Success, 42),
@@ -501,7 +501,7 @@ class DAGSchedulerSuite
// ask the scheduler to try it again
scheduler.resubmitFailedStages()
// have the 2nd attempt pass
- complete(taskSets(2), Seq((Success, makeMapStatus("hostA", 1))))
+ complete(taskSets(2), Seq((Success, makeMapStatus("hostA", reduceRdd.partitions.size))))
// we can see both result blocks now
assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1.host) ===
Array("hostA", "hostB"))
@@ -517,8 +517,8 @@ class DAGSchedulerSuite
val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
submit(reduceRdd, Array(0, 1))
complete(taskSets(0), Seq(
- (Success, makeMapStatus("hostA", 1)),
- (Success, makeMapStatus("hostB", 1))))
+ (Success, makeMapStatus("hostA", reduceRdd.partitions.size)),
+ (Success, makeMapStatus("hostB", reduceRdd.partitions.size))))
// The MapOutputTracker should know about both map output locations.
assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1.host) ===
Array("hostA", "hostB"))
@@ -560,18 +560,18 @@ class DAGSchedulerSuite
assert(newEpoch > oldEpoch)
val taskSet = taskSets(0)
// should be ignored for being too old
- runEvent(CompletionEvent(
- taskSet.tasks(0), Success, makeMapStatus("hostA", 1), null, createFakeTaskInfo(), null))
+ runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA",
+ reduceRdd.partitions.size), null, createFakeTaskInfo(), null))
// should work because it's a non-failed host
- runEvent(CompletionEvent(
- taskSet.tasks(0), Success, makeMapStatus("hostB", 1), null, createFakeTaskInfo(), null))
+ runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostB",
+ reduceRdd.partitions.size), null, createFakeTaskInfo(), null))
// should be ignored for being too old
- runEvent(CompletionEvent(
- taskSet.tasks(0), Success, makeMapStatus("hostA", 1), null, createFakeTaskInfo(), null))
+ runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA",
+ reduceRdd.partitions.size), null, createFakeTaskInfo(), null))
// should work because it's a new epoch
taskSet.tasks(1).epoch = newEpoch
- runEvent(CompletionEvent(
- taskSet.tasks(1), Success, makeMapStatus("hostA", 1), null, createFakeTaskInfo(), null))
+ runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA",
+ reduceRdd.partitions.size), null, createFakeTaskInfo(), null))
assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) ===
Array(makeBlockManagerId("hostB"), makeBlockManagerId("hostA")))
complete(taskSets(1), Seq((Success, 42), (Success, 43)))
@@ -800,6 +800,50 @@ class DAGSchedulerSuite
assertDataStructuresEmpty()
}
+ test("reduce tasks should be placed locally with map output") {
+ // Create a shuffleMapRdd with 1 partition
+ val shuffleMapRdd = new MyRDD(sc, 1, Nil)
+ val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
+ val shuffleId = shuffleDep.shuffleId
+ val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
+ submit(reduceRdd, Array(0))
+ complete(taskSets(0), Seq(
+ (Success, makeMapStatus("hostA", 1))))
+ assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) ===
+ Array(makeBlockManagerId("hostA")))
+
+ // Reducer should run on the same host that the map task ran on
+ val reduceTaskSet = taskSets(1)
+ assertLocations(reduceTaskSet, Seq(Seq("hostA")))
+ complete(reduceTaskSet, Seq((Success, 42)))
+ assert(results === Map(0 -> 42))
+ assertDataStructuresEmpty
+ }
+
+ test("reduce task locality preferences should only include machines with largest map outputs") {
+ val numMapTasks = 4
+ // Create a shuffleMapRdd with more partitions
+ val shuffleMapRdd = new MyRDD(sc, numMapTasks, Nil)
+ val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
+ val shuffleId = shuffleDep.shuffleId
+ val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
+ submit(reduceRdd, Array(0))
+
+ val statuses = (1 to numMapTasks).map { i =>
+ (Success, makeMapStatus("host" + i, 1, (10*i).toByte))
+ }
+ complete(taskSets(0), statuses)
+
+ // Reducer should prefer the last 3 hosts as they have 20%, 30% and 40% of data
+ val hosts = (1 to numMapTasks).map(i => "host" + i).reverse.take(numMapTasks - 1)
+
+ val reduceTaskSet = taskSets(1)
+ assertLocations(reduceTaskSet, Seq(hosts))
+ complete(reduceTaskSet, Seq((Success, 42)))
+ assert(results === Map(0 -> 42))
+ assertDataStructuresEmpty
+ }
+
/**
* Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
* Note that this checks only the host and not the executor ID.
@@ -807,12 +851,12 @@ class DAGSchedulerSuite
private def assertLocations(taskSet: TaskSet, hosts: Seq[Seq[String]]) {
assert(hosts.size === taskSet.tasks.size)
for ((taskLocs, expectedLocs) <- taskSet.tasks.map(_.preferredLocations).zip(hosts)) {
- assert(taskLocs.map(_.host) === expectedLocs)
+ assert(taskLocs.map(_.host).toSet === expectedLocs.toSet)
}
}
- private def makeMapStatus(host: String, reduces: Int): MapStatus =
- MapStatus(makeBlockManagerId(host), Array.fill[Long](reduces)(2))
+ private def makeMapStatus(host: String, reduces: Int, sizes: Byte = 2): MapStatus =
+ MapStatus(makeBlockManagerId(host), Array.fill[Long](reduces)(sizes))
private def makeBlockManagerId(host: String): BlockManagerId =
BlockManagerId("exec-" + host, host, 12345)