author     Ali Ghodsi <alig@cs.berkeley.edu>   2013-08-16 11:56:44 -0700
committer  Ali Ghodsi <alig@cs.berkeley.edu>   2013-08-20 16:13:37 -0700
commit     abcefb3858aac373bd8898c3e998375d5f26b803 (patch)
tree       af1cc16c1e3680278cd6fbd0973fff6252937f5f /core
parent     35537e6341dad72366a7ba6d92d6de9c710542ac (diff)
Fixed Matei's comments
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/rdd/CoalescedRDD.scala | 31
1 file changed, 16 insertions(+), 15 deletions(-)
diff --git a/core/src/main/scala/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
index 1cfa404fd8..e6fe8ec8ee 100644
--- a/core/src/main/scala/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
@@ -41,7 +41,7 @@ private[spark] case class CoalescedRDDPartition(
    * as one of their preferredLocations
    * @return locality of this coalesced partition between 0 and 1
    */
-  def localFraction :Double = {
+  def localFraction: Double = {
     val loc = parents.count(p => rdd.preferredLocations(p).contains(preferredLocation))
     if (parents.size == 0) 0.0 else (loc.toDouble / parents.size.toDouble)
   }
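
For readers skimming the hunk above, a minimal standalone sketch of what localFraction computes: the fraction of parent partitions whose preferred locations include the group's chosen location. The object name and plain-Seq parameter types below are illustrative, not Spark's API.

object LocalFractionSketch {
  // Illustrative stand-in for CoalescedRDDPartition.localFraction (not Spark's API):
  // the share of parent partitions that list the chosen location as preferred.
  def localFraction(parentPrefs: Seq[Seq[String]], preferredLocation: String): Double = {
    val local = parentPrefs.count(_.contains(preferredLocation))
    if (parentPrefs.isEmpty) 0.0 else local.toDouble / parentPrefs.size
  }

  def main(args: Array[String]): Unit = {
    // Two of three parents prefer "hostA", so the fraction is 2/3.
    val prefs = Seq(Seq("hostA"), Seq("hostB"), Seq("hostA", "hostC"))
    println(localFraction(prefs, "hostA"))
  }
}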
@@ -83,9 +83,9 @@ class CoalescedRDD[T: ClassManifest](
   extends RDD[T](prev.context, Nil) {  // Nil since we implement getDependencies
 
   override def getPartitions: Array[Partition] = {
-    val groupList = coalescePartitions(maxPartitions, prev, balanceSlack)
+    val pc = new PartitionCoalescer(maxPartitions, prev, balanceSlack)
 
-    groupList.zipWithIndex.map {
+    pc.run().zipWithIndex.map {
       case (pg, i) =>
         val ids = pg.list.map(_.index).toArray
         new CoalescedRDDPartition(i, prev, ids, pg.prefLoc)
@@ -126,7 +126,7 @@ class CoalescedRDD[T: ClassManifest](
 }
 
-class coalescePartitions protected (maxPartitions: Int, prev: RDD[_], balanceSlack: Double) {
+private[spark] class PartitionCoalescer (maxPartitions: Int, prev: RDD[_], balanceSlack: Double) {
 
   protected def compare(o1: PartitionGroup, o2: PartitionGroup): Boolean = o1.size < o2.size
   protected def compare(o1: Option[PartitionGroup], o2: Option[PartitionGroup]): Boolean =
@@ -303,21 +303,22 @@ class coalescePartitions protected (maxPartitions: Int, prev: RDD[_], balanceSla
     }
   }
 
-  def getPartitions : Array[PartitionGroup] = groupArr.filter( pg => pg.size > 0).toArray
+  protected def getPartitions : Array[PartitionGroup] = groupArr.filter( pg => pg.size > 0).toArray
+
+  /**
+   * Runs the packing algorithm and returns an array of PartitionGroups that if possible are
+   * load balanced and grouped by locality
+   * @return array of partition groups
+   */
+  def run() : Array[PartitionGroup] = {
+    setupGroups(math.min(prev.partitions.length, maxPartitions)) // setup the groups (bins)
+    throwBalls() // assign partitions (balls) to each group (bins)
+    getPartitions
+  }
 }
 
-case class PartitionGroup(prefLoc: String = "") {
+private[spark] case class PartitionGroup(prefLoc: String = "") {
   var list = mutable.ListBuffer[Partition]()
   def size = list.size
 }
-
-object coalescePartitions {
-  def apply(maxPartitions: Int, prev: RDD[_], balanceSlack: Double) : Array[PartitionGroup] = {
-    val pc = new coalescePartitions(maxPartitions, prev, balanceSlack)
-    pc.setupGroups(math.min(prev.partitions.length, maxPartitions)) // setup the groups (bins)
-    pc.throwBalls() // assign partitions (balls) to each group (bins)
-    pc.getPartitions
-  }
-}
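
To make the refactor concrete, here is a minimal, self-contained sketch of the balls-into-bins packing that the new run() method orchestrates: setupGroups creates the bins and throwBalls drops each parent partition (ball) into a bin. The names and the smallest-bin heuristic below are illustrative simplifications, not Spark's implementation; the real PartitionCoalescer also weighs preferred locations and balanceSlack when choosing a bin.

object CoalesceSketch {
  // Simplified PartitionGroup: just the indices of the parent partitions it holds.
  final case class Group(var members: List[Int] = Nil) {
    def size: Int = members.length
  }

  // setupGroups: create min(numParentPartitions, maxPartitions) empty bins.
  // throwBalls: assign each parent partition to the currently smallest bin.
  def coalesce(numParentPartitions: Int, maxPartitions: Int): Array[Group] = {
    val bins = Array.fill(math.min(numParentPartitions, maxPartitions))(Group())
    for (p <- 0 until numParentPartitions) {
      val smallest = bins.minBy(_.size)
      smallest.members = p :: smallest.members
    }
    bins.filter(_.size > 0) // mirrors getPartitions dropping empty groups
  }

  def main(args: Array[String]): Unit = {
    // Ten parent partitions packed into three balanced groups.
    coalesce(numParentPartitions = 10, maxPartitions = 3).zipWithIndex.foreach {
      case (g, i) => println(s"group $i -> partitions ${g.members.sorted.mkString(", ")}")
    }
  }
}

Hiding setupGroups/throwBalls behind run(), as the diff does, keeps the two-phase packing an internal detail of PartitionCoalescer instead of a calling convention the companion object had to get right.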