diff options
author | Ali Ghodsi <alig@cs.berkeley.edu> | 2013-08-15 12:52:13 -0700 |
---|---|---|
committer | Ali Ghodsi <alig@cs.berkeley.edu> | 2013-08-20 16:13:37 -0700 |
commit | 35537e6341dad72366a7ba6d92d6de9c710542ac (patch) | |
tree | 754292d3b2011e001af3ee011f455cd345bfa4ad | |
parent | 339598c0806d01e3d43cd49dd02e9d510b5f586b (diff) | |
download | spark-35537e6341dad72366a7ba6d92d6de9c710542ac.tar.gz spark-35537e6341dad72366a7ba6d92d6de9c710542ac.tar.bz2 spark-35537e6341dad72366a7ba6d92d6de9c710542ac.zip |
Made a function object that returns the coalesced groups
-rw-r--r-- | core/src/main/scala/spark/rdd/CoalescedRDD.scala | 65 |
1 files changed, 35 insertions, 30 deletions
diff --git a/core/src/main/scala/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/spark/rdd/CoalescedRDD.scala index 8d06b4ceb8..1cfa404fd8 100644 --- a/core/src/main/scala/spark/rdd/CoalescedRDD.scala +++ b/core/src/main/scala/spark/rdd/CoalescedRDD.scala @@ -83,9 +83,9 @@ class CoalescedRDD[T: ClassManifest]( extends RDD[T](prev.context, Nil) { // Nil since we implement getDependencies override def getPartitions: Array[Partition] = { - val packer = new PartitionCoalescer(maxPartitions, prev, balanceSlack) + val groupList = coalescePartitions(maxPartitions, prev, balanceSlack) - packer.getPartitions.zipWithIndex.map { + groupList.zipWithIndex.map { case (pg, i) => val ids = pg.list.map(_.index).toArray new CoalescedRDDPartition(i, prev, ids, pg.prefLoc) @@ -126,46 +126,40 @@ class CoalescedRDD[T: ClassManifest]( } +class coalescePartitions protected (maxPartitions: Int, prev: RDD[_], balanceSlack: Double) { -class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double) { - - private def compare(o1: PartitionGroup, o2: PartitionGroup): Boolean = o1.size < o2.size - private def compare(o1: Option[PartitionGroup], o2: Option[PartitionGroup]): Boolean = + protected def compare(o1: PartitionGroup, o2: PartitionGroup): Boolean = o1.size < o2.size + protected def compare(o1: Option[PartitionGroup], o2: Option[PartitionGroup]): Boolean = if (o1 == None) false else if (o2 == None) true else compare(o1.get, o2.get) - private val rnd = new scala.util.Random(7919) // keep this class deterministic + protected val rnd = new scala.util.Random(7919) // keep this class deterministic // each element of groupArr represents one coalesced partition - private val groupArr = mutable.ArrayBuffer[PartitionGroup]() + protected val groupArr = mutable.ArrayBuffer[PartitionGroup]() // hash used to check whether some machine is already in groupArr - private val groupHash = mutable.Map[String, mutable.ListBuffer[PartitionGroup]]() + protected val groupHash = mutable.Map[String, mutable.ListBuffer[PartitionGroup]]() // hash used for the first maxPartitions (to avoid duplicates) - private val initialHash = mutable.Map[Partition, Boolean]() + protected val initialHash = mutable.Map[Partition, Boolean]() // determines the tradeoff between load-balancing the partitions sizes and their locality // e.g. balanceSlack=0.10 means that it allows up to 10% imbalance in favor of locality - private val slack = (balanceSlack * prev.partitions.size).toInt - - private var noLocality = true // if true if no preferredLocations exists for parent RDD + protected val slack = (balanceSlack * prev.partitions.size).toInt - this.setupGroups(math.min(prev.partitions.length, maxPartitions)) // setup the groups (bins) - this.throwBalls() // assign partitions (balls) to each group (bins) - - def getPartitions : Array[PartitionGroup] = groupArr.filter( pg => pg.size > 0).toArray + protected var noLocality = true // if true if no preferredLocations exists for parent RDD // this class just keeps iterating and rotating infinitely over the partitions of the RDD // next() returns the next preferred machine that a partition is replicated on // the rotator first goes through the first replica copy of each partition, then second, third // the iterators return type is a tuple: (replicaString, partition) - private class RotateLocations(prev: RDD[_]) extends Iterator[(String, Partition)] { + protected class RotateLocations(prev: RDD[_]) extends Iterator[(String, Partition)] { - private var it: Iterator[(String, Partition)] = resetIterator() + protected var it: Iterator[(String, Partition)] = resetIterator() override val isEmpty = !it.hasNext // initializes/resets to start iterating from the beginning - private def resetIterator() = { + protected def resetIterator() = { val i1 = prev.partitions.view.map{ p: Partition => { if (prev.preferredLocations(p).length > 0) Some((prev.preferredLocations(p)(0),p)) else None } } @@ -193,19 +187,13 @@ class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double) } } - case class PartitionGroup(prefLoc: String = "") { - var list = mutable.ListBuffer[Partition]() - - def size = list.size - } - /** * Sorts and gets the least element of the list associated with key in groupHash * The returned PartitionGroup is the least loaded of all groups that represent the machine "key" * @param key string representing a partitioned group on preferred machine key * @return Option of PartitionGroup that has least elements for key */ - private def getLeastGroupHash(key: String): Option[PartitionGroup] = { + protected def getLeastGroupHash(key: String): Option[PartitionGroup] = { groupHash.get(key).map(_.sortWith(compare).head) } @@ -223,7 +211,7 @@ class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double) * until it has seen most of the preferred locations (2 * n log(n)) * @param targetLen */ - private def setupGroups(targetLen: Int) { + protected def setupGroups(targetLen: Int) { val rotIt = new RotateLocations(prev) // deal with empty case, just create targetLen partition groups with no preferred location @@ -276,7 +264,7 @@ class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double) * @param p partition (ball to be thrown) * @return partition group (bin to be put in) */ - private def pickBin(p: Partition): PartitionGroup = { + protected def pickBin(p: Partition): PartitionGroup = { val pref = prev.preferredLocations(p).map(getLeastGroupHash(_)).sortWith(compare) // least load val prefPart = if (pref == Nil) None else pref.head @@ -295,7 +283,7 @@ class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double) } } - private def throwBalls() { + protected def throwBalls() { if (noLocality) { // no preferredLocations in parent RDD, no randomization needed if (maxPartitions > groupArr.size) { // just return prev.partitions for ((p,i) <- prev.partitions.zipWithIndex) { @@ -315,4 +303,21 @@ class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double) } } + def getPartitions : Array[PartitionGroup] = groupArr.filter( pg => pg.size > 0).toArray + +} + +case class PartitionGroup(prefLoc: String = "") { + var list = mutable.ListBuffer[Partition]() + + def size = list.size +} + +object coalescePartitions { + def apply(maxPartitions: Int, prev: RDD[_], balanceSlack: Double) : Array[PartitionGroup] = { + val pc = new coalescePartitions(maxPartitions, prev, balanceSlack) + pc.setupGroups(math.min(prev.partitions.length, maxPartitions)) // setup the groups (bins) + pc.throwBalls() // assign partitions (balls) to each group (bins) + pc.getPartitions + } } |