author | Ali Ghodsi <alig@cs.berkeley.edu> | 2013-08-14 20:21:53 -0700
---|---|---
committer | Ali Ghodsi <alig@cs.berkeley.edu> | 2013-08-20 16:13:36 -0700
commit | 937f72feb86b406056bd163ce7320f582b70ab16 (patch)
tree | d4f67159ed33ab17bc6c779a32e808b6a90816c9 /core
parent | c4d59910b149b8b9bbf729f38e3eef3fb64fc85b (diff)
word wrap before 100 chars per line
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/spark/rdd/CoalescedRDD.scala | 75
-rw-r--r-- | core/src/test/scala/spark/RDDSuite.scala | 17
2 files changed, 51 insertions, 41 deletions
diff --git a/core/src/main/scala/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
index 6af55cd80c..beed2d5b69 100644
--- a/core/src/main/scala/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
@@ -37,13 +37,15 @@ private[spark] case class CoalescedRDDPartition(
   }
 
   /**
-   * Gets the preferred location for this coalesced RDD partition. Most parent indices should prefer this machine.
+   * Gets the preferred location for this coalesced RDD partition.
+   * Most parent indices should prefer this machine.
    * @return preferred location
    */
   def getPreferredLocation = prefLoc
 
   /**
-   * Computes how many of the parents partitions have getPreferredLocation as one of their preferredLocations
+   * Computes how many of the parents partitions have getPreferredLocation
+   * as one of their preferredLocations
    * @return locality of this coalesced partition between 0 and 1
    */
   def localFraction :Double = {
@@ -61,24 +63,24 @@ private[spark] case class CoalescedRDDPartition(
  * This transformation is useful when an RDD with many partitions gets filtered into a smaller one,
  * or to avoid having a large number of small tasks when processing a directory with many files.
  *
- * If there is no locality information (no preferredLocations) in the parent RDD, then the coalescing
+ * If there is no locality information (no preferredLocations) in the parent, then the coalescing
  * is very simple: chunk parents that are close in the Array in chunks.
- * If there is locality information, it proceeds to pack them with the following three goals in mind:
+ * If there is locality information, it proceeds to pack them with the following three goals:
  *
  * (1) Balance the groups so they roughly have the same number of parent partitions
- * (2) Achieve locality per partition, i.e. there exists one machine which most parent partitions prefer
- * (3) Be efficient, i.e. O(n) algorithm for n parent partitions (underlying problem is likely NP-hard)
+ * (2) Achieve locality per partition, i.e. there exists one machine which most parent splits prefer
+ * (3) Be efficient, i.e. O(n) algorithm for n parent partitions (problem is likely NP-hard)
  * (4) Balance preferred machines, i.e. avoid as much as possible picking the same preferred machine
  *
  * Furthermore, it is assumed that the parent RDD may have many partitions, e.g. 100 000.
  * We assume the final number of desired partitions is small, e.g. less than 1000.
  *
- * The algorithm tries to assign unique preferred machines to each partition. If the number of desired
- * partitions is greater than the number of preferred machines (can happen), it needs to start picking
- * duplicate preferred machines. This is determined using coupon collector estimation (2n log(n)).
- * The load balancing is done using power-of-two randomized bins-balls with one twist: it tries to
- * also achieve locality. This is done by allowing a slack (balanceSlack) between two bins.
- * If two bins are within the slack in terms of balance, the algorithm will assign partitions
+ * The algorithm tries to assign unique preferred machines to each partition. If the number of
+ * desired partitions is greater than the number of preferred machines (can happen), it needs to
+ * start picking duplicate preferred machines. This is determined using coupon collector estimation
+ * (2n log(n)). The load balancing is done using power-of-two randomized bins-balls with one twist:
+ * it tries to also achieve locality. This is done by allowing a slack (balanceSlack) between two
+ * bins. If two bins are within the slack in terms of balance, the algorithm will assign partitions
  * according to locality. (contact alig for questions)
  *
  */
@@ -119,8 +121,8 @@ class CoalescedRDD[T: ClassManifest](
   }
 
   /**
-   * Returns the preferred machine for the split. If split is of type CoalescedRDDPartition, then the preferred machine
-   * will be one which most parent splits prefer too.
+   * Returns the preferred machine for the split. If split is of type CoalescedRDDPartition,
+   * then the preferred machine will be one which most parent splits prefer too.
    * @param split
    * @return the machine most preferred by split
    */
@@ -157,14 +159,14 @@ class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double)
 
   private var noLocality = true  // if true if no preferredLocations exists for parent RDD
 
-  this.setupGroups(math.min(prev.partitions.length, maxPartitions))   // setup the groups (bins) and preferred locations
+  this.setupGroups(math.min(prev.partitions.length, maxPartitions))   // setup the groups (bins)
   this.throwBalls()             // assign partitions (balls) to each group (bins)
 
   def getPartitions : Array[PartitionGroup] = groupArr.filter( pg => pg.size > 0).toArray
 
   // this class just keeps iterating and rotating infinitely over the partitions of the RDD
   // next() returns the next preferred machine that a partition is replicated on
-  // the rotator first goes through the first replica copy of each partition, then second, then third
+  // the rotator first goes through the first replica copy of each partition, then second, third
   // the iterators return type is a tuple: (replicaString, partition)
   private class RotateLocations(prev: RDD[_]) extends Iterator[(String, Partition)] {
 
@@ -174,13 +176,16 @@ class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double)
     // initializes/resets to start iterating from the beginning
     private def resetIterator() = {
       val i1 = prev.partitions.view.map( (p: Partition) =>
-        { if (prev.preferredLocations(p).length > 0) Some((prev.preferredLocations(p)(0),p)) else None } )
+        { if (prev.preferredLocations(p).length > 0)
+          Some((prev.preferredLocations(p)(0),p)) else None } )
       val i2 = prev.partitions.view.map( (p: Partition) =>
-        { if (prev.preferredLocations(p).length > 1) Some((prev.preferredLocations(p)(1),p)) else None } )
+        { if (prev.preferredLocations(p).length > 1)
+          Some((prev.preferredLocations(p)(1),p)) else None } )
       val i3 = prev.partitions.view.map( (p: Partition) =>
-        { if (prev.preferredLocations(p).length > 2) Some((prev.preferredLocations(p)(2),p)) else None } )
+        { if (prev.preferredLocations(p).length > 2)
+          Some((prev.preferredLocations(p)(2),p)) else None } )
       val res = List(i1,i2,i3)
-      res.view.flatMap(x => x).flatten.iterator // fuses the 3 iterators (1st replica, 2nd, 3rd) into one iterator
+      res.view.flatMap(x => x).flatten.iterator // fuses the 3 iterators (1st replica, 2nd, 3rd)
     }
 
     // hasNext() is false iff there are no preferredLocations for any of the partitions of the RDD
@@ -215,22 +220,22 @@ class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double)
 
   def addPartToPGroup(part : Partition, pgroup : PartitionGroup) : Boolean = {
     if (!initialHash.contains(part)) {
-      pgroup.list += part           // already preassign this element, ensures every bucket will have 1 element
-      initialHash += (part -> true) // needed to avoid assigning partitions to multiple buckets/groups
+      pgroup.list += part           // already assign this element
+      initialHash += (part -> true) // needed to avoid assigning partitions to multiple buckets
       true
     } else false
   }
 
   /**
    * Initializes targetLen partition groups and assigns a preferredLocation
-   * This uses coupon collector to estimate how many preferredLocations it must rotate through until it has seen
-   * most of the preferred locations (2 * n log(n))
+   * This uses coupon collector to estimate how many preferredLocations it must rotate through
+   * until it has seen most of the preferred locations (2 * n log(n))
    * @param targetLen
    */
   private def setupGroups(targetLen: Int) {
     val rotIt = new RotateLocations(prev)
 
-    // deal with empty rotator case, just create targetLen partition groups with no preferred location
+    // deal with empty case, just create targetLen partition groups with no preferred location
     if (!rotIt.hasNext()) {
       (1 to targetLen).foreach(x => groupArr += PartitionGroup())
       return
@@ -243,9 +248,9 @@ class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double)
     var numCreated = 0
     var tries = 0
 
-    // rotate through until either targetLen unique/distinct preferred locations have been created OR
-    // we've rotated expectedCoupons2, in which case we have likely seen all preferred locations, i.e.
-    // likely targetLen >> number of preferred locations (more buckets than there are machines)
+    // rotate through until either targetLen unique/distinct preferred locations have been created
+    // OR we've rotated expectedCoupons2, in which case we have likely seen all preferred locations,
+    // i.e. likely targetLen >> number of preferred locations (more buckets than there are machines)
     while (numCreated < targetLen && tries < expectedCoupons2) {
       tries += 1
       val (nxt_replica, nxt_part) = rotIt.next()
@@ -253,18 +258,18 @@ class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double)
         val pgroup = PartitionGroup(nxt_replica)
         groupArr += pgroup
         addPartToPGroup(nxt_part, pgroup)
-        groupHash += (nxt_replica -> (mutable.ListBuffer(pgroup))) // list in case we have multiple groups per machine
+        groupHash += (nxt_replica -> (mutable.ListBuffer(pgroup))) // list in case we have multiple
         numCreated += 1
       }
     }
 
-    while (numCreated < targetLen) {  // if we don't have enough partition groups, just create duplicates
+    while (numCreated < targetLen) {  // if we don't have enough partition groups, create duplicates
       var (nxt_replica, nxt_part) = rotIt.next()
       val pgroup = PartitionGroup(nxt_replica)
       groupArr += pgroup
       groupHash.get(nxt_replica).get += pgroup
       var tries = 0
-      while (!addPartToPGroup(nxt_part, pgroup) && tries < targetLen) { // ensure each group has at least one partition
+      while (!addPartToPGroup(nxt_part, pgroup) && tries < targetLen) { // ensure at least one part
        nxt_part = rotIt.next()._2
        tries += 1
      }
@@ -281,12 +286,12 @@ class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double)
    * @return partition group (bin to be put in)
    */
   private def pickBin(p: Partition): PartitionGroup = {
-    val pref = prev.preferredLocations(p).map(getLeastGroupHash(_)).sortWith(compare) // least loaded bin of replicas
+    val pref = prev.preferredLocations(p).map(getLeastGroupHash(_)).sortWith(compare) // least load
     val prefPart = if (pref == Nil) None else pref.head
 
     val r1 = rnd.nextInt(groupArr.size)
     val r2 = rnd.nextInt(groupArr.size)
-    val minPowerOfTwo = if (groupArr(r1).size < groupArr(r2).size) groupArr(r1) else groupArr(r2) // power of 2
+    val minPowerOfTwo = if (groupArr(r1).size < groupArr(r2).size) groupArr(r1) else groupArr(r2)
     if (prefPart== None) // if no preferred locations, just use basic power of two
       return minPowerOfTwo
 
@@ -305,7 +310,7 @@ class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double)
         for ((p,i) <- prev.partitions.zipWithIndex) {
           groupArr(i).list += p
         }
-      } else { // old code, just splits them grouping partitions that are next to each other in the array
+      } else { // old code, just splits them grouping partitions that are next to each other
         (0 until maxPartitions).foreach { i =>
           val rangeStart = ((i.toLong * prev.partitions.length) / maxPartitions).toInt
           val rangeEnd = (((i.toLong + 1) * prev.partitions.length) / maxPartitions).toInt
@@ -313,7 +318,7 @@ class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double)
         }
       }
     } else {
-      for (p <- prev.partitions if (!initialHash.contains(p))) { // throw every partition into a partition group
+      for (p <- prev.partitions if (!initialHash.contains(p))) { // throw every partition into group
         pickBin(p).list += p
       }
     }
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index c200bfe909..a757aebd65 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -178,18 +178,20 @@ class RDDSuite extends FunSuite with SharedSparkContext {
 
     // RDD with locality preferences spread (non-randomly) over 6 machines, m0 through m5
     val data = sc.makeRDD((1 to 9).map( i => (i, (i to (i+2)).map{ j => "m" + (j%6)} )))
     val coalesced1 = data.coalesce(3)
-    assert(coalesced1.collect().toList.sorted === (1 to 9).toList) // no data lost (NB: order might reshuffle)
+    assert(coalesced1.collect().toList.sorted === (1 to 9).toList) // no data lost
     val splits = coalesced1.glom().collect().map(_.toList).toList
     assert(splits.length === 3) // ensure it indeed created 3 partitions
-    assert(splits.foldLeft(true)( (x,y) => if (!x) false else y.length >= 2) === true) // descent balance (2+ per bin)
+    assert(splits.foldLeft(true)
+      ( (x,y) => if (!x) false else y.length >= 2) === true) // (2+ balance)
 
     // If we try to coalesce into more partitions than the original RDD, it should just
     // keep the original number of partitions.
     val coalesced4 = data.coalesce(20)
     assert(coalesced4.glom().collect().map(_.toList).toList.sortWith(
-      (x, y) => if (x.isEmpty) false else if (y.isEmpty) true else x(0) < y(0)) === (1 to 9).map(x => List(x)).toList)
+      (x, y) => if (x.isEmpty) false else if (y.isEmpty) true else x(0) < y(0)) === (1 to 9).
+      map(x => List(x)).toList)
 
     // large scale experiment
@@ -200,18 +202,21 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     val machines = mutable.ListBuffer[String]()
     (1 to numMachines).foreach(machines += "m"+_)
 
-    val blocks = (1 to partitions).map( i => (i, (i to (i+2)).map{ j => machines(rnd.nextInt(machines.size)) } ))
+    val blocks = (1 to partitions).map( i => (i, (i to (i+2))
+      .map{ j => machines(rnd.nextInt(machines.size)) } ))
 
     val data2 = sc.makeRDD(blocks)
     val coalesced2 = data2.coalesce(numMachines*2)
 
     // test that you get over 95% locality in each group
-    val minLocality = coalesced2.partitions.map( part => part.asInstanceOf[CoalescedRDDPartition].localFraction )
+    val minLocality = coalesced2.partitions
+      .map( part => part.asInstanceOf[CoalescedRDDPartition].localFraction )
       .foldLeft(100.)( (perc, loc) => math.min(perc,loc) )
     assert(minLocality > 0.95)
 
     // test that the groups are load balanced with 100 +/- 15 elements in each
-    val maxImbalance = coalesced2.partitions.map( part => part.asInstanceOf[CoalescedRDDPartition].parents.size )
+    val maxImbalance = coalesced2.partitions
+      .map( part => part.asInstanceOf[CoalescedRDDPartition].parents.size )
       .foldLeft(0)((dev, curr) => math.max(math.abs(100-curr),dev))
     assert(maxImbalance < 15)
   }
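The setupGroups comments wrapped above rely on the coupon-collector argument: after roughly 2 * n * log(n) random draws over n preferred machines, every machine has very likely been seen at least once, so the rotation can stop and duplicate machines may be reused. A minimal sketch of that bound (the helper name mirrors the expectedCoupons2 variable in the diff; the sample figure is only illustrative):

```scala
// Coupon-collector estimate: after about 2 * n * ln(n) random draws from n distinct
// machines, each machine has very likely been seen at least once, so the rotation over
// preferred locations can stop and start reusing (duplicating) machines.
def expectedCoupons2(n: Int): Int = (2 * n * math.log(n)).toInt

println(expectedCoupons2(100))  // ~921 draws suffice for 100 distinct machines
```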
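The class Scaladoc reflowed in this commit also describes the balancing rule itself: power-of-two-choices randomized bins-balls, with a slack (balanceSlack) that lets a preferred (local) bin win as long as it is not too far behind on balance. The sketch below illustrates that selection rule only; it is not the CoalescedRDD code, and Bin, the pickBin signature, and the slack handling are simplified assumptions made for illustration:

```scala
import scala.collection.mutable
import scala.util.Random

// Hypothetical, simplified stand-in for a PartitionGroup: one output bin and its preferred host.
case class Bin(prefLoc: String, items: mutable.ListBuffer[Int] = mutable.ListBuffer())

object PowerOfTwoWithSlack {

  // Selection rule sketched in the Scaladoc: compare the least-loaded bin among the item's
  // preferred hosts against the smaller of two randomly chosen bins (power-of-two choices),
  // and take the local bin whenever its load is within slack * totalItems of the random choice.
  def pickBin(prefs: Seq[String], bins: Array[Bin], slack: Double,
              totalItems: Int, rnd: Random): Bin = {
    val r1 = bins(rnd.nextInt(bins.length))
    val r2 = bins(rnd.nextInt(bins.length))
    val minPowerOfTwo = if (r1.items.size < r2.items.size) r1 else r2

    val localBins = bins.filter(b => prefs.contains(b.prefLoc))
    if (localBins.isEmpty) {
      minPowerOfTwo                      // no locality information: plain power-of-two choices
    } else {
      val leastLoadedLocal = localBins.minBy(_.items.size)
      if (leastLoadedLocal.items.size <= minPowerOfTwo.items.size + slack * totalItems) {
        leastLoadedLocal                 // within the slack: prefer locality
      } else {
        minPowerOfTwo                    // too unbalanced: fall back to the balanced choice
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val rnd = new Random(42)
    val hosts = (0 until 6).map(i => s"m$i")             // machines m0..m5, as in the test above
    val bins = hosts.map(h => Bin(h)).toArray            // one bin (partition group) per machine
    val items = (1 to 300).toList                        // stand-ins for parent partitions

    items.foreach { i =>
      val prefs = Seq(hosts(i % 6), hosts((i + 1) % 6))  // two preferred replicas per item
      pickBin(prefs, bins, 0.1, items.size, rnd).items += i
    }
    bins.foreach(b => println(s"${b.prefLoc}: ${b.items.size} items"))
  }
}
```

With a slack of 0.1, a preferred bin is chosen unless it already holds more than 10% of all items beyond the load of the randomly selected bin, which is the trade-off between goals (1) and (2) in the Scaladoc.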