aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThomas Graves <tgraves@prevailsail.corp.gq1.yahoo.com>2016-05-03 13:43:20 -0700
committerDavies Liu <davies.liu@gmail.com>2016-05-03 13:43:20 -0700
commit83ee92f60345f016a390d61a82f1d924f64ddf90 (patch)
tree72c7760110fe63ca948366c0bccade9ae724754a
parenta4aed71719b4fc728de93afc623aef05d27bc89a (diff)
downloadspark-83ee92f60345f016a390d61a82f1d924f64ddf90.tar.gz
spark-83ee92f60345f016a390d61a82f1d924f64ddf90.tar.bz2
spark-83ee92f60345f016a390d61a82f1d924f64ddf90.zip
[SPARK-11316] coalesce doesn't handle UnionRDD with partial locality properly
## What changes were proposed in this pull request? coalesce doesn't handle UnionRDD with partial locality properly. I had a user who had a UnionRDD that was made up of mapPartitionRDD without preferred locations and a checkpointedRDD with preferred locations (getting from hdfs). It took the driver over 20 minutes to setup the groups and put the partitions into those groups before it even started any tasks. Even perhaps worse is it didn't end up with the number of partitions he was asking for because it didn't put a partition in each of the groups properly. The changes in this patch get rid of a n^2 while loop that was causing the 20 minutes, it properly distributes the partitions to have at least one per group, and it changes from using the rotation iterator which got the preferred locations many times to get all the preferred locations once up front. Note that the n^2 while loop that I removed in setupGroups took so long because all of the partitions with preferred locations were already assigned to group, so it basically looped through every single one and wasn't ever able to assign it. At the time I had 960 partitions with preferred locations and 1020 without and did the outer while loop 319 times because that is the # of groups left to create. Note that each of those times through the inner while loop is going off to hdfs to get the block locations, so this is extremely inefficient. ## How was the this patch tested? Added unit tests for this case and ran existing ones that applied to make sure no regressions. Also manually tested on the users production job to make sure it fixed their issue. It created the proper number of partitions and now it takes about 6 seconds rather then 20 minutes. I did also run some basic manual tests with spark-shell doing coalesced to smaller number, same number, and then greater with shuffle. Author: Thomas Graves <tgraves@prevailsail.corp.gq1.yahoo.com> Closes #11327 from tgravescs/SPARK-11316.
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala158
-rw-r--r--core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala69
2 files changed, 165 insertions, 62 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
index e75f1dbf81..c19ed1529b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -169,42 +169,37 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
var noLocality = true // if true if no preferredLocations exists for parent RDD
- // gets the *current* preferred locations from the DAGScheduler (as opposed to the static ones)
- def currPrefLocs(part: Partition, prev: RDD[_]): Seq[String] = {
- prev.context.getPreferredLocs(prev, part.index).map(tl => tl.host)
- }
-
- // this class just keeps iterating and rotating infinitely over the partitions of the RDD
- // next() returns the next preferred machine that a partition is replicated on
- // the rotator first goes through the first replica copy of each partition, then second, third
- // the iterators return type is a tuple: (replicaString, partition)
- class LocationIterator(prev: RDD[_]) extends Iterator[(String, Partition)] {
-
- var it: Iterator[(String, Partition)] = resetIterator()
-
- override val isEmpty = !it.hasNext
-
- // initializes/resets to start iterating from the beginning
- def resetIterator(): Iterator[(String, Partition)] = {
- val iterators = (0 to 2).map { x =>
- prev.partitions.iterator.flatMap { p =>
- if (currPrefLocs(p, prev).size > x) Some((currPrefLocs(p, prev)(x), p)) else None
+ class PartitionLocations(prev: RDD[_]) {
+
+ // contains all the partitions from the previous RDD that don't have preferred locations
+ val partsWithoutLocs = ArrayBuffer[Partition]()
+ // contains all the partitions from the previous RDD that have preferred locations
+ val partsWithLocs = ArrayBuffer[(String, Partition)]()
+
+ getAllPrefLocs(prev)
+
+ // gets all the preffered locations of the previous RDD and splits them into partitions
+ // with preferred locations and ones without
+ def getAllPrefLocs(prev: RDD[_]) {
+ val tmpPartsWithLocs = mutable.LinkedHashMap[Partition, Seq[String]]()
+ // first get the locations for each partition, only do this once since it can be expensive
+ prev.partitions.foreach(p => {
+ val locs = prev.context.getPreferredLocs(prev, p.index).map(tl => tl.host)
+ if (locs.size > 0) {
+ tmpPartsWithLocs.put(p, locs)
+ } else {
+ partsWithoutLocs += p
+ }
}
- }
- iterators.reduceLeft((x, y) => x ++ y)
- }
-
- // hasNext() is false iff there are no preferredLocations for any of the partitions of the RDD
- override def hasNext: Boolean = { !isEmpty }
-
- // return the next preferredLocation of some partition of the RDD
- override def next(): (String, Partition) = {
- if (it.hasNext) {
- it.next()
- } else {
- it = resetIterator() // ran out of preferred locations, reset and rotate to the beginning
- it.next()
- }
+ )
+ // convert it into an array of host to partition
+ (0 to 2).map(x =>
+ tmpPartsWithLocs.foreach(parts => {
+ val p = parts._1
+ val locs = parts._2
+ if (locs.size > x) partsWithLocs += ((locs(x), p))
+ } )
+ )
}
}
@@ -228,33 +223,32 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
}
/**
- * Initializes targetLen partition groups and assigns a preferredLocation
- * This uses coupon collector to estimate how many preferredLocations it must rotate through
- * until it has seen most of the preferred locations (2 * n log(n))
+ * Initializes targetLen partition groups. If there are preferred locations, each group
+ * is assigned a preferredLocation. This uses coupon collector to estimate how many
+ * preferredLocations it must rotate through until it has seen most of the preferred
+ * locations (2 * n log(n))
* @param targetLen
*/
- def setupGroups(targetLen: Int, prev: RDD[_]) {
- val rotIt = new LocationIterator(prev)
-
+ def setupGroups(targetLen: Int, partitionLocs: PartitionLocations) {
// deal with empty case, just create targetLen partition groups with no preferred location
- if (!rotIt.hasNext) {
+ if (partitionLocs.partsWithLocs.isEmpty) {
(1 to targetLen).foreach(x => groupArr += new PartitionGroup())
return
}
noLocality = false
-
// number of iterations needed to be certain that we've seen most preferred locations
val expectedCoupons2 = 2 * (math.log(targetLen)*targetLen + targetLen + 0.5).toInt
var numCreated = 0
var tries = 0
// rotate through until either targetLen unique/distinct preferred locations have been created
- // OR we've rotated expectedCoupons2, in which case we have likely seen all preferred locations,
- // i.e. likely targetLen >> number of preferred locations (more buckets than there are machines)
- while (numCreated < targetLen && tries < expectedCoupons2) {
+ // OR (we have went through either all partitions OR we've rotated expectedCoupons2 - in
+ // which case we have likely seen all preferred locations)
+ val numPartsToLookAt = math.min(expectedCoupons2, partitionLocs.partsWithLocs.length)
+ while (numCreated < targetLen && tries < numPartsToLookAt) {
+ val (nxt_replica, nxt_part) = partitionLocs.partsWithLocs(tries)
tries += 1
- val (nxt_replica, nxt_part) = rotIt.next()
if (!groupHash.contains(nxt_replica)) {
val pgroup = new PartitionGroup(Some(nxt_replica))
groupArr += pgroup
@@ -263,20 +257,18 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
numCreated += 1
}
}
-
- while (numCreated < targetLen) { // if we don't have enough partition groups, create duplicates
- var (nxt_replica, nxt_part) = rotIt.next()
+ tries = 0
+ // if we don't have enough partition groups, create duplicates
+ while (numCreated < targetLen) {
+ var (nxt_replica, nxt_part) = partitionLocs.partsWithLocs(tries)
+ tries += 1
val pgroup = new PartitionGroup(Some(nxt_replica))
groupArr += pgroup
groupHash.getOrElseUpdate(nxt_replica, ArrayBuffer()) += pgroup
- var tries = 0
- while (!addPartToPGroup(nxt_part, pgroup) && tries < targetLen) { // ensure at least one part
- nxt_part = rotIt.next()._2
- tries += 1
- }
+ addPartToPGroup(nxt_part, pgroup)
numCreated += 1
+ if (tries >= partitionLocs.partsWithLocs.length) tries = 0
}
-
}
/**
@@ -289,10 +281,15 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
* imbalance in favor of locality
* @return partition group (bin to be put in)
*/
- def pickBin(p: Partition, prev: RDD[_], balanceSlack: Double): PartitionGroup = {
+ def pickBin(
+ p: Partition,
+ prev: RDD[_],
+ balanceSlack: Double,
+ partitionLocs: PartitionLocations): PartitionGroup = {
val slack = (balanceSlack * prev.partitions.length).toInt
+ val preflocs = partitionLocs.partsWithLocs.filter(_._2 == p).map(_._1).toSeq
// least loaded pref locs
- val pref = currPrefLocs(p, prev).map(getLeastGroupHash(_)).sortWith(compare)
+ val pref = preflocs.map(getLeastGroupHash(_)).sortWith(compare) // least loaded pref locs
val prefPart = if (pref == Nil) None else pref.head
val r1 = rnd.nextInt(groupArr.size)
@@ -320,7 +317,10 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
}
}
- def throwBalls(maxPartitions: Int, prev: RDD[_], balanceSlack: Double) {
+ def throwBalls(
+ maxPartitions: Int,
+ prev: RDD[_],
+ balanceSlack: Double, partitionLocs: PartitionLocations) {
if (noLocality) { // no preferredLocations in parent RDD, no randomization needed
if (maxPartitions > groupArr.size) { // just return prev.partitions
for ((p, i) <- prev.partitions.zipWithIndex) {
@@ -334,8 +334,39 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
}
}
} else {
+ // It is possible to have unionRDD where one rdd has preferred locations and another rdd
+ // that doesn't. To make sure we end up with the requested number of partitions,
+ // make sure to put a partition in every group.
+
+ // if we don't have a partition assigned to every group first try to fill them
+ // with the partitions with preferred locations
+ val partIter = partitionLocs.partsWithLocs.iterator
+ groupArr.filter(pg => pg.numPartitions == 0).foreach { pg =>
+ while (partIter.hasNext && pg.numPartitions == 0) {
+ var (nxt_replica, nxt_part) = partIter.next()
+ if (!initialHash.contains(nxt_part)) {
+ pg.partitions += nxt_part
+ initialHash += nxt_part
+ }
+ }
+ }
+
+ // if we didn't get one partitions per group from partitions with preferred locations
+ // use partitions without preferred locations
+ val partNoLocIter = partitionLocs.partsWithoutLocs.iterator
+ groupArr.filter(pg => pg.numPartitions == 0).foreach { pg =>
+ while (partNoLocIter.hasNext && pg.numPartitions == 0) {
+ var nxt_part = partNoLocIter.next()
+ if (!initialHash.contains(nxt_part)) {
+ pg.partitions += nxt_part
+ initialHash += nxt_part
+ }
+ }
+ }
+
+ // finally pick bin for the rest
for (p <- prev.partitions if (!initialHash.contains(p))) { // throw every partition into group
- pickBin(p, prev, balanceSlack).partitions += p
+ pickBin(p, prev, balanceSlack, partitionLocs).partitions += p
}
}
}
@@ -349,8 +380,11 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
* @return array of partition groups
*/
def coalesce(maxPartitions: Int, prev: RDD[_]): Array[PartitionGroup] = {
- setupGroups(math.min(prev.partitions.length, maxPartitions), prev) // setup the groups (bins)
- throwBalls(maxPartitions, prev, balanceSlack) // assign partitions (balls) to each group (bins)
+ val partitionLocs = new PartitionLocations(prev)
+ // setup the groups (bins)
+ setupGroups(math.min(prev.partitions.length, maxPartitions), partitionLocs)
+ // assign partitions (balls) to each group (bins)
+ throwBalls(maxPartitions, prev, balanceSlack, partitionLocs)
getPartitions
}
}
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 8dc463d56d..a663dab772 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -377,6 +377,33 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
map{x => List(x)}.toList, "Tried coalescing 9 partitions to 20 but didn't get 9 back")
}
+ test("coalesced RDDs with partial locality") {
+ // Make an RDD that has some locality preferences and some without. This can happen
+ // with UnionRDD
+ val data = sc.makeRDD((1 to 9).map(i => {
+ if (i > 4) {
+ (i, (i to (i + 2)).map { j => "m" + (j % 6) })
+ } else {
+ (i, Vector())
+ }
+ }))
+ val coalesced1 = data.coalesce(3)
+ assert(coalesced1.collect().toList.sorted === (1 to 9).toList, "Data got *lost* in coalescing")
+
+ val splits = coalesced1.glom().collect().map(_.toList).toList
+ assert(splits.length === 3, "Supposed to coalesce to 3 but got " + splits.length)
+
+ assert(splits.forall(_.length >= 1) === true, "Some partitions were empty")
+
+ // If we try to coalesce into more partitions than the original RDD, it should just
+ // keep the original number of partitions.
+ val coalesced4 = data.coalesce(20)
+ val listOfLists = coalesced4.glom().collect().map(_.toList).toList
+ val sortedList = listOfLists.sortWith{ (x, y) => !x.isEmpty && (y.isEmpty || (x(0) < y(0))) }
+ assert(sortedList === (1 to 9).
+ map{x => List(x)}.toList, "Tried coalescing 9 partitions to 20 but didn't get 9 back")
+ }
+
test("coalesced RDDs with locality, large scale (10K partitions)") {
// large scale experiment
import collection.mutable
@@ -418,6 +445,48 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
}
}
+ test("coalesced RDDs with partial locality, large scale (10K partitions)") {
+ // large scale experiment
+ import collection.mutable
+ val halfpartitions = 5000
+ val partitions = 10000
+ val numMachines = 50
+ val machines = mutable.ListBuffer[String]()
+ (1 to numMachines).foreach(machines += "m" + _)
+ val rnd = scala.util.Random
+ for (seed <- 1 to 5) {
+ rnd.setSeed(seed)
+
+ val firstBlocks = (1 to halfpartitions).map { i =>
+ (i, Array.fill(3)(machines(rnd.nextInt(machines.size))).toList)
+ }
+ val blocksNoLocality = (halfpartitions + 1 to partitions).map { i =>
+ (i, List())
+ }
+ val blocks = firstBlocks ++ blocksNoLocality
+
+ val data2 = sc.makeRDD(blocks)
+
+ // first try going to same number of partitions
+ val coalesced2 = data2.coalesce(partitions)
+
+ // test that we have 10000 partitions
+ assert(coalesced2.partitions.size == 10000, "Expected 10000 partitions, but got " +
+ coalesced2.partitions.size)
+
+ // test that we have 100 partitions
+ val coalesced3 = data2.coalesce(numMachines * 2)
+ assert(coalesced3.partitions.size == 100, "Expected 100 partitions, but got " +
+ coalesced3.partitions.size)
+
+ // test that the groups are load balanced with 100 +/- 20 elements in each
+ val maxImbalance3 = coalesced3.partitions
+ .map(part => part.asInstanceOf[CoalescedRDDPartition].parents.size)
+ .foldLeft(0)((dev, curr) => math.max(math.abs(100 - curr), dev))
+ assert(maxImbalance3 <= 20, "Expected 100 +/- 20 per partition, but got " + maxImbalance3)
+ }
+ }
+
// Test for SPARK-2412 -- ensure that the second pass of the algorithm does not throw an exception
test("coalesced RDDs with locality, fail first pass") {
val initialPartitions = 1000