From f1c853d76dce4fbc34f580be0a3ae15cc5be9c80 Mon Sep 17 00:00:00 2001 From: Ali Ghodsi Date: Sun, 18 Aug 2013 20:42:35 -0700 Subject: fixed Matei's comments --- core/src/main/scala/spark/SparkContext.scala | 2 +- core/src/main/scala/spark/rdd/CoalescedRDD.scala | 145 +++++++++++++---------- core/src/test/scala/spark/RDDSuite.scala | 25 ++-- 3 files changed, 99 insertions(+), 73 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index 544c971efc..7639749ecb 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -620,7 +620,7 @@ class SparkContext( * @param partition to be looked up for locality * @return list of preferred locations for the partition */ - def getPreferredLocs(rdd: RDD[_], partition: Int): List[String] = + private [spark] def getPreferredLocs(rdd: RDD[_], partition: Int): List[String] = dagScheduler.getPreferredLocs(rdd, partition) /** diff --git a/core/src/main/scala/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/spark/rdd/CoalescedRDD.scala index a5e3d74447..c475b7a8aa 100644 --- a/core/src/main/scala/spark/rdd/CoalescedRDD.scala +++ b/core/src/main/scala/spark/rdd/CoalescedRDD.scala @@ -21,13 +21,21 @@ import spark._ import java.io.{ObjectOutputStream, IOException} import scala.collection.mutable import scala.Some +import scala.collection.mutable.ArrayBuffer +/** + * Class that captures a coalesced RDD by essentially keeping track of parent partitions + * @param index of this coalesced partition + * @param rdd which it belongs to + * @param parentsIndices list of indices in the parent that have been coalesced into this partition + * @param preferredLocation the preferred location for this partition + */ case class CoalescedRDDPartition( - index: Int, - @transient rdd: RDD[_], - parentsIndices: Array[Int], - @transient preferredLocation: String = "" - ) extends Partition { + index: Int, + @transient rdd: RDD[_], + parentsIndices: Array[Int], + @transient preferredLocation: String = "" + ) extends Partition { var parents: Seq[Partition] = parentsIndices.map(rdd.partitions(_)) @throws(classOf[IOException]) @@ -43,45 +51,27 @@ case class CoalescedRDDPartition( * @return locality of this coalesced partition between 0 and 1 */ def localFraction: Double = { - val loc = parents.count(p => rdd.context.getPreferredLocs(rdd, p.index) - .contains(preferredLocation)) + val loc = parents.count(p => + rdd.context.getPreferredLocs(rdd, p.index).contains(preferredLocation)) + if (parents.size == 0) 0.0 else (loc.toDouble / parents.size.toDouble) } } /** - * Coalesce the partitions of a parent RDD (`prev`) into fewer partitions, so that each partition of - * this RDD computes one or more of the parent ones. Will produce exactly `maxPartitions` if the - * parent had more than this many partitions, or fewer if the parent had fewer. - * - * This transformation is useful when an RDD with many partitions gets filtered into a smaller one, - * or to avoid having a large number of small tasks when processing a directory with many files. - * - * If there is no locality information (no preferredLocations) in the parent, then the coalescing - * is very simple: chunk parents that are close in the Array in chunks. - * If there is locality information, it proceeds to pack them with the following three goals: - * - * (1) Balance the groups so they roughly have the same number of parent partitions - * (2) Achieve locality per partition, i.e. 
find one machine which most parent partitions prefer - * (3) Be efficient, i.e. O(n) algorithm for n parent partitions (problem is likely NP-hard) - * (4) Balance preferred machines, i.e. avoid as much as possible picking the same preferred machine - * - * Furthermore, it is assumed that the parent RDD may have many partitions, e.g. 100 000. - * We assume the final number of desired partitions is small, e.g. less than 1000. - * - * The algorithm tries to assign unique preferred machines to each partition. If the number of - * desired partitions is greater than the number of preferred machines (can happen), it needs to - * start picking duplicate preferred machines. This is determined using coupon collector estimation - * (2n log(n)). The load balancing is done using power-of-two randomized bins-balls with one twist: - * it tries to also achieve locality. This is done by allowing a slack (balanceSlack) between two - * bins. If two bins are within the slack in terms of balance, the algorithm will assign partitions - * according to locality. (contact alig for questions) - * + * Represents a coalesced RDD that has fewer partitions than its parent RDD + * This class uses the PartitionCoalescer class to find a good partitioning of the parent RDD + * so that each new partition has roughly the same number of parent partitions and that + * the preferred location of each new partition overlaps with as many preferred locations of its + * parent partitions + * @param prev RDD to be coalesced + * @param maxPartitions number of desired partitions in the coalesced RDD + * @param balanceSlack used to trade-off balance and locality. 1.0 is all locality, 0 is all balance */ class CoalescedRDD[T: ClassManifest]( - @transient var prev: RDD[T], - maxPartitions: Int, - balanceSlack: Double = 0.10) + @transient var prev: RDD[T], + maxPartitions: Int, + balanceSlack: Double = 0.10) extends RDD[T](prev.context, Nil) { // Nil since we implement getDependencies override def getPartitions: Array[Partition] = { @@ -128,44 +118,75 @@ class CoalescedRDD[T: ClassManifest]( } -private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], - balanceSlack: Double) { - protected def compare(o1: PartitionGroup, o2: PartitionGroup): Boolean = o1.size < o2.size - protected def compare(o1: Option[PartitionGroup], o2: Option[PartitionGroup]): Boolean = +/** + * Coalesce the partitions of a parent RDD (`prev`) into fewer partitions, so that each partition of + * this RDD computes one or more of the parent ones. Will produce exactly `maxPartitions` if the + * parent had more than this many partitions, or fewer if the parent had fewer. + * + * This transformation is useful when an RDD with many partitions gets filtered into a smaller one, + * or to avoid having a large number of small tasks when processing a directory with many files. + * + * If there is no locality information (no preferredLocations) in the parent, then the coalescing + * is very simple: chunk parents that are close in the Array in chunks. + * If there is locality information, it proceeds to pack them with the following three goals: + * + * (1) Balance the groups so they roughly have the same number of parent partitions + * (2) Achieve locality per partition, i.e. find one machine which most parent partitions prefer + * (3) Be efficient, i.e. O(n) algorithm for n parent partitions (problem is likely NP-hard) + * (4) Balance preferred machines, i.e. 
avoid as much as possible picking the same preferred machine + * + * Furthermore, it is assumed that the parent RDD may have many partitions, e.g. 100 000. + * We assume the final number of desired partitions is small, e.g. less than 1000. + * + * The algorithm tries to assign unique preferred machines to each partition. If the number of + * desired partitions is greater than the number of preferred machines (can happen), it needs to + * start picking duplicate preferred machines. This is determined using coupon collector estimation + * (2n log(n)). The load balancing is done using power-of-two randomized bins-balls with one twist: + * it tries to also achieve locality. This is done by allowing a slack (balanceSlack) between two + * bins. If two bins are within the slack in terms of balance, the algorithm will assign partitions + * according to locality. (contact alig for questions) + * + */ + +private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double) { + + def compare(o1: PartitionGroup, o2: PartitionGroup): Boolean = o1.size < o2.size + def compare(o1: Option[PartitionGroup], o2: Option[PartitionGroup]): Boolean = if (o1 == None) false else if (o2 == None) true else compare(o1.get, o2.get) - protected val rnd = new scala.util.Random(7919) // keep this class deterministic + val rnd = new scala.util.Random(7919) // keep this class deterministic // each element of groupArr represents one coalesced partition - protected val groupArr = mutable.ArrayBuffer[PartitionGroup]() + val groupArr = ArrayBuffer[PartitionGroup]() // hash used to check whether some machine is already in groupArr - protected val groupHash = mutable.Map[String, mutable.ListBuffer[PartitionGroup]]() + val groupHash = mutable.Map[String, ArrayBuffer[PartitionGroup]]() // hash used for the first maxPartitions (to avoid duplicates) - protected val initialHash = mutable.Map[Partition, Boolean]() + val initialHash = mutable.Set[Partition]() // determines the tradeoff between load-balancing the partitions sizes and their locality // e.g. 
balanceSlack=0.10 means that it allows up to 10% imbalance in favor of locality - protected val slack = (balanceSlack * prev.partitions.size).toInt + val slack = (balanceSlack * prev.partitions.size).toInt - protected var noLocality = true // if true if no preferredLocations exists for parent RDD + var noLocality = true // if true if no preferredLocations exists for parent RDD // gets the *current* preferred locations from the DAGScheduler (as opposed to the static ones) - protected def currentPreferredLocations(rdd: RDD[_], part: Partition) : Seq[String] = + def currentPreferredLocations(rdd: RDD[_], part: Partition) : Seq[String] = rdd.context.getPreferredLocs(rdd, part.index) // this class just keeps iterating and rotating infinitely over the partitions of the RDD // next() returns the next preferred machine that a partition is replicated on // the rotator first goes through the first replica copy of each partition, then second, third // the iterators return type is a tuple: (replicaString, partition) - protected class RotateLocations(prev: RDD[_]) extends Iterator[(String, Partition)] { + class LocationIterator(prev: RDD[_]) extends Iterator[(String, Partition)] { + + var it: Iterator[(String, Partition)] = resetIterator() - protected var it: Iterator[(String, Partition)] = resetIterator() override val isEmpty = !it.hasNext // initializes/resets to start iterating from the beginning - protected def resetIterator() = { + def resetIterator() = { val i1 = prev.partitions.view.map{ p: Partition => { if (currentPreferredLocations(prev, p).length > 0) Some((currentPreferredLocations(prev, p)(0),p)) else None } } @@ -177,10 +198,14 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], Some((currentPreferredLocations(prev, p)(2),p)) else None } } val res = List(i1,i2,i3) res.view.flatMap(x => x).flatten.iterator // fuses the 3 iterators (1st replica, 2nd, 3rd) + + // prev.partitions.iterator.map{p: Partition => { (currentPreferredLocations(prev, p)(0),p) }} ++ + // prev.partitions.iterator.map{p: Partition => { (currentPreferredLocations(prev, p)(1),p) }} ++ + // prev.partitions.iterator.map{p: Partition => { (currentPreferredLocations(prev, p)(2),p) }} } // hasNext() is false iff there are no preferredLocations for any of the partitions of the RDD - def hasNext(): Boolean = !isEmpty + def hasNext(): Boolean = { !isEmpty } // return the next preferredLocation of some partition of the RDD def next(): (String, Partition) = { @@ -199,14 +224,14 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], * @param key string representing a partitioned group on preferred machine key * @return Option of PartitionGroup that has least elements for key */ - protected def getLeastGroupHash(key: String): Option[PartitionGroup] = { + def getLeastGroupHash(key: String): Option[PartitionGroup] = { groupHash.get(key).map(_.sortWith(compare).head) } def addPartToPGroup(part : Partition, pgroup : PartitionGroup) : Boolean = { if (!initialHash.contains(part)) { pgroup.list += part // already assign this element - initialHash += (part -> true) // needed to avoid assigning partitions to multiple buckets + initialHash += part // needed to avoid assigning partitions to multiple buckets true } else { false } } @@ -217,8 +242,8 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], * until it has seen most of the preferred locations (2 * n log(n)) * @param targetLen */ - protected def setupGroups(targetLen: Int) { - val rotIt = new RotateLocations(prev) + 
def setupGroups(targetLen: Int) { + val rotIt = new LocationIterator(prev) // deal with empty case, just create targetLen partition groups with no preferred location if (!rotIt.hasNext()) { @@ -243,7 +268,7 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], val pgroup = PartitionGroup(nxt_replica) groupArr += pgroup addPartToPGroup(nxt_part, pgroup) - groupHash += (nxt_replica -> (mutable.ListBuffer(pgroup))) // list in case we have multiple + groupHash += (nxt_replica -> (ArrayBuffer(pgroup))) // list in case we have multiple numCreated += 1 } } @@ -270,7 +295,7 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], * @param p partition (ball to be thrown) * @return partition group (bin to be put in) */ - protected def pickBin(p: Partition): PartitionGroup = { + def pickBin(p: Partition): PartitionGroup = { val pref = prev.context.getPreferredLocs(prev, p.index). map(getLeastGroupHash(_)).sortWith(compare) // least loaded of the pref locations val prefPart = if (pref == Nil) None else pref.head @@ -290,7 +315,7 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], } } - protected def throwBalls() { + def throwBalls() { if (noLocality) { // no preferredLocations in parent RDD, no randomization needed if (maxPartitions > groupArr.size) { // just return prev.partitions for ((p,i) <- prev.partitions.zipWithIndex) { @@ -310,7 +335,7 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], } } - protected def getPartitions : Array[PartitionGroup] = groupArr.filter( pg => pg.size > 0).toArray + def getPartitions : Array[PartitionGroup] = groupArr.filter( pg => pg.size > 0).toArray /** * Runs the packing algorithm and returns an array of PartitionGroups that if possible are diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala index da5f78aef5..14f7c62782 100644 --- a/core/src/test/scala/spark/RDDSuite.scala +++ b/core/src/test/scala/spark/RDDSuite.scala @@ -176,7 +176,7 @@ class RDDSuite extends FunSuite with SharedSparkContext { } test("cogrouped RDDs with locality") { // RDD with locality preferences spread (non-randomly) over 6 machines, m0 through m5 - val data = sc.makeRDD((1 to 9).map( i => (i, (i to (i+2)).map{ j => "m" + (j%6)} ))) + val data = sc.makeRDD((1 to 9).map(i => (i, (i to (i+2)).map{ j => "m" + (j%6)}))) val coalesced1 = data.coalesce(3) assert(coalesced1.collect().toList.sorted === (1 to 9).toList, "Data got *lost* in coalescing") @@ -184,7 +184,7 @@ class RDDSuite extends FunSuite with SharedSparkContext { assert(splits.length == 3, "Supposed to coalesce to 3 but got " + splits.length) assert(splits.foldLeft(true) - ( (x,y) => if (!x) false else y.length >= 1) === true, "Some partitions were empty") + ((x,y) => if (!x) false else y.length >= 1) === true, "Some partitions were empty") // If we try to coalesce into more partitions than the original RDD, it should just // keep the original number of partitions. @@ -193,7 +193,9 @@ class RDDSuite extends FunSuite with SharedSparkContext { (x, y) => if (x.isEmpty) false else if (y.isEmpty) true else x(0) < y(0)) === (1 to 9). 
map(x => List(x)).toList, "Tried coalescing 9 partitions to 20 but didn't get 9 back") + } + test("cogrouped RDDs with locality, large scale (10K partitions)") { // large scale experiment import collection.mutable val rnd = scala.util.Random @@ -202,30 +204,29 @@ class RDDSuite extends FunSuite with SharedSparkContext { val machines = mutable.ListBuffer[String]() (1 to numMachines).foreach(machines += "m"+_) - val blocks = (1 to partitions).map( i => (i, (i to (i+2)) - .map{ j => machines(rnd.nextInt(machines.size)) } )) + val blocks = (1 to partitions).map(i => (i, (i to (i+2)) + .map{ j => machines(rnd.nextInt(machines.size))})) - val data2 = sc.makeRDD(blocks) // .map( i => i*2 ) + val data2 = sc.makeRDD(blocks) val coalesced2 = data2.coalesce(numMachines*2) // test that you get over 90% locality in each group val minLocality = coalesced2.partitions - .map( part => part.asInstanceOf[CoalescedRDDPartition].localFraction ) - .foldLeft(1.)( (perc, loc) => math.min(perc,loc) ) + .map(part => part.asInstanceOf[CoalescedRDDPartition].localFraction) + .foldLeft(1.)((perc, loc) => math.min(perc,loc)) assert(minLocality >= 0.90, "Expected 90% locality but got " + (minLocality*100.).toInt + "%") // test that the groups are load balanced with 100 +/- 20 elements in each val maxImbalance = coalesced2.partitions - .map( part => part.asInstanceOf[CoalescedRDDPartition].parents.size ) + .map(part => part.asInstanceOf[CoalescedRDDPartition].parents.size) .foldLeft(0)((dev, curr) => math.max(math.abs(100-curr),dev)) assert(maxImbalance <= 20, "Expected 100 +/- 20 per partition, but got " + maxImbalance) - // TDD: Test for later when we have implemented functionality to get locality from DAGScheduler - val data3 = sc.makeRDD(blocks).map( i => i*2 ) // derived RDD to test *current* pref locs + val data3 = sc.makeRDD(blocks).map(i => i*2) // derived RDD to test *current* pref locs val coalesced3 = data3.coalesce(numMachines*2) val minLocality2 = coalesced3.partitions - .map( part => part.asInstanceOf[CoalescedRDDPartition].localFraction ) - .foldLeft(1.)( (perc, loc) => math.min(perc,loc) ) + .map(part => part.asInstanceOf[CoalescedRDDPartition].localFraction) + .foldLeft(1.)((perc, loc) => math.min(perc,loc)) assert(minLocality2 >= 0.90, "Expected 90% locality for derived RDD but got " + (minLocality2*100.).toInt + "%") } -- cgit v1.2.3
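
For readers of the coalescing comment in the patch above, here is a minimal, standalone sketch of the "power-of-two-choices with a locality slack" rule that `PartitionCoalescer.pickBin` describes. `Part`, `Group`, and this `pickBin` signature are simplified hypothetical stand-ins for the patch's `Partition` and `PartitionGroup`; the coupon-collector seeding done by `setupGroups` and the final `throwBalls` pass are omitted.

    import scala.collection.mutable.ArrayBuffer
    import scala.util.Random

    // Hypothetical, simplified stand-ins for the patch's Partition and PartitionGroup.
    case class Part(index: Int, preferredHost: Option[String])
    class Group(val host: Option[String]) {
      val parts = ArrayBuffer[Part]()
      def size: Int = parts.size
    }

    // Power-of-two-choices with a locality slack, as described in the PartitionCoalescer
    // comment: take the least-loaded group on the partition's preferred host, unless it is
    // at least `slack` partitions heavier than the lighter of two randomly sampled groups,
    // in which case prefer balance over locality.
    def pickBin(p: Part, groups: IndexedSeq[Group], slack: Int, rnd: Random): Group = {
      require(groups.nonEmpty, "need at least one candidate group")
      // least-loaded group on the preferred host, if any such group exists
      val prefGroup: Option[Group] =
        p.preferredHost.flatMap(h => groups.filter(_.host.contains(h)).sortBy(_.size).headOption)
      // basic power of two choices: sample two groups at random, keep the lighter one
      val minPowerOfTwo = {
        val g1 = groups(rnd.nextInt(groups.size))
        val g2 = groups(rnd.nextInt(groups.size))
        if (g1.size < g2.size) g1 else g2
      }
      prefGroup match {
        case Some(g) if minPowerOfTwo.size + slack > g.size => g  // within the slack: locality wins
        case _ => minPowerOfTwo                                   // no preference, or too imbalanced: balance wins
      }
    }

As in the patch, `slack` would be derived as `(balanceSlack * prev.partitions.size).toInt`, so with the default `balanceSlack = 0.10` and 10,000 parent partitions a preferred-host group may run roughly 1,000 partitions ahead of the sampled minimum before pure load balancing takes over.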