-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala | 158
-rw-r--r--  core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala     |  69
2 files changed, 165 insertions(+), 62 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
index e75f1dbf81..c19ed1529b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -169,42 +169,37 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
var noLocality = true // true if no preferredLocations exist for the parent RDD
- // gets the *current* preferred locations from the DAGScheduler (as opposed to the static ones)
- def currPrefLocs(part: Partition, prev: RDD[_]): Seq[String] = {
- prev.context.getPreferredLocs(prev, part.index).map(tl => tl.host)
- }
-
- // this class just keeps iterating and rotating infinitely over the partitions of the RDD
- // next() returns the next preferred machine that a partition is replicated on
- // the rotator first goes through the first replica copy of each partition, then second, third
- // the iterators return type is a tuple: (replicaString, partition)
- class LocationIterator(prev: RDD[_]) extends Iterator[(String, Partition)] {
-
- var it: Iterator[(String, Partition)] = resetIterator()
-
- override val isEmpty = !it.hasNext
-
- // initializes/resets to start iterating from the beginning
- def resetIterator(): Iterator[(String, Partition)] = {
- val iterators = (0 to 2).map { x =>
- prev.partitions.iterator.flatMap { p =>
- if (currPrefLocs(p, prev).size > x) Some((currPrefLocs(p, prev)(x), p)) else None
+ class PartitionLocations(prev: RDD[_]) {
+
+ // contains all the partitions from the previous RDD that don't have preferred locations
+ val partsWithoutLocs = ArrayBuffer[Partition]()
+ // contains all the partitions from the previous RDD that have preferred locations
+ val partsWithLocs = ArrayBuffer[(String, Partition)]()
+
+ getAllPrefLocs(prev)
+
+ // gets all the preferred locations of the previous RDD and splits its partitions into
+ // those with preferred locations and those without
+ def getAllPrefLocs(prev: RDD[_]) {
+ val tmpPartsWithLocs = mutable.LinkedHashMap[Partition, Seq[String]]()
+ // first get the locations for each partition, only do this once since it can be expensive
+ prev.partitions.foreach(p => {
+ val locs = prev.context.getPreferredLocs(prev, p.index).map(tl => tl.host)
+ if (locs.size > 0) {
+ tmpPartsWithLocs.put(p, locs)
+ } else {
+ partsWithoutLocs += p
+ }
}
- }
- iterators.reduceLeft((x, y) => x ++ y)
- }
-
- // hasNext() is false iff there are no preferredLocations for any of the partitions of the RDD
- override def hasNext: Boolean = { !isEmpty }
-
- // return the next preferredLocation of some partition of the RDD
- override def next(): (String, Partition) = {
- if (it.hasNext) {
- it.next()
- } else {
- it = resetIterator() // ran out of preferred locations, reset and rotate to the beginning
- it.next()
- }
+ )
+ // flatten the map into an array of (host, partition) pairs, one replica level at a time
+ (0 to 2).map(x =>
+ tmpPartsWithLocs.foreach(parts => {
+ val p = parts._1
+ val locs = parts._2
+ if (locs.size > x) partsWithLocs += ((locs(x), p))
+ } )
+ )
}
}
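
Not part of the patch: a minimal standalone sketch of the split/flatten that getAllPrefLocs
performs, using a toy list of (partition index, hosts) pairs (hypothetical data) in place of
a real RDD.

    import scala.collection.mutable.ArrayBuffer

    // hypothetical partitions: index -> preferred hosts (possibly empty)
    val parts = Seq(0 -> Seq("m1", "m2"), 1 -> Seq.empty[String], 2 -> Seq("m3"))

    val partsWithoutLocs = ArrayBuffer[Int]()         // partitions with no preference
    val partsWithLocs = ArrayBuffer[(String, Int)]()  // (host, partition) pairs

    parts.foreach { case (p, locs) => if (locs.isEmpty) partsWithoutLocs += p }
    // first replica of every partition, then second, then third -- mirrors (0 to 2)
    (0 to 2).foreach { x =>
      parts.foreach { case (p, locs) => if (locs.size > x) partsWithLocs += ((locs(x), p)) }
    }

    println(partsWithLocs)     // ArrayBuffer((m1,0), (m3,2), (m2,0))
    println(partsWithoutLocs)  // ArrayBuffer(1)
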
@@ -228,33 +223,32 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
}
/**
- * Initializes targetLen partition groups and assigns a preferredLocation
- * This uses coupon collector to estimate how many preferredLocations it must rotate through
- * until it has seen most of the preferred locations (2 * n log(n))
+ * Initializes targetLen partition groups. If there are preferred locations, each group
+ * is assigned a preferredLocation. This uses the coupon collector problem to estimate
+ * how many preferredLocations it must rotate through until it has seen most of the
+ * preferred locations (2 * n log(n))
* @param targetLen
*/
- def setupGroups(targetLen: Int, prev: RDD[_]) {
- val rotIt = new LocationIterator(prev)
-
+ def setupGroups(targetLen: Int, partitionLocs: PartitionLocations) {
// deal with empty case, just create targetLen partition groups with no preferred location
- if (!rotIt.hasNext) {
+ if (partitionLocs.partsWithLocs.isEmpty) {
(1 to targetLen).foreach(x => groupArr += new PartitionGroup())
return
}
noLocality = false
-
// number of iterations needed to be certain that we've seen most preferred locations
val expectedCoupons2 = 2 * (math.log(targetLen)*targetLen + targetLen + 0.5).toInt
var numCreated = 0
var tries = 0
// rotate through until either targetLen unique/distinct preferred locations have been created
- // OR we've rotated expectedCoupons2, in which case we have likely seen all preferred locations,
- // i.e. likely targetLen >> number of preferred locations (more buckets than there are machines)
- while (numCreated < targetLen && tries < expectedCoupons2) {
+ // OR (we have gone through all the partitions OR we've rotated expectedCoupons2 times, in
+ // which case we have likely seen all preferred locations)
+ val numPartsToLookAt = math.min(expectedCoupons2, partitionLocs.partsWithLocs.length)
+ while (numCreated < targetLen && tries < numPartsToLookAt) {
+ val (nxt_replica, nxt_part) = partitionLocs.partsWithLocs(tries)
tries += 1
- val (nxt_replica, nxt_part) = rotIt.next()
if (!groupHash.contains(nxt_replica)) {
val pgroup = new PartitionGroup(Some(nxt_replica))
groupArr += pgroup
@@ -263,20 +257,18 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
numCreated += 1
}
}
-
- while (numCreated < targetLen) { // if we don't have enough partition groups, create duplicates
- var (nxt_replica, nxt_part) = rotIt.next()
+ tries = 0
+ // if we don't have enough partition groups, create duplicates
+ while (numCreated < targetLen) {
+ var (nxt_replica, nxt_part) = partitionLocs.partsWithLocs(tries)
+ tries += 1
val pgroup = new PartitionGroup(Some(nxt_replica))
groupArr += pgroup
groupHash.getOrElseUpdate(nxt_replica, ArrayBuffer()) += pgroup
- var tries = 0
- while (!addPartToPGroup(nxt_part, pgroup) && tries < targetLen) { // ensure at least one part
- nxt_part = rotIt.next()._2
- tries += 1
- }
+ addPartToPGroup(nxt_part, pgroup)
numCreated += 1
+ if (tries >= partitionLocs.partsWithLocs.length) tries = 0
}
-
}
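
For intuition only (not from the patch): the look-up budget expectedCoupons2 above, evaluated
for a hypothetical targetLen of 50 groups.

    val targetLen = 50  // hypothetical number of requested partition groups
    // same expression setupGroups uses: roughly 2 * (n log n + n) look-ups
    val expectedCoupons2 = 2 * (math.log(targetLen) * targetLen + targetLen + 0.5).toInt
    println(expectedCoupons2)  // 492, i.e. about ten candidate locations examined per group
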
/**
@@ -289,10 +281,15 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
* imbalance in favor of locality
* @return partition group (bin to be put in)
*/
- def pickBin(p: Partition, prev: RDD[_], balanceSlack: Double): PartitionGroup = {
+ def pickBin(
+ p: Partition,
+ prev: RDD[_],
+ balanceSlack: Double,
+ partitionLocs: PartitionLocations): PartitionGroup = {
val slack = (balanceSlack * prev.partitions.length).toInt
+ val preflocs = partitionLocs.partsWithLocs.filter(_._2 == p).map(_._1).toSeq
// least loaded pref locs
- val pref = currPrefLocs(p, prev).map(getLeastGroupHash(_)).sortWith(compare)
+ val pref = preflocs.map(getLeastGroupHash(_)).sortWith(compare)
val prefPart = if (pref == Nil) None else pref.head
val r1 = rnd.nextInt(groupArr.size)
@@ -320,7 +317,10 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
}
}
- def throwBalls(maxPartitions: Int, prev: RDD[_], balanceSlack: Double) {
+ def throwBalls(
+ maxPartitions: Int,
+ prev: RDD[_],
+ balanceSlack: Double, partitionLocs: PartitionLocations) {
if (noLocality) { // no preferredLocations in parent RDD, no randomization needed
if (maxPartitions > groupArr.size) { // just return prev.partitions
for ((p, i) <- prev.partitions.zipWithIndex) {
@@ -334,8 +334,39 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
}
}
} else {
+ // It is possible to have a UnionRDD where one RDD has preferred locations and another
+ // doesn't. To make sure we end up with the requested number of partitions,
+ // put at least one partition in every group.
+
+ // if we don't have a partition assigned to every group, first try to fill the empty
+ // groups with partitions that have preferred locations
+ val partIter = partitionLocs.partsWithLocs.iterator
+ groupArr.filter(pg => pg.numPartitions == 0).foreach { pg =>
+ while (partIter.hasNext && pg.numPartitions == 0) {
+ var (nxt_replica, nxt_part) = partIter.next()
+ if (!initialHash.contains(nxt_part)) {
+ pg.partitions += nxt_part
+ initialHash += nxt_part
+ }
+ }
+ }
+
+ // if we didn't get one partition per group from the partitions with preferred locations,
+ // use the partitions without preferred locations
+ val partNoLocIter = partitionLocs.partsWithoutLocs.iterator
+ groupArr.filter(pg => pg.numPartitions == 0).foreach { pg =>
+ while (partNoLocIter.hasNext && pg.numPartitions == 0) {
+ var nxt_part = partNoLocIter.next()
+ if (!initialHash.contains(nxt_part)) {
+ pg.partitions += nxt_part
+ initialHash += nxt_part
+ }
+ }
+ }
+
+ // finally pick a bin for the remaining partitions
for (p <- prev.partitions if (!initialHash.contains(p))) { // throw every partition into group
- pickBin(p, prev, balanceSlack).partitions += p
+ pickBin(p, prev, balanceSlack, partitionLocs).partitions += p
}
}
}
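
Not part of the patch: the "seed every empty group" pass above boils down to the following
pattern, sketched here with plain buffers and hypothetical integer "partitions".

    import scala.collection.mutable

    val groups = Array.fill(3)(mutable.ArrayBuffer[Int]())  // stand-ins for PartitionGroup
    val used = mutable.Set[Int]()                           // stand-in for initialHash
    val candidates = Iterator(1, 2, 2, 3)  // duplicates mimic repeated (host, partition) pairs

    // give each still-empty group the first candidate that has not been claimed yet
    groups.filter(_.isEmpty).foreach { g =>
      while (candidates.hasNext && g.isEmpty) {
        val c = candidates.next()
        if (!used.contains(c)) { g += c; used += c }
      }
    }
    println(groups.map(_.toList).toList)  // List(List(1), List(2), List(3))
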
@@ -349,8 +380,11 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
* @return array of partition groups
*/
def coalesce(maxPartitions: Int, prev: RDD[_]): Array[PartitionGroup] = {
- setupGroups(math.min(prev.partitions.length, maxPartitions), prev) // setup the groups (bins)
- throwBalls(maxPartitions, prev, balanceSlack) // assign partitions (balls) to each group (bins)
+ val partitionLocs = new PartitionLocations(prev)
+ // setup the groups (bins)
+ setupGroups(math.min(prev.partitions.length, maxPartitions), partitionLocs)
+ // assign partitions (balls) to each group (bins)
+ throwBalls(maxPartitions, prev, balanceSlack, partitionLocs)
getPartitions
}
}
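
A usage sketch mirroring the new test below (host names and app name are made up): build an
RDD where only some elements carry preferred locations, coalesce it, and check that no
resulting partition comes back empty.

    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("coalesce-sketch"))
    // makeRDD's (T, Seq[String]) overload takes per-element preferred locations;
    // an empty Seq means "no preference" for that element's partition
    val data = sc.makeRDD((1 to 9).map { i =>
      if (i > 4) (i, (i to i + 2).map(j => "m" + (j % 6))) else (i, Seq.empty[String])
    })
    val coalesced = data.coalesce(3)
    assert(coalesced.glom().collect().forall(_.nonEmpty), "no partition should be empty")
    sc.stop()
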
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 8dc463d56d..a663dab772 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -377,6 +377,33 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
map{x => List(x)}.toList, "Tried coalescing 9 partitions to 20 but didn't get 9 back")
}
+ test("coalesced RDDs with partial locality") {
+ // Make an RDD where some partitions have locality preferences and some don't.
+ // This can happen with a UnionRDD
+ val data = sc.makeRDD((1 to 9).map(i => {
+ if (i > 4) {
+ (i, (i to (i + 2)).map { j => "m" + (j % 6) })
+ } else {
+ (i, Vector())
+ }
+ }))
+ val coalesced1 = data.coalesce(3)
+ assert(coalesced1.collect().toList.sorted === (1 to 9).toList, "Data got *lost* in coalescing")
+
+ val splits = coalesced1.glom().collect().map(_.toList).toList
+ assert(splits.length === 3, "Supposed to coalesce to 3 but got " + splits.length)
+
+ assert(splits.forall(_.length >= 1) === true, "Some partitions were empty")
+
+ // If we try to coalesce into more partitions than the original RDD, it should just
+ // keep the original number of partitions.
+ val coalesced4 = data.coalesce(20)
+ val listOfLists = coalesced4.glom().collect().map(_.toList).toList
+ val sortedList = listOfLists.sortWith{ (x, y) => !x.isEmpty && (y.isEmpty || (x(0) < y(0))) }
+ assert(sortedList === (1 to 9).
+ map{x => List(x)}.toList, "Tried coalescing 9 partitions to 20 but didn't get 9 back")
+ }
+
test("coalesced RDDs with locality, large scale (10K partitions)") {
// large scale experiment
import collection.mutable
@@ -418,6 +445,48 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
}
}
+ test("coalesced RDDs with partial locality, large scale (10K partitions)") {
+ // large scale experiment
+ import collection.mutable
+ val halfpartitions = 5000
+ val partitions = 10000
+ val numMachines = 50
+ val machines = mutable.ListBuffer[String]()
+ (1 to numMachines).foreach(machines += "m" + _)
+ val rnd = scala.util.Random
+ for (seed <- 1 to 5) {
+ rnd.setSeed(seed)
+
+ val firstBlocks = (1 to halfpartitions).map { i =>
+ (i, Array.fill(3)(machines(rnd.nextInt(machines.size))).toList)
+ }
+ val blocksNoLocality = (halfpartitions + 1 to partitions).map { i =>
+ (i, List())
+ }
+ val blocks = firstBlocks ++ blocksNoLocality
+
+ val data2 = sc.makeRDD(blocks)
+
+ // first try going to same number of partitions
+ val coalesced2 = data2.coalesce(partitions)
+
+ // test that we have 10000 partitions
+ assert(coalesced2.partitions.size == 10000, "Expected 10000 partitions, but got " +
+ coalesced2.partitions.size)
+
+ // test that we have 100 partitions
+ val coalesced3 = data2.coalesce(numMachines * 2)
+ assert(coalesced3.partitions.size == 100, "Expected 100 partitions, but got " +
+ coalesced3.partitions.size)
+
+ // test that the groups are load balanced with 100 +/- 20 elements in each
+ val maxImbalance3 = coalesced3.partitions
+ .map(part => part.asInstanceOf[CoalescedRDDPartition].parents.size)
+ .foldLeft(0)((dev, curr) => math.max(math.abs(100 - curr), dev))
+ assert(maxImbalance3 <= 20, "Expected 100 +/- 20 per partition, but got " + maxImbalance3)
+ }
+ }
+
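
Not part of the patch: the load-balance assertion just above reduces to a
max-absolute-deviation computation, shown here with hypothetical group sizes.

    // hypothetical CoalescedRDDPartition parent counts for coalesce(100) over 10000 partitions
    val groupSizes = Seq(96, 104, 110, 90)
    val maxImbalance = groupSizes.foldLeft(0)((dev, curr) => math.max(math.abs(100 - curr), dev))
    println(maxImbalance)  // 10, within the +/- 20 tolerance the test asserts
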
// Test for SPARK-2412 -- ensure that the second pass of the algorithm does not throw an exception
test("coalesced RDDs with locality, fail first pass") {
val initialPartitions = 1000