author    Ali Ghodsi <alig@cs.berkeley.edu>  2013-08-19 12:09:08 -0700
committer Ali Ghodsi <alig@cs.berkeley.edu>  2013-08-20 16:16:05 -0700
commit    a75a64eade3f540e72fc0bc646ba74a689f03f3e (patch)
tree      528cb782ec6b9bc34da556cc74c13aa6138da939 /core
parent    f1c853d76dce4fbc34f580be0a3ae15cc5be9c80 (diff)
Fixed almost all of Matei's feedback
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/rdd/CoalescedRDD.scala | 43
-rw-r--r--  core/src/test/scala/spark/RDDSuite.scala         | 14
2 files changed, 26 insertions(+), 31 deletions(-)
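
The core of the patch is a rename inside PartitionCoalescer: PartitionGroup now backs its partitions with an ArrayBuffer called arr instead of a ListBuffer called list, and the surrounding style is tidied (spacing around ':', '===' in test assertions). As a reading aid, a minimal, self-contained sketch of the post-patch PartitionGroup and the addPartToPGroup helper follows; the Partition stub and the CoalesceSketch wrapper are stand-ins of ours, not Spark code.

import scala.collection.mutable

object CoalesceSketch {
  // Stand-in for Spark's Partition trait, just enough for the sketch to compile.
  trait Partition { def index: Int }

  // Post-patch shape of PartitionGroup (see the CoalescedRDD.scala hunk below):
  // the backing buffer is renamed from `list` (a ListBuffer) to `arr` (an ArrayBuffer).
  case class PartitionGroup(prefLoc: String = "") {
    var arr = mutable.ArrayBuffer[Partition]()
    def size = arr.size
  }

  // addPartToPGroup after the rename; initialHash records partitions that already
  // belong to a group, so no partition is assigned to two buckets.
  val initialHash = mutable.Set[Partition]()
  def addPartToPGroup(part: Partition, pgroup: PartitionGroup): Boolean = {
    if (!initialHash.contains(part)) {
      pgroup.arr += part
      initialHash += part
      true
    } else { false }
  }
}
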
diff --git a/core/src/main/scala/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
index c475b7a8aa..04bb8089a4 100644
--- a/core/src/main/scala/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
@@ -79,7 +79,7 @@ class CoalescedRDD[T: ClassManifest](
pc.run().zipWithIndex.map {
case (pg, i) =>
- val ids = pg.list.map(_.index).toArray
+ val ids = pg.arr.map(_.index).toArray
new CoalescedRDDPartition(i, prev, ids, pg.prefLoc)
}
}
@@ -109,26 +109,21 @@ class CoalescedRDD[T: ClassManifest](
* @return the machine most preferred by split
*/
override def getPreferredLocations(partition: Partition): Seq[String] = {
- if (partition.isInstanceOf[CoalescedRDDPartition]) {
- List(partition.asInstanceOf[CoalescedRDDPartition].preferredLocation)
- } else {
- super.getPreferredLocations(partition)
- }
+ List(partition.asInstanceOf[CoalescedRDDPartition].preferredLocation)
}
-
}
/**
* Coalesce the partitions of a parent RDD (`prev`) into fewer partitions, so that each partition of
- * this RDD computes one or more of the parent ones. Will produce exactly `maxPartitions` if the
- * parent had more than this many partitions, or fewer if the parent had fewer.
+ * this RDD computes one or more of the parent ones. It will produce exactly `maxPartitions` if the
+ * parent had more than maxPartitions, or fewer if the parent had fewer.
*
* This transformation is useful when an RDD with many partitions gets filtered into a smaller one,
* or to avoid having a large number of small tasks when processing a directory with many files.
*
* If there is no locality information (no preferredLocations) in the parent, then the coalescing
* is very simple: chunk parents that are close in the Array in chunks.
- * If there is locality information, it proceeds to pack them with the following three goals:
+ * If there is locality information, it proceeds to pack them with the following four goals:
*
* (1) Balance the groups so they roughly have the same number of parent partitions
* (2) Achieve locality per partition, i.e. find one machine which most parent partitions prefer
@@ -172,7 +167,7 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
var noLocality = true // if true if no preferredLocations exists for parent RDD
// gets the *current* preferred locations from the DAGScheduler (as opposed to the static ones)
- def currentPreferredLocations(rdd: RDD[_], part: Partition) : Seq[String] =
+ def currentPreferredLocations(rdd: RDD[_], part: Partition): Seq[String] =
rdd.context.getPreferredLocs(rdd, part.index)
// this class just keeps iterating and rotating infinitely over the partitions of the RDD
@@ -199,9 +194,9 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
val res = List(i1,i2,i3)
res.view.flatMap(x => x).flatten.iterator // fuses the 3 iterators (1st replica, 2nd, 3rd)
- // prev.partitions.iterator.map{p: Partition => { (currentPreferredLocations(prev, p)(0),p) }} ++
- // prev.partitions.iterator.map{p: Partition => { (currentPreferredLocations(prev, p)(1),p) }} ++
- // prev.partitions.iterator.map{p: Partition => { (currentPreferredLocations(prev, p)(2),p) }}
+ // prev.partitions.iterator.map(p: Partition => { (currentPreferredLocations(prev, p)(0),p) }) ++
+ // prev.partitions.iterator.map(p: Partition => { (currentPreferredLocations(prev, p)(1),p) }) ++
+ // prev.partitions.iterator.map(p: Partition => { (currentPreferredLocations(prev, p)(2),p) })
}
// hasNext() is false iff there are no preferredLocations for any of the partitions of the RDD
@@ -228,9 +223,9 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
groupHash.get(key).map(_.sortWith(compare).head)
}
- def addPartToPGroup(part : Partition, pgroup : PartitionGroup) : Boolean = {
+ def addPartToPGroup(part: Partition, pgroup: PartitionGroup): Boolean = {
if (!initialHash.contains(part)) {
- pgroup.list += part // already assign this element
+ pgroup.arr += part // already assign this element
initialHash += part // needed to avoid assigning partitions to multiple buckets
true
} else { false }
@@ -319,30 +314,30 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
if (noLocality) { // no preferredLocations in parent RDD, no randomization needed
if (maxPartitions > groupArr.size) { // just return prev.partitions
for ((p,i) <- prev.partitions.zipWithIndex) {
- groupArr(i).list += p
+ groupArr(i).arr += p
}
} else { // no locality available, then simply split partitions based on positions in array
- (0 until maxPartitions).foreach { i =>
+ for(i <- 0 until maxPartitions) {
val rangeStart = ((i.toLong * prev.partitions.length) / maxPartitions).toInt
val rangeEnd = (((i.toLong + 1) * prev.partitions.length) / maxPartitions).toInt
- (rangeStart until rangeEnd).foreach{ j => groupArr(i).list += prev.partitions(j) }
+ (rangeStart until rangeEnd).foreach{ j => groupArr(i).arr += prev.partitions(j) }
}
}
} else {
for (p <- prev.partitions if (!initialHash.contains(p))) { // throw every partition into group
- pickBin(p).list += p
+ pickBin(p).arr += p
}
}
}
- def getPartitions : Array[PartitionGroup] = groupArr.filter( pg => pg.size > 0).toArray
+ def getPartitions: Array[PartitionGroup] = groupArr.filter( pg => pg.size > 0).toArray
/**
* Runs the packing algorithm and returns an array of PartitionGroups that if possible are
* load balanced and grouped by locality
* @return array of partition groups
*/
- def run() : Array[PartitionGroup] = {
+ def run(): Array[PartitionGroup] = {
setupGroups(math.min(prev.partitions.length, maxPartitions)) // setup the groups (bins)
throwBalls() // assign partitions (balls) to each group (bins)
getPartitions
@@ -350,7 +345,7 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
}
private[spark] case class PartitionGroup(prefLoc: String = "") {
- var list = mutable.ListBuffer[Partition]()
+ var arr = mutable.ArrayBuffer[Partition]()
- def size = list.size
+ def size = arr.size
}
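
The no-locality branch rewritten above (the `(0 until maxPartitions).foreach` turned into a for loop) keeps the same range arithmetic: group i receives the contiguous slice of parent partitions [rangeStart, rangeEnd). A small standalone sketch of that arithmetic, using plain Ints in place of Spark partitions; the helper name chunkRanges is ours, not Spark's.

// Contiguous chunking used on the no-locality path: group i of maxPartitions
// receives parent partitions in [rangeStart, rangeEnd); Long arithmetic avoids
// overflow for very large partition counts.
def chunkRanges(numParentPartitions: Int, maxPartitions: Int): Seq[(Int, Int)] =
  for (i <- 0 until maxPartitions) yield {
    val rangeStart = ((i.toLong * numParentPartitions) / maxPartitions).toInt
    val rangeEnd   = (((i.toLong + 1) * numParentPartitions) / maxPartitions).toInt
    (rangeStart, rangeEnd)
  }

// e.g. chunkRanges(10, 3) == Seq((0, 3), (3, 6), (6, 10))
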
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index 14f7c62782..64e2c0605b 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -181,7 +181,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
assert(coalesced1.collect().toList.sorted === (1 to 9).toList, "Data got *lost* in coalescing")
val splits = coalesced1.glom().collect().map(_.toList).toList
- assert(splits.length == 3, "Supposed to coalesce to 3 but got " + splits.length)
+ assert(splits.length === 3, "Supposed to coalesce to 3 but got " + splits.length)
assert(splits.foldLeft(true)
((x,y) => if (!x) false else y.length >= 1) === true, "Some partitions were empty")
@@ -189,10 +189,10 @@ class RDDSuite extends FunSuite with SharedSparkContext {
// If we try to coalesce into more partitions than the original RDD, it should just
// keep the original number of partitions.
val coalesced4 = data.coalesce(20)
- assert(coalesced4.glom().collect().map(_.toList).toList.sortWith(
- (x, y) => if (x.isEmpty) false else if (y.isEmpty) true else x(0) < y(0)) === (1 to 9).
- map(x => List(x)).toList, "Tried coalescing 9 partitions to 20 but didn't get 9 back")
-
+ val listOfLists = coalesced4.glom().collect().map(_.toList).toList
+ val sortedList = listOfLists.sortWith{ (x, y) => !x.isEmpty && (y.isEmpty || (x(0) < y(0))) }
+ assert( sortedList === (1 to 9).
+ map{x => List(x)}.toList, "Tried coalescing 9 partitions to 20 but didn't get 9 back")
}
test("cogrouped RDDs with locality, large scale (10K partitions)") {
@@ -204,8 +204,8 @@ class RDDSuite extends FunSuite with SharedSparkContext {
val machines = mutable.ListBuffer[String]()
(1 to numMachines).foreach(machines += "m"+_)
- val blocks = (1 to partitions).map(i => (i, (i to (i+2))
- .map{ j => machines(rnd.nextInt(machines.size))}))
+ val blocks = (1 to partitions).map(i =>
+ { (i, Array.fill(3)(machines(rnd.nextInt(machines.size))).toList) } )
val data2 = sc.makeRDD(blocks)
val coalesced2 = data2.coalesce(numMachines*2)
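
For reference, a usage-level sketch of what the updated test asserts, assuming a local SparkContext named sc (as provided by the suite's SharedSparkContext); the nine-element RDD below is illustrative, chosen to match the assertion messages rather than copied from the suite.

// Illustrative only: nine elements over nine partitions, matching the
// "9 partitions" wording in the assertion messages above.
val data = sc.makeRDD(1 to 9, 9)

// Coalescing down merges parent partitions: no element is lost and no group is empty.
val coalesced = data.coalesce(3)
assert(coalesced.collect().toList.sorted == (1 to 9).toList)
assert(coalesced.glom().collect().length == 3)

// Asking for more partitions than the parent has leaves the count unchanged:
// coalesce(20) on a 9-partition RDD still yields 9 partitions.
val coalescedUp = data.coalesce(20)
assert(coalescedUp.glom().collect().length == 9)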