From 9192c358e40f2b3954d9939d7e153e3dd4d4ba75 Mon Sep 17 00:00:00 2001 From: Ali Ghodsi Date: Mon, 19 Aug 2013 13:13:24 -0700 Subject: simpler code --- core/src/main/scala/spark/rdd/CoalescedRDD.scala | 23 +++++++---------------- 1 file changed, 7 insertions(+), 16 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/spark/rdd/CoalescedRDD.scala index 04bb8089a4..25c48caad3 100644 --- a/core/src/main/scala/spark/rdd/CoalescedRDD.scala +++ b/core/src/main/scala/spark/rdd/CoalescedRDD.scala @@ -167,7 +167,7 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc var noLocality = true // if true if no preferredLocations exists for parent RDD // gets the *current* preferred locations from the DAGScheduler (as opposed to the static ones) - def currentPreferredLocations(rdd: RDD[_], part: Partition): Seq[String] = + def currPrefLocs(rdd: RDD[_], part: Partition): Seq[String] = rdd.context.getPreferredLocs(rdd, part.index) // this class just keeps iterating and rotating infinitely over the partitions of the RDD @@ -182,21 +182,12 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc // initializes/resets to start iterating from the beginning def resetIterator() = { - val i1 = prev.partitions.view.map{ p: Partition => - { if (currentPreferredLocations(prev, p).length > 0) - Some((currentPreferredLocations(prev, p)(0),p)) else None } } - val i2 = prev.partitions.view.map{ p: Partition => - { if (currentPreferredLocations(prev, p).length > 1) - Some((currentPreferredLocations(prev, p)(1),p)) else None } } - val i3 = prev.partitions.view.map{ p: Partition => - { if (currentPreferredLocations(prev, p).length > 2) - Some((currentPreferredLocations(prev, p)(2),p)) else None } } - val res = List(i1,i2,i3) - res.view.flatMap(x => x).flatten.iterator // fuses the 3 iterators (1st replica, 2nd, 3rd) - - // prev.partitions.iterator.map(p: Partition => { (currentPreferredLocations(prev, p)(0),p) }) ++ - // prev.partitions.iterator.map(p: Partition => { (currentPreferredLocations(prev, p)(1),p) }) ++ - // prev.partitions.iterator.map(p: Partition => { (currentPreferredLocations(prev, p)(2),p) }) + val iterators = (0 to 2).map( x => + prev.partitions.iterator.flatMap(p => { + if (currPrefLocs(prev, p).size > x) Some((currPrefLocs(prev, p)(x), p)) else None + } ) + ) + iterators.reduceLeft((x, y) => x ++ y) } // hasNext() is false iff there are no preferredLocations for any of the partitions of the RDD -- cgit v1.2.3