From a75a64eade3f540e72fc0bc646ba74a689f03f3e Mon Sep 17 00:00:00 2001
From: Ali Ghodsi
Date: Mon, 19 Aug 2013 12:09:08 -0700
Subject: Fixed almost all of Matei's feedback

---
 core/src/main/scala/spark/rdd/CoalescedRDD.scala | 43 +++++++++++-------------
 core/src/test/scala/spark/RDDSuite.scala         | 14 ++++----
 2 files changed, 26 insertions(+), 31 deletions(-)

(limited to 'core')

diff --git a/core/src/main/scala/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
index c475b7a8aa..04bb8089a4 100644
--- a/core/src/main/scala/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
@@ -79,7 +79,7 @@ class CoalescedRDD[T: ClassManifest](
 
     pc.run().zipWithIndex.map {
       case (pg, i) =>
-        val ids = pg.list.map(_.index).toArray
+        val ids = pg.arr.map(_.index).toArray
         new CoalescedRDDPartition(i, prev, ids, pg.prefLoc)
     }
   }
@@ -109,26 +109,21 @@ class CoalescedRDD[T: ClassManifest](
    * @return the machine most preferred by split
    */
   override def getPreferredLocations(partition: Partition): Seq[String] = {
-    if (partition.isInstanceOf[CoalescedRDDPartition]) {
-      List(partition.asInstanceOf[CoalescedRDDPartition].preferredLocation)
-    } else {
-      super.getPreferredLocations(partition)
-    }
+    List(partition.asInstanceOf[CoalescedRDDPartition].preferredLocation)
   }
-
 }
 
 /**
  * Coalesce the partitions of a parent RDD (`prev`) into fewer partitions, so that each partition of
- * this RDD computes one or more of the parent ones. Will produce exactly `maxPartitions` if the
- * parent had more than this many partitions, or fewer if the parent had fewer.
+ * this RDD computes one or more of the parent ones. It will produce exactly `maxPartitions` if the
+ * parent had more than maxPartitions, or fewer if the parent had fewer.
  *
  * This transformation is useful when an RDD with many partitions gets filtered into a smaller one,
  * or to avoid having a large number of small tasks when processing a directory with many files.
  *
  * If there is no locality information (no preferredLocations) in the parent, then the coalescing
  * is very simple: chunk parents that are close in the Array in chunks.
- * If there is locality information, it proceeds to pack them with the following three goals:
+ * If there is locality information, it proceeds to pack them with the following four goals:
  *
  * (1) Balance the groups so they roughly have the same number of parent partitions
  * (2) Achieve locality per partition, i.e. find one machine which most parent partitions prefer
@@ -172,7 +167,7 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
   var noLocality = true // if true if no preferredLocations exists for parent RDD
 
   // gets the *current* preferred locations from the DAGScheduler (as opposed to the static ones)
-  def currentPreferredLocations(rdd: RDD[_], part: Partition) : Seq[String] =
+  def currentPreferredLocations(rdd: RDD[_], part: Partition): Seq[String] =
     rdd.context.getPreferredLocs(rdd, part.index)
 
   // this class just keeps iterating and rotating infinitely over the partitions of the RDD
@@ -199,9 +194,9 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
 
       val res = List(i1,i2,i3)
       res.view.flatMap(x => x).flatten.iterator // fuses the 3 iterators (1st replica, 2nd, 3rd)
-      // prev.partitions.iterator.map{p: Partition => { (currentPreferredLocations(prev, p)(0),p) }} ++
-      // prev.partitions.iterator.map{p: Partition => { (currentPreferredLocations(prev, p)(1),p) }} ++
-      // prev.partitions.iterator.map{p: Partition => { (currentPreferredLocations(prev, p)(2),p) }}
+      // prev.partitions.iterator.map(p: Partition => { (currentPreferredLocations(prev, p)(0),p) }) ++
+      // prev.partitions.iterator.map(p: Partition => { (currentPreferredLocations(prev, p)(1),p) }) ++
+      // prev.partitions.iterator.map(p: Partition => { (currentPreferredLocations(prev, p)(2),p) })
     }
 
     // hasNext() is false iff there are no preferredLocations for any of the partitions of the RDD
@@ -228,9 +223,9 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
     groupHash.get(key).map(_.sortWith(compare).head)
   }
 
-  def addPartToPGroup(part : Partition, pgroup : PartitionGroup) : Boolean = {
+  def addPartToPGroup(part: Partition, pgroup: PartitionGroup): Boolean = {
     if (!initialHash.contains(part)) {
-      pgroup.list += part           // already assign this element
+      pgroup.arr += part           // already assign this element
       initialHash += part // needed to avoid assigning partitions to multiple buckets
       true
     } else { false }
@@ -319,30 +314,30 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
     if (noLocality) {  // no preferredLocations in parent RDD, no randomization needed
       if (maxPartitions > groupArr.size) { // just return prev.partitions
         for ((p,i) <- prev.partitions.zipWithIndex) {
-          groupArr(i).list += p
+          groupArr(i).arr += p
         }
       } else { // no locality available, then simply split partitions based on positions in array
-        (0 until maxPartitions).foreach { i =>
+        for(i <- 0 until maxPartitions) {
           val rangeStart = ((i.toLong * prev.partitions.length) / maxPartitions).toInt
           val rangeEnd = (((i.toLong + 1) * prev.partitions.length) / maxPartitions).toInt
-          (rangeStart until rangeEnd).foreach{ j => groupArr(i).list += prev.partitions(j) }
+          (rangeStart until rangeEnd).foreach{ j => groupArr(i).arr += prev.partitions(j) }
         }
       }
     } else {
       for (p <- prev.partitions if (!initialHash.contains(p))) { // throw every partition into group
-        pickBin(p).list += p
+        pickBin(p).arr += p
       }
     }
   }
 
-  def getPartitions : Array[PartitionGroup] = groupArr.filter( pg => pg.size > 0).toArray
+  def getPartitions: Array[PartitionGroup] = groupArr.filter( pg => pg.size > 0).toArray
 
   /**
    * Runs the packing algorithm and returns an array of PartitionGroups that if possible are
   * load balanced and grouped by locality
   * @return array of partition groups
   */
-  def run() : Array[PartitionGroup] = {
+  def run(): Array[PartitionGroup] = {
     setupGroups(math.min(prev.partitions.length, maxPartitions)) // setup the groups (bins)
     throwBalls() // assign partitions (balls) to each group (bins)
     getPartitions
@@ -350,7 +345,7 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
 }
 
 private[spark] case class PartitionGroup(prefLoc: String = "") {
-  var list = mutable.ListBuffer[Partition]()
+  var arr = mutable.ArrayBuffer[Partition]()
 
-  def size = list.size
+  def size = arr.size
 }
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index 14f7c62782..64e2c0605b 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -181,7 +181,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     assert(coalesced1.collect().toList.sorted === (1 to 9).toList, "Data got *lost* in coalescing")
 
     val splits = coalesced1.glom().collect().map(_.toList).toList
-    assert(splits.length == 3, "Supposed to coalesce to 3 but got " + splits.length)
+    assert(splits.length === 3, "Supposed to coalesce to 3 but got " + splits.length)
 
     assert(splits.foldLeft(true)
       ((x,y) => if (!x) false else y.length >= 1) === true, "Some partitions were empty")
@@ -189,10 +189,10 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     // If we try to coalesce into more partitions than the original RDD, it should just
     // keep the original number of partitions.
     val coalesced4 = data.coalesce(20)
-    assert(coalesced4.glom().collect().map(_.toList).toList.sortWith(
-      (x, y) => if (x.isEmpty) false else if (y.isEmpty) true else x(0) < y(0)) === (1 to 9).
-      map(x => List(x)).toList, "Tried coalescing 9 partitions to 20 but didn't get 9 back")
-
+    val listOfLists = coalesced4.glom().collect().map(_.toList).toList
+    val sortedList = listOfLists.sortWith{ (x, y) => !x.isEmpty && (y.isEmpty || (x(0) < y(0))) }
+    assert( sortedList === (1 to 9).
+      map{x => List(x)}.toList, "Tried coalescing 9 partitions to 20 but didn't get 9 back")
   }
 
   test("cogrouped RDDs with locality, large scale (10K partitions)") {
@@ -204,8 +204,8 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     val machines = mutable.ListBuffer[String]()
     (1 to numMachines).foreach(machines += "m"+_)
 
-    val blocks = (1 to partitions).map(i => (i, (i to (i+2))
-      .map{ j => machines(rnd.nextInt(machines.size))}))
+    val blocks = (1 to partitions).map(i =>
+      { (i, Array.fill(3)(machines(rnd.nextInt(machines.size))).toList) } )
 
     val data2 = sc.makeRDD(blocks)
     val coalesced2 = data2.coalesce(numMachines*2)
-- 
cgit v1.2.3
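For readers skimming the patch, here is a minimal usage sketch (not part of the commit) of the behaviour the locality-aware coalescing above provides. It assumes a spark-shell style session with an existing SparkContext named sc; the machine names "m1" and "m2" are invented for illustration, and the pattern of (value, preferredLocations) pairs mirrors the RDDSuite test shown above.

// Sketch only: assumes an existing SparkContext `sc`; "m1"/"m2" are made-up machine names.
import scala.util.Random

val rnd = new Random(7919)
val machines = List("m1", "m2")

// One record per parent partition, each tagged with a preferred machine,
// the same (value, preferredLocations) shape used by RDDSuite above.
val blocks = (1 to 9).map { i => (i, List(machines(rnd.nextInt(machines.size)))) }
val data = sc.makeRDD(blocks)        // 9 parent partitions, each with locality info

val coalesced = data.coalesce(3)     // PartitionCoalescer packs the 9 parents into 3 groups
println(coalesced.partitions.length)                       // expected: 3
println(coalesced.glom().collect().map(_.toList).toList)   // all 9 values, none lost

Because the parent partitions carry preferred locations, the coalescer tries to group partitions that prefer the same machine into the same PartitionGroup while keeping the groups roughly the same size, which is exactly what the large-scale test in RDDSuite exercises.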