author     Ali Ghodsi <alig@cs.berkeley.edu>    2013-08-13 20:46:22 -0700
committer  Ali Ghodsi <alig@cs.berkeley.edu>    2013-08-20 16:13:36 -0700
commit     1ede102ba5863f6cee27437b0adbc4d54cedffb3
tree       d4d575072e64103382ed248ff855898de93ac3c9
parent     aa2b89d98d6d195a38e36c1947d437ab7346e5c9
load balancing coalescer
 core/src/main/scala/spark/rdd/CoalescedRDD.scala | 207
 core/src/test/scala/spark/RDDSuite.scala         |  22
 2 files changed, 218 insertions, 11 deletions
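
The old coalescer simply grouped adjacent parent partitions by index ranges. The new PartitionCoalescer treats the target partitions as bins and the parent partitions as balls: setupGroups picks a preferred machine for each bin by rotating over the parents' preferred locations (stopping after a coupon-collector estimate of roughly 2 * n * log(n) draws), and throwBalls then assigns each parent partition to a bin with power-of-two-choices load balancing, preferring the local bin unless it is already more than balanceSlack * maxPartitions partitions fuller. A minimal driver-side sketch of exercising the new path (host names "m0".."m5" and the sizes are illustrative only, not from this patch):

    // Sketch only: parent partitions carry preferred hosts via makeRDD's (elem, Seq(hosts)) overload.
    val data = sc.makeRDD((1 to 9).map(i => (i, Seq("m" + (i % 6)))))
    val coalesced = data.coalesce(3)   // builds a CoalescedRDD, now packed by PartitionCoalescer
    coalesced.glom().collect().foreach(p => println(p.toList))   // expect roughly 3 elements per coalesced partition
    // To tune the balance/locality tradeoff directly, the new constructor added here can be used:
    // new spark.rdd.CoalescedRDD(data, maxPartitions = 3, balanceSlack = 0.10)
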
diff --git a/core/src/main/scala/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
index 2b5bf18541..09ae9a8fa6 100644
--- a/core/src/main/scala/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
@@ -19,11 +19,13 @@ package spark.rdd
 
 import spark.{Dependency, OneToOneDependency, NarrowDependency, RDD, Partition, TaskContext}
 import java.io.{ObjectOutputStream, IOException}
+import scala.collection.mutable
 
 private[spark] case class CoalescedRDDPartition(
     index: Int,
     @transient rdd: RDD[_],
-    parentsIndices: Array[Int]
+    parentsIndices: Array[Int],
+    prefLoc: String = ""
   ) extends Partition {
   var parents: Seq[Partition] = parentsIndices.map(rdd.partitions(_))
 
@@ -33,6 +35,8 @@ private[spark] case class CoalescedRDDPartition(
     parents = parentsIndices.map(rdd.partitions(_))
     oos.defaultWriteObject()
   }
+
+  def getPreferredLocation = prefLoc
 }
 
 /**
@@ -45,20 +49,24 @@ private[spark] case class CoalescedRDDPartition(
  */
 class CoalescedRDD[T: ClassManifest](
     @transient var prev: RDD[T],
-    maxPartitions: Int)
+    maxPartitions: Int,
+    balanceSlack: Double = 0.20 )
   extends RDD[T](prev.context, Nil) {  // Nil since we implement getDependencies
 
   override def getPartitions: Array[Partition] = {
-    val prevSplits = prev.partitions
-    if (prevSplits.length < maxPartitions) {
-      prevSplits.map(_.index).map{idx => new CoalescedRDDPartition(idx, prev, Array(idx)) }
-    } else {
-      (0 until maxPartitions).map { i =>
-        val rangeStart = ((i.toLong * prevSplits.length) / maxPartitions).toInt
-        val rangeEnd = (((i.toLong + 1) * prevSplits.length) / maxPartitions).toInt
-        new CoalescedRDDPartition(i, prev, (rangeStart until rangeEnd).toArray)
-      }.toArray
+    val res = mutable.ArrayBuffer[CoalescedRDDPartition]()
+    val targetLen = math.min(prev.partitions.length, maxPartitions)
+    val packer = new PartitionCoalescer(maxPartitions, prev, balanceSlack)
+
+    packer.setupGroups(targetLen)   // setup the groups (bins) and preferred locations
+    packer.throwBalls()             // assign partitions (balls) to each group (bin)
+
+    for ((pg, i) <- packer.groupArr.zipWithIndex) {
+      val ids = pg.list.map(_.index).toArray
+      res += new CoalescedRDDPartition(i, prev, ids, pg.prefLoc)
     }
+
+    res.toArray
   }
 
   override def compute(split: Partition, context: TaskContext): Iterator[T] = {
@@ -79,3 +87,180 @@ class CoalescedRDD[T: ClassManifest](
     prev = null
   }
 }
+
+
+class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double) {
+
+  // this class just keeps iterating and rotating infinitely over the partitions of the RDD
+  // next() returns the next preferred machine that a partition is replicated on
+  // the rotator first goes through the first replica copy of each partition, then second, then third
+  private class RotateLocations(prev: RDD[_]) extends Iterator[String] {
+
+    private var it: Iterator[String] = resetIterator()
+    override val isEmpty = !it.hasNext
+
+    // initializes/resets to start iterating from the beginning
+    private def resetIterator() = {
+      val i1 = prev.partitions.view.map( (p: Partition) =>
+        { if (prev.preferredLocations(p).length > 0) Some(prev.preferredLocations(p)(0)) else None } )
+      val i2 = prev.partitions.view.map( (p: Partition) =>
+        { if (prev.preferredLocations(p).length > 1) Some(prev.preferredLocations(p)(1)) else None } )
+      val i3 = prev.partitions.view.map( (p: Partition) =>
+        { if (prev.preferredLocations(p).length > 2) Some(prev.preferredLocations(p)(2)) else None } )
+      val res = List(i1,i2,i3)
+      res.view.flatMap(x => x).flatten.iterator  // fuses the 3 iterators (1st replica, 2nd, 3rd) into one iterator
+    }
+
+    // hasNext() is false iff there are no preferredLocations for any of the partitions of the RDD
+    def hasNext(): Boolean = !isEmpty
+
+    // return the next preferredLocation of some partition of the RDD
+    def next(): String = {
+      if (it.hasNext)
+        it.next()
+      else {
+        it = resetIterator()  // ran out of preferred locations, reset and rotate to the beginning
+        it.next()
+      }
+    }
+  }
+
+  case class PartitionGroup(prefLoc: String = "") {
+    var list = mutable.MutableList[Partition]()
+
+    def size = list.size
+
+    // returns number of partitions that got locality in this group
+    def local = {
+      var loc: Int = 0
+      list.foreach(p => if (prev.preferredLocations(p).contains(prefLoc)) loc += 1)
+      loc
+    }
+
+    override def toString(): String = {
+      val localityStr = if (size == 0) "0" else (local*100. / size).toInt.toString
+      "PartitionGroup(\"" + prefLoc + "\") size: " + size + " locality: " + localityStr + "% \n"
+      // list.map("\t\t" + _.toString).mkString("\n") + "\n"
+    }
+  }
+
+  def compare(o1: PartitionGroup, o2: PartitionGroup): Boolean = o1.size < o2.size
+  def compare(o1: Option[PartitionGroup], o2: Option[PartitionGroup]): Boolean =
+    if (o1 == None) false else if (o2 == None) true else compare(o1.get, o2.get)
+
+  val rnd = new scala.util.Random(7919) // keep this class deterministic
+
+  // each element of groupArr represents one coalesced partition
+  val groupArr = mutable.ArrayBuffer[PartitionGroup]()
+
+  // hash used to check whether some machine is already in groupArr
+  val groupHash = mutable.Map[String, mutable.MutableList[PartitionGroup]]()
+
+  // determines the tradeoff between load-balancing the partitions' sizes and their locality
+  // e.g. balanceSlack=0.10 means that it allows up to 10% imbalance in favor of locality
+  val slack = (balanceSlack * maxPartitions).toInt
+
+  private var noLocality = true  // true if no preferredLocations exist for the parent RDD
+
+  /**
+   * Sorts and gets the least element of the list associated with key in groupHash
+   * The returned PartitionGroup is the least loaded of all groups that represent the machine "key"
+   * @param key string representing a partitioned group on preferred machine key
+   * @return Option of PartitionGroup that has least elements for key
+   */
+  private def getLeastGroupHash(key: String): Option[PartitionGroup] = {
+    groupHash.get(key).map(_.sortWith(compare).head)
+  }
+
+  /**
+   * Initializes targetLen partition groups and assigns each a preferredLocation
+   * This uses coupon collector to estimate how many preferredLocations it must rotate through
+   * until it has seen most of the preferred locations (2 * n log(n))
+   * @param targetLen
+   */
+  def setupGroups(targetLen: Int) {
+    val rotIt = new RotateLocations(prev)
+
+    // deal with empty rotator case, just create targetLen partition groups with no preferred location
+    if (!rotIt.hasNext()) {
+      (1 to targetLen).foreach(x => groupArr += PartitionGroup())
+      return
+    }
+
+    noLocality = false
+
+    // number of iterations needed to be certain that we've seen most preferred locations
+    val expectedCoupons2 = 2 * (math.log(targetLen)*targetLen + targetLen + 0.5).toInt
+    var numCreated = 0
+    var tries = 0
+
+    // rotate through until either targetLen unique/distinct preferred locations have been created OR
+    // we've rotated expectedCoupons2 times, in which case we have likely seen all preferred locations,
+    // i.e. likely targetLen >> number of preferred locations (more buckets than there are machines)
+    while (numCreated < targetLen && tries < expectedCoupons2) {
+      tries += 1
+      val nxt = rotIt.next()
+      if (!groupHash.contains(nxt)) {
+        val pgroup = PartitionGroup(nxt)
+        groupArr += pgroup
+        groupHash += (nxt -> (mutable.MutableList(pgroup))) // use list in case we have multiple groups for the same machine
+        numCreated += 1
+      }
+    }
+
+    while (numCreated < targetLen) {  // if we don't have enough partition groups, just create duplicates
+      val nxt = rotIt.next()
+      val pgroup = PartitionGroup(nxt)
+      groupArr += pgroup
+      groupHash.get(nxt).get += pgroup
+      numCreated += 1
+    }
+  }
+
+  /**
+   * Takes a parent RDD partition and decides which of the partition groups to put it in
+   * Takes locality into account, but also uses power of 2 choices to load balance
+   * It strikes a balance between the two using the balanceSlack variable
+   * @param p partition (ball to be thrown)
+   * @return partition group (bin to be put in)
+   */
+  def pickBin(p: Partition): PartitionGroup = {
+    val pref = prev.preferredLocations(p).map(getLeastGroupHash(_)).sortWith(compare) // least loaded bin of replicas
+    val prefPart = if (pref == Nil) None else pref.head
+
+    val r1 = rnd.nextInt(groupArr.size)
+    val r2 = rnd.nextInt(groupArr.size)
+    val minPowerOfTwo = if (groupArr(r1).size < groupArr(r2).size) groupArr(r1) else groupArr(r2) // power of 2
+    if (prefPart == None)  // if no preferred locations, just use basic power of two
+      return minPowerOfTwo
+
+    val prefPartActual = prefPart.get
+
+    if (minPowerOfTwo.size + slack <= prefPartActual.size)  // more imbalance than the slack allows
+      return minPowerOfTwo  // prefer balance over locality
+    else {
+      return prefPartActual // prefer locality over balance
+    }
+  }
+
+  def throwBalls() {
+    if (noLocality) {  // no preferredLocations in parent RDD, no randomization needed
+      if (maxPartitions > groupArr.size) { // just return prev.partitions
+        for ((p,i) <- prev.partitions.zipWithIndex) {
+          groupArr(i).list += p
+        }
+      } else { // old code, just splits them grouping partitions that are next to each other in the array
+        (0 until maxPartitions).foreach { i =>
+          val rangeStart = ((i.toLong * prev.partitions.length) / maxPartitions).toInt
+          val rangeEnd = (((i.toLong + 1) * prev.partitions.length) / maxPartitions).toInt
+          (rangeStart until rangeEnd).foreach{ j => groupArr(i).list += prev.partitions(j) }
+        }
+      }
+    } else {
+      for (p <- prev.partitions) { // throw every partition (ball) into a partition group (bin)
+        pickBin(p).list += p
+      }
+    }
+  }
+
+}
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index 75778de1cc..881bdedfe5 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -173,6 +173,28 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     assert(coalesced5.dependencies.head.rdd.dependencies.head.rdd.asInstanceOf[ShuffledRDD[_, _, _]] != null)
   }
 
+  test("coalesced RDDs with locality") {
+    // RDD with locality preferences spread (non-randomly) over 6 machines, m0 through m5
+    val data = sc.makeRDD((1 to 9).map( i => (i, (i to (i+2)).map{ j => "m" + (j%6)} )))
+    val coalesced1 = data.coalesce(3)
+    assert(coalesced1.collect().toList.sorted === (1 to 9).toList) // no data lost (NB: order might reshuffle)
+
+    val splits = coalesced1.glom().collect().map(_.toList).toList
+    assert(splits.length === 3) // ensure it indeed created 3 partitions
+
+    assert(splits.foldLeft(true)( (x,y) => if (!x) false else y.length >= 2) === true) // decent balance (2+ per bin)
+
+    val prefs = List(List("m1","m2","m3"), List("m4","m5","m6"))
+    val data2 = sc.makeRDD((1 to 100).map( i => (i, prefs(i % 2) ))) // alternate machine prefs
+    val coalesced2 = data2.coalesce(10)
+    val splits2 = coalesced2.glom().collect().map(_.toList).toList
+
+    // this gives a list of pairs, each pair is of the form (even, odd), where even is the number of even elements...
+    val list = splits2.map( ls => ls.foldLeft((0,0))( (x,y) => if (y % 2 == 0) (x._1+1,x._2) else (x._1,x._2+1)) )
+    val maxes = list.map( { case (a,b) => if (a>b) a else b } ) // get the maxes; this represents the locality
+    maxes.foreach( locality => assert( locality > 7) ) // at least 70% locality in each partition
+
+  }
 
   test("zipped RDDs") {
     val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
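
For intuition on the two heuristics the packer combines, a standalone sketch follows (not part of the patch; Bin, pick, and the host field are hypothetical names). The coupon-collector bound used by setupGroups says that after about 2 * (n * ln n + n + 0.5) draws with replacement over the preferred locations, all of roughly n distinct locations have very likely been seen; pickBin then samples two random bins and keeps the smaller one, unless the preferred (local) bin is within slack partitions of it.

    // Standalone sketch, assuming made-up names (Bin, pick); mirrors the logic above, not the patch itself.
    // 1) Coupon-collector stopping bound used by setupGroups:
    val targetLen = 10
    val expectedCoupons2 = 2 * (math.log(targetLen) * targetLen + targetLen + 0.5).toInt  // 66 for targetLen = 10

    // 2) Power-of-two choices with slack, mirroring pickBin's decision:
    case class Bin(host: String, var size: Int)
    def pick(bins: Vector[Bin], preferred: Option[Bin], slack: Int, rnd: scala.util.Random): Bin = {
      val a = bins(rnd.nextInt(bins.size))
      val b = bins(rnd.nextInt(bins.size))
      val smaller = if (a.size < b.size) a else b          // the less loaded of two random bins
      preferred match {
        case Some(p) if smaller.size + slack > p.size => p        // locality wins while imbalance stays within slack
        case _                                        => smaller  // otherwise balance wins
      }
    }

With slack = (balanceSlack * maxPartitions).toInt, the default balanceSlack of 0.20 over 10 target partitions tolerates the local bin being up to 2 partitions fuller before balance takes over.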