diff options
author | Ali Ghodsi <alig@cs.berkeley.edu> | 2013-08-16 11:56:44 -0700 |
---|---|---|
committer | Ali Ghodsi <alig@cs.berkeley.edu> | 2013-08-20 16:13:37 -0700 |
commit | abcefb3858aac373bd8898c3e998375d5f26b803 (patch) | |
tree | af1cc16c1e3680278cd6fbd0973fff6252937f5f /core | |
parent | 35537e6341dad72366a7ba6d92d6de9c710542ac (diff) | |
download | spark-abcefb3858aac373bd8898c3e998375d5f26b803.tar.gz spark-abcefb3858aac373bd8898c3e998375d5f26b803.tar.bz2 spark-abcefb3858aac373bd8898c3e998375d5f26b803.zip |
fixed matei's comments
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/spark/rdd/CoalescedRDD.scala | 31 |
1 files changed, 16 insertions, 15 deletions
diff --git a/core/src/main/scala/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/spark/rdd/CoalescedRDD.scala index 1cfa404fd8..e6fe8ec8ee 100644 --- a/core/src/main/scala/spark/rdd/CoalescedRDD.scala +++ b/core/src/main/scala/spark/rdd/CoalescedRDD.scala @@ -41,7 +41,7 @@ private[spark] case class CoalescedRDDPartition( * as one of their preferredLocations * @return locality of this coalesced partition between 0 and 1 */ - def localFraction :Double = { + def localFraction: Double = { val loc = parents.count(p => rdd.preferredLocations(p).contains(preferredLocation)) if (parents.size == 0) 0.0 else (loc.toDouble / parents.size.toDouble) } @@ -83,9 +83,9 @@ class CoalescedRDD[T: ClassManifest]( extends RDD[T](prev.context, Nil) { // Nil since we implement getDependencies override def getPartitions: Array[Partition] = { - val groupList = coalescePartitions(maxPartitions, prev, balanceSlack) + val pc = new PartitionCoalescer(maxPartitions, prev, balanceSlack) - groupList.zipWithIndex.map { + pc.run().zipWithIndex.map { case (pg, i) => val ids = pg.list.map(_.index).toArray new CoalescedRDDPartition(i, prev, ids, pg.prefLoc) @@ -126,7 +126,7 @@ class CoalescedRDD[T: ClassManifest]( } -class coalescePartitions protected (maxPartitions: Int, prev: RDD[_], balanceSlack: Double) { +private[spark] class PartitionCoalescer (maxPartitions: Int, prev: RDD[_], balanceSlack: Double) { protected def compare(o1: PartitionGroup, o2: PartitionGroup): Boolean = o1.size < o2.size protected def compare(o1: Option[PartitionGroup], o2: Option[PartitionGroup]): Boolean = @@ -303,21 +303,22 @@ class coalescePartitions protected (maxPartitions: Int, prev: RDD[_], balanceSla } } - def getPartitions : Array[PartitionGroup] = groupArr.filter( pg => pg.size > 0).toArray + protected def getPartitions : Array[PartitionGroup] = groupArr.filter( pg => pg.size > 0).toArray + /** + * Runs the packing algorithm and returns an array of PartitionGroups that if possible are + * load balanced and grouped by locality + * @return array of partition groups + */ + def run() : Array[PartitionGroup] = { + setupGroups(math.min(prev.partitions.length, maxPartitions)) // setup the groups (bins) + throwBalls() // assign partitions (balls) to each group (bins) + getPartitions + } } -case class PartitionGroup(prefLoc: String = "") { +private[spark] case class PartitionGroup(prefLoc: String = "") { var list = mutable.ListBuffer[Partition]() def size = list.size } - -object coalescePartitions { - def apply(maxPartitions: Int, prev: RDD[_], balanceSlack: Double) : Array[PartitionGroup] = { - val pc = new coalescePartitions(maxPartitions, prev, balanceSlack) - pc.setupGroups(math.min(prev.partitions.length, maxPartitions)) // setup the groups (bins) - pc.throwBalls() // assign partitions (balls) to each group (bins) - pc.getPartitions - } -} |