author     Ali Ghodsi <alig@cs.berkeley.edu>    2013-08-14 20:21:53 -0700
committer  Ali Ghodsi <alig@cs.berkeley.edu>    2013-08-20 16:13:36 -0700
commit     937f72feb86b406056bd163ce7320f582b70ab16 (patch)
tree       d4f67159ed33ab17bc6c779a32e808b6a90816c9 /core
parent     c4d59910b149b8b9bbf729f38e3eef3fb64fc85b (diff)
word wrap before 100 chars per line
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/rdd/CoalescedRDD.scala  |  75
-rw-r--r--  core/src/test/scala/spark/RDDSuite.scala          |  17
2 files changed, 51 insertions, 41 deletions
diff --git a/core/src/main/scala/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
index 6af55cd80c..beed2d5b69 100644
--- a/core/src/main/scala/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
@@ -37,13 +37,15 @@ private[spark] case class CoalescedRDDPartition(
}
/**
- * Gets the preferred location for this coalesced RDD partition. Most parent indices should prefer this machine.
+ * Gets the preferred location for this coalesced RDD partition.
+ * Most parent indices should prefer this machine.
* @return preferred location
*/
def getPreferredLocation = prefLoc
/**
- * Computes how many of the parents partitions have getPreferredLocation as one of their preferredLocations
+ * Computes how many of the parents partitions have getPreferredLocation
+ * as one of their preferredLocations
* @return locality of this coalesced partition between 0 and 1
*/
def localFraction :Double = {
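The hunk above stops at the signature of localFraction. As a minimal sketch only (not the body from this commit), and assuming the surrounding CoalescedRDDPartition exposes parents: Seq[Partition], a handle rdd to the parent RDD, and the prefLoc returned by getPreferredLocation, the fraction amounts to:

    // sketch: fraction of parent partitions that list prefLoc among their preferred locations
    def localFraction: Double = {
      val local = parents.count(p => rdd.preferredLocations(p).contains(prefLoc))
      if (parents.isEmpty) 0.0 else local.toDouble / parents.size
    }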
@@ -61,24 +63,24 @@ private[spark] case class CoalescedRDDPartition(
* This transformation is useful when an RDD with many partitions gets filtered into a smaller one,
* or to avoid having a large number of small tasks when processing a directory with many files.
*
- * If there is no locality information (no preferredLocations) in the parent RDD, then the coalescing
+ * If there is no locality information (no preferredLocations) in the parent, then the coalescing
* is very simple: chunk parents that are close in the Array in chunks.
- * If there is locality information, it proceeds to pack them with the following three goals in mind:
+ * If there is locality information, it proceeds to pack them with the following three goals:
*
* (1) Balance the groups so they roughly have the same number of parent partitions
- * (2) Achieve locality per partition, i.e. there exists one machine which most parent partitions prefer
- * (3) Be efficient, i.e. O(n) algorithm for n parent partitions (underlying problem is likely NP-hard)
+ * (2) Achieve locality per partition, i.e. there exists one machine which most parent splits prefer
+ * (3) Be efficient, i.e. O(n) algorithm for n parent partitions (problem is likely NP-hard)
* (4) Balance preferred machines, i.e. avoid as much as possible picking the same preferred machine
*
* Furthermore, it is assumed that the parent RDD may have many partitions, e.g. 100 000.
* We assume the final number of desired partitions is small, e.g. less than 1000.
*
- * The algorithm tries to assign unique preferred machines to each partition. If the number of desired
- * partitions is greater than the number of preferred machines (can happen), it needs to start picking
- * duplicate preferred machines. This is determined using coupon collector estimation (2n log(n)).
- * The load balancing is done using power-of-two randomized bins-balls with one twist: it tries to
- * also achieve locality. This is done by allowing a slack (balanceSlack) between two bins.
- * If two bins are within the slack in terms of balance, the algorithm will assign partitions
+ * The algorithm tries to assign unique preferred machines to each partition. If the number of
+ * desired partitions is greater than the number of preferred machines (can happen), it needs to
+ * start picking duplicate preferred machines. This is determined using coupon collector estimation
+ * (2n log(n)). The load balancing is done using power-of-two randomized bins-balls with one twist:
+ * it tries to also achieve locality. This is done by allowing a slack (balanceSlack) between two
+ * bins. If two bins are within the slack in terms of balance, the algorithm will assign partitions
* according to locality. (contact alig for questions)
*
*/
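As a back-of-the-envelope illustration of the 2n log(n) coupon-collector estimate mentioned in the comment above (an illustrative helper, not code from this file): seeing most of n distinct preferred machines while rotating uniformly over their replicas takes on the order of n ln(n) draws in expectation, and the coalescer budgets roughly twice that before it stops hunting for new machines.

    // sketch: rotation budget before giving up on discovering new preferred machines
    def expectedRotations(numMachines: Int): Int =
      (2 * numMachines * math.log(numMachines)).toInt

    expectedRotations(1000)   // ~13815 rotations for 1000 distinct machines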
@@ -119,8 +121,8 @@ class CoalescedRDD[T: ClassManifest](
}
/**
- * Returns the preferred machine for the split. If split is of type CoalescedRDDPartition, then the preferred machine
- * will be one which most parent splits prefer too.
+ * Returns the preferred machine for the split. If split is of type CoalescedRDDPartition,
+ * then the preferred machine will be one which most parent splits prefer too.
* @param split
* @return the machine most preferred by split
*/
@@ -157,14 +159,14 @@ class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double)
private var noLocality = true // if true if no preferredLocations exists for parent RDD
- this.setupGroups(math.min(prev.partitions.length, maxPartitions)) // setup the groups (bins) and preferred locations
+ this.setupGroups(math.min(prev.partitions.length, maxPartitions)) // setup the groups (bins)
this.throwBalls() // assign partitions (balls) to each group (bins)
def getPartitions : Array[PartitionGroup] = groupArr.filter( pg => pg.size > 0).toArray
// this class just keeps iterating and rotating infinitely over the partitions of the RDD
// next() returns the next preferred machine that a partition is replicated on
- // the rotator first goes through the first replica copy of each partition, then second, then third
+ // the rotator first goes through the first replica copy of each partition, then second, third
// the iterators return type is a tuple: (replicaString, partition)
private class RotateLocations(prev: RDD[_]) extends Iterator[(String, Partition)] {
@@ -174,13 +176,16 @@ class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double)
// initializes/resets to start iterating from the beginning
private def resetIterator() = {
val i1 = prev.partitions.view.map( (p: Partition) =>
- { if (prev.preferredLocations(p).length > 0) Some((prev.preferredLocations(p)(0),p)) else None } )
+ { if (prev.preferredLocations(p).length > 0)
+ Some((prev.preferredLocations(p)(0),p)) else None } )
val i2 = prev.partitions.view.map( (p: Partition) =>
- { if (prev.preferredLocations(p).length > 1) Some((prev.preferredLocations(p)(1),p)) else None } )
+ { if (prev.preferredLocations(p).length > 1)
+ Some((prev.preferredLocations(p)(1),p)) else None } )
val i3 = prev.partitions.view.map( (p: Partition) =>
- { if (prev.preferredLocations(p).length > 2) Some((prev.preferredLocations(p)(2),p)) else None } )
+ { if (prev.preferredLocations(p).length > 2)
+ Some((prev.preferredLocations(p)(2),p)) else None } )
val res = List(i1,i2,i3)
- res.view.flatMap(x => x).flatten.iterator // fuses the 3 iterators (1st replica, 2nd, 3rd) into one iterator
+ res.view.flatMap(x => x).flatten.iterator // fuses the 3 iterators (1st replica, 2nd, 3rd)
}
// hasNext() is false iff there are no preferredLocations for any of the partitions of the RDD
@@ -215,22 +220,22 @@ class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double)
def addPartToPGroup(part : Partition, pgroup : PartitionGroup) : Boolean = {
if (!initialHash.contains(part)) {
- pgroup.list += part // already preassign this element, ensures every bucket will have 1 element
- initialHash += (part -> true) // needed to avoid assigning partitions to multiple buckets/groups
+ pgroup.list += part // already assign this element
+ initialHash += (part -> true) // needed to avoid assigning partitions to multiple buckets
true
} else false
}
/**
* Initializes targetLen partition groups and assigns a preferredLocation
- * This uses coupon collector to estimate how many preferredLocations it must rotate through until it has seen
- * most of the preferred locations (2 * n log(n))
+ * This uses coupon collector to estimate how many preferredLocations it must rotate through
+ * until it has seen most of the preferred locations (2 * n log(n))
* @param targetLen
*/
private def setupGroups(targetLen: Int) {
val rotIt = new RotateLocations(prev)
- // deal with empty rotator case, just create targetLen partition groups with no preferred location
+ // deal with empty case, just create targetLen partition groups with no preferred location
if (!rotIt.hasNext()) {
(1 to targetLen).foreach(x => groupArr += PartitionGroup())
return
@@ -243,9 +248,9 @@ class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double)
var numCreated = 0
var tries = 0
- // rotate through until either targetLen unique/distinct preferred locations have been created OR
- // we've rotated expectedCoupons2, in which case we have likely seen all preferred locations, i.e.
- // likely targetLen >> number of preferred locations (more buckets than there are machines)
+ // rotate through until either targetLen unique/distinct preferred locations have been created
+ // OR we've rotated expectedCoupons2, in which case we have likely seen all preferred locations,
+ // i.e. likely targetLen >> number of preferred locations (more buckets than there are machines)
while (numCreated < targetLen && tries < expectedCoupons2) {
tries += 1
val (nxt_replica, nxt_part) = rotIt.next()
@@ -253,18 +258,18 @@ class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double)
val pgroup = PartitionGroup(nxt_replica)
groupArr += pgroup
addPartToPGroup(nxt_part, pgroup)
- groupHash += (nxt_replica -> (mutable.ListBuffer(pgroup))) // list in case we have multiple groups per machine
+ groupHash += (nxt_replica -> (mutable.ListBuffer(pgroup))) // list in case we have multiple
numCreated += 1
}
}
- while (numCreated < targetLen) { // if we don't have enough partition groups, just create duplicates
+ while (numCreated < targetLen) { // if we don't have enough partition groups, create duplicates
var (nxt_replica, nxt_part) = rotIt.next()
val pgroup = PartitionGroup(nxt_replica)
groupArr += pgroup
groupHash.get(nxt_replica).get += pgroup
var tries = 0
- while (!addPartToPGroup(nxt_part, pgroup) && tries < targetLen) { // ensure each group has at least one partition
+ while (!addPartToPGroup(nxt_part, pgroup) && tries < targetLen) { // ensure at least one part
nxt_part = rotIt.next()._2
tries += 1
}
@@ -281,12 +286,12 @@ class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double)
* @return partition group (bin to be put in)
*/
private def pickBin(p: Partition): PartitionGroup = {
- val pref = prev.preferredLocations(p).map(getLeastGroupHash(_)).sortWith(compare) // least loaded bin of replicas
+ val pref = prev.preferredLocations(p).map(getLeastGroupHash(_)).sortWith(compare) // least load
val prefPart = if (pref == Nil) None else pref.head
val r1 = rnd.nextInt(groupArr.size)
val r2 = rnd.nextInt(groupArr.size)
- val minPowerOfTwo = if (groupArr(r1).size < groupArr(r2).size) groupArr(r1) else groupArr(r2) // power of 2
+ val minPowerOfTwo = if (groupArr(r1).size < groupArr(r2).size) groupArr(r1) else groupArr(r2)
if (prefPart== None) // if no preferred locations, just use basic power of two
return minPowerOfTwo
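The pickBin logic in this hunk combines power-of-two-choices with the balanceSlack allowance described earlier. A self-contained sketch of that decision, with illustrative names (sizes, preferred, and slack are stand-ins, not the fields used in this file), where slack is balanceSlack scaled to a partition count:

    import scala.util.Random

    // sizes(i) = current number of parents in bin i; preferred = locality-preferred bin, if any
    def chooseBin(sizes: IndexedSeq[Int], preferred: Option[Int], slack: Int, rnd: Random): Int = {
      val r1 = rnd.nextInt(sizes.length)
      val r2 = rnd.nextInt(sizes.length)
      val min2 = if (sizes(r1) < sizes(r2)) r1 else r2        // classic power-of-two choice
      preferred match {
        case Some(p) if sizes(p) <= sizes(min2) + slack => p  // locality wins while within slack
        case _ => min2                                        // otherwise fall back to pure balance
      }
    }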
@@ -305,7 +310,7 @@ class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double)
for ((p,i) <- prev.partitions.zipWithIndex) {
groupArr(i).list += p
}
- } else { // old code, just splits them grouping partitions that are next to each other in the array
+ } else { // old code, just splits them grouping partitions that are next to each other
(0 until maxPartitions).foreach { i =>
val rangeStart = ((i.toLong * prev.partitions.length) / maxPartitions).toInt
val rangeEnd = (((i.toLong + 1) * prev.partitions.length) / maxPartitions).toInt
@@ -313,7 +318,7 @@ class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double)
}
}
} else {
- for (p <- prev.partitions if (!initialHash.contains(p))) { // throw every partition into a partition group
+ for (p <- prev.partitions if (!initialHash.contains(p))) { // throw every partition into group
pickBin(p).list += p
}
}
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index c200bfe909..a757aebd65 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -178,18 +178,20 @@ class RDDSuite extends FunSuite with SharedSparkContext {
// RDD with locality preferences spread (non-randomly) over 6 machines, m0 through m5
val data = sc.makeRDD((1 to 9).map( i => (i, (i to (i+2)).map{ j => "m" + (j%6)} )))
val coalesced1 = data.coalesce(3)
- assert(coalesced1.collect().toList.sorted === (1 to 9).toList) // no data lost (NB: order might reshuffle)
+ assert(coalesced1.collect().toList.sorted === (1 to 9).toList) // no data lost
val splits = coalesced1.glom().collect().map(_.toList).toList
assert(splits.length === 3) // ensure it indeed created 3 partitions
- assert(splits.foldLeft(true)( (x,y) => if (!x) false else y.length >= 2) === true) // descent balance (2+ per bin)
+ assert(splits.foldLeft(true)
+ ( (x,y) => if (!x) false else y.length >= 2) === true) // (2+ balance)
// If we try to coalesce into more partitions than the original RDD, it should just
// keep the original number of partitions.
val coalesced4 = data.coalesce(20)
assert(coalesced4.glom().collect().map(_.toList).toList.sortWith(
- (x, y) => if (x.isEmpty) false else if (y.isEmpty) true else x(0) < y(0)) === (1 to 9).map(x => List(x)).toList)
+ (x, y) => if (x.isEmpty) false else if (y.isEmpty) true else x(0) < y(0)) === (1 to 9).
+ map(x => List(x)).toList)
// large scale experiment
@@ -200,18 +202,21 @@ class RDDSuite extends FunSuite with SharedSparkContext {
val machines = mutable.ListBuffer[String]()
(1 to numMachines).foreach(machines += "m"+_)
- val blocks = (1 to partitions).map( i => (i, (i to (i+2)).map{ j => machines(rnd.nextInt(machines.size)) } ))
+ val blocks = (1 to partitions).map( i => (i, (i to (i+2))
+ .map{ j => machines(rnd.nextInt(machines.size)) } ))
val data2 = sc.makeRDD(blocks)
val coalesced2 = data2.coalesce(numMachines*2)
// test that you get over 95% locality in each group
- val minLocality = coalesced2.partitions.map( part => part.asInstanceOf[CoalescedRDDPartition].localFraction )
+ val minLocality = coalesced2.partitions
+ .map( part => part.asInstanceOf[CoalescedRDDPartition].localFraction )
.foldLeft(100.)( (perc, loc) => math.min(perc,loc) )
assert(minLocality > 0.95)
// test that the groups are load balanced with 100 +/- 15 elements in each
- val maxImbalance = coalesced2.partitions.map( part => part.asInstanceOf[CoalescedRDDPartition].parents.size )
+ val maxImbalance = coalesced2.partitions
+ .map( part => part.asInstanceOf[CoalescedRDDPartition].parents.size )
.foldLeft(0)((dev, curr) => math.max(math.abs(100-curr),dev))
assert(maxImbalance < 15)
}
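For context, a minimal usage sketch in the spirit of the test above (pre-1.0 spark-package API assumed, host names made up): build an RDD whose blocks carry preferred hosts, coalesce it, and inspect how the elements were grouped.

    val data = sc.makeRDD((1 to 9).map(i => (i, Seq("m" + (i % 3)))))  // element -> preferred hosts
    val coalesced = data.coalesce(3)
    // at most 3 groups, no elements lost
    println(coalesced.glom().collect().map(_.mkString(",")).mkString(" | "))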