author     Ali Ghodsi <alig@cs.berkeley.edu>    2013-08-15 12:52:13 -0700
committer  Ali Ghodsi <alig@cs.berkeley.edu>    2013-08-20 16:13:37 -0700
commit     35537e6341dad72366a7ba6d92d6de9c710542ac
tree       754292d3b2011e001af3ee011f455cd345bfa4ad
parent     339598c0806d01e3d43cd49dd02e9d510b5f586b
Made a function object that returns the coalesced groups
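The pattern, roughly: construction and the two-phase setup (setupGroups, then throwBalls) move behind a companion object's apply, which returns only the finished groups. A minimal self-contained sketch of that function-object shape, with hypothetical names (Grouper, items) that are not from this patch:

    import scala.collection.mutable

    // Sketch only: the class does the work; the companion's apply() drives
    // construction plus the multi-step setup and exposes just the result.
    class Grouper protected (items: Seq[Int], maxGroups: Int) {
      protected val groups = mutable.ArrayBuffer[mutable.ListBuffer[Int]]()

      protected def setupGroups() {   // create the bins
        for (_ <- 0 until math.min(items.length, maxGroups))
          groups += mutable.ListBuffer[Int]()
      }

      protected def throwBalls() {    // assign items to bins, round-robin
        if (!groups.isEmpty)
          for ((item, i) <- items.zipWithIndex)
            groups(i % groups.size) += item
      }

      def getGroups: Array[List[Int]] = groups.filter(!_.isEmpty).map(_.toList).toArray
    }

    object Grouper {
      // The companion can call the protected constructor, so callers never
      // see a half-initialized Grouper.
      def apply(items: Seq[Int], maxGroups: Int): Array[List[Int]] = {
        val g = new Grouper(items, maxGroups)
        g.setupGroups()
        g.throwBalls()
        g.getGroups
      }
    }

With these hypothetical names, Grouper(1 to 9, 3) would return three groups of three items each.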
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/rdd/CoalescedRDD.scala  |  65
1 file changed, 35 insertions(+), 30 deletions(-)
diff --git a/core/src/main/scala/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
index 8d06b4ceb8..1cfa404fd8 100644
--- a/core/src/main/scala/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
@@ -83,9 +83,9 @@ class CoalescedRDD[T: ClassManifest](
   extends RDD[T](prev.context, Nil) { // Nil since we implement getDependencies
   override def getPartitions: Array[Partition] = {
-    val packer = new PartitionCoalescer(maxPartitions, prev, balanceSlack)
+    val groupList = coalescePartitions(maxPartitions, prev, balanceSlack)
-    packer.getPartitions.zipWithIndex.map {
+    groupList.zipWithIndex.map {
       case (pg, i) =>
         val ids = pg.list.map(_.index).toArray
         new CoalescedRDDPartition(i, prev, ids, pg.prefLoc)
@@ -126,46 +126,40 @@ class CoalescedRDD[T: ClassManifest](
}
-class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double) {
+class coalescePartitions protected (maxPartitions: Int, prev: RDD[_], balanceSlack: Double) {
-
-  private def compare(o1: PartitionGroup, o2: PartitionGroup): Boolean = o1.size < o2.size
-  private def compare(o1: Option[PartitionGroup], o2: Option[PartitionGroup]): Boolean =
+  protected def compare(o1: PartitionGroup, o2: PartitionGroup): Boolean = o1.size < o2.size
+  protected def compare(o1: Option[PartitionGroup], o2: Option[PartitionGroup]): Boolean =
     if (o1 == None) false else if (o2 == None) true else compare(o1.get, o2.get)
-  private val rnd = new scala.util.Random(7919) // keep this class deterministic
+  protected val rnd = new scala.util.Random(7919) // keep this class deterministic
   // each element of groupArr represents one coalesced partition
-  private val groupArr = mutable.ArrayBuffer[PartitionGroup]()
+  protected val groupArr = mutable.ArrayBuffer[PartitionGroup]()
   // hash used to check whether some machine is already in groupArr
-  private val groupHash = mutable.Map[String, mutable.ListBuffer[PartitionGroup]]()
+  protected val groupHash = mutable.Map[String, mutable.ListBuffer[PartitionGroup]]()
   // hash used for the first maxPartitions (to avoid duplicates)
-  private val initialHash = mutable.Map[Partition, Boolean]()
+  protected val initialHash = mutable.Map[Partition, Boolean]()
   // determines the tradeoff between load-balancing the partitions sizes and their locality
   // e.g. balanceSlack=0.10 means that it allows up to 10% imbalance in favor of locality
-  private val slack = (balanceSlack * prev.partitions.size).toInt
-
-  private var noLocality = true // if true if no preferredLocations exists for parent RDD
+  protected val slack = (balanceSlack * prev.partitions.size).toInt
-  this.setupGroups(math.min(prev.partitions.length, maxPartitions)) // setup the groups (bins)
-  this.throwBalls() // assign partitions (balls) to each group (bins)
-
-  def getPartitions : Array[PartitionGroup] = groupArr.filter( pg => pg.size > 0).toArray
+  protected var noLocality = true // true if no preferredLocations exist for the parent RDD
   // this class just keeps iterating and rotating infinitely over the partitions of the RDD
   // next() returns the next preferred machine that a partition is replicated on
   // the rotator first goes through the first replica copy of each partition, then second, third
   // the iterators return type is a tuple: (replicaString, partition)
-  private class RotateLocations(prev: RDD[_]) extends Iterator[(String, Partition)] {
+  protected class RotateLocations(prev: RDD[_]) extends Iterator[(String, Partition)] {
-    private var it: Iterator[(String, Partition)] = resetIterator()
+    protected var it: Iterator[(String, Partition)] = resetIterator()
     override val isEmpty = !it.hasNext
     // initializes/resets to start iterating from the beginning
-    private def resetIterator() = {
+    protected def resetIterator() = {
       val i1 = prev.partitions.view.map{ p: Partition =>
         { if (prev.preferredLocations(p).length > 0)
           Some((prev.preferredLocations(p)(0),p)) else None } }
@@ -193,19 +187,13 @@ class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double)
     }
   }
-  case class PartitionGroup(prefLoc: String = "") {
-    var list = mutable.ListBuffer[Partition]()
-
-    def size = list.size
-  }
-
   /**
    * Sorts and gets the least element of the list associated with key in groupHash
    * The returned PartitionGroup is the least loaded of all groups that represent the machine "key"
    * @param key string representing a partitioned group on preferred machine key
    * @return Option of PartitionGroup that has least elements for key
    */
-  private def getLeastGroupHash(key: String): Option[PartitionGroup] = {
+  protected def getLeastGroupHash(key: String): Option[PartitionGroup] = {
     groupHash.get(key).map(_.sortWith(compare).head)
   }
@@ -223,7 +211,7 @@ class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double)
    * until it has seen most of the preferred locations (2 * n log(n))
    * @param targetLen
    */
-  private def setupGroups(targetLen: Int) {
+  protected def setupGroups(targetLen: Int) {
     val rotIt = new RotateLocations(prev)
     // deal with empty case, just create targetLen partition groups with no preferred location
@@ -276,7 +264,7 @@ class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double)
    * @param p partition (ball to be thrown)
    * @return partition group (bin to be put in)
    */
-  private def pickBin(p: Partition): PartitionGroup = {
+  protected def pickBin(p: Partition): PartitionGroup = {
     val pref = prev.preferredLocations(p).map(getLeastGroupHash(_)).sortWith(compare) // least load
     val prefPart = if (pref == Nil) None else pref.head
@@ -295,7 +283,7 @@ class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double)
     }
   }
-  private def throwBalls() {
+  protected def throwBalls() {
     if (noLocality) { // no preferredLocations in parent RDD, no randomization needed
       if (maxPartitions > groupArr.size) { // just return prev.partitions
         for ((p,i) <- prev.partitions.zipWithIndex) {
@@ -315,4 +303,21 @@ class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double)
     }
   }
+  def getPartitions : Array[PartitionGroup] = groupArr.filter( pg => pg.size > 0).toArray
+
+}
+
+case class PartitionGroup(prefLoc: String = "") {
+  var list = mutable.ListBuffer[Partition]()
+
+  def size = list.size
+}
+
+object coalescePartitions {
+  def apply(maxPartitions: Int, prev: RDD[_], balanceSlack: Double) : Array[PartitionGroup] = {
+    val pc = new coalescePartitions(maxPartitions, prev, balanceSlack)
+    pc.setupGroups(math.min(prev.partitions.length, maxPartitions)) // setup the groups (bins)
+    pc.throwBalls() // assign partitions (balls) to each group (bins)
+    pc.getPartitions
+  }
+}
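After this change the coalescer is driven entirely through the companion's apply, so a caller only ever sees the finished Array[PartitionGroup]. A hedged usage sketch, assuming maxPartitions, prev, and balanceSlack are in scope as in CoalescedRDD; the println loop is illustrative only and not part of the patch:

    // Obtain the coalesced groups and inspect them; PartitionGroup
    // exposes list, prefLoc, and size as defined above.
    val groups: Array[PartitionGroup] = coalescePartitions(maxPartitions, prev, balanceSlack)
    groups.zipWithIndex.foreach { case (pg, i) =>
      println("coalesced partition " + i + ": " + pg.size +
        " parent partitions at '" + pg.prefLoc + "'")
    }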