aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/rdd/CoalescedRDD.scala23
1 files changed, 7 insertions, 16 deletions
diff --git a/core/src/main/scala/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
index 04bb8089a4..25c48caad3 100644
--- a/core/src/main/scala/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
@@ -167,7 +167,7 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
var noLocality = true // if true if no preferredLocations exists for parent RDD
// gets the *current* preferred locations from the DAGScheduler (as opposed to the static ones)
- def currentPreferredLocations(rdd: RDD[_], part: Partition): Seq[String] =
+ def currPrefLocs(rdd: RDD[_], part: Partition): Seq[String] =
rdd.context.getPreferredLocs(rdd, part.index)
// this class just keeps iterating and rotating infinitely over the partitions of the RDD
@@ -182,21 +182,12 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
// initializes/resets to start iterating from the beginning
def resetIterator() = {
- val i1 = prev.partitions.view.map{ p: Partition =>
- { if (currentPreferredLocations(prev, p).length > 0)
- Some((currentPreferredLocations(prev, p)(0),p)) else None } }
- val i2 = prev.partitions.view.map{ p: Partition =>
- { if (currentPreferredLocations(prev, p).length > 1)
- Some((currentPreferredLocations(prev, p)(1),p)) else None } }
- val i3 = prev.partitions.view.map{ p: Partition =>
- { if (currentPreferredLocations(prev, p).length > 2)
- Some((currentPreferredLocations(prev, p)(2),p)) else None } }
- val res = List(i1,i2,i3)
- res.view.flatMap(x => x).flatten.iterator // fuses the 3 iterators (1st replica, 2nd, 3rd)
-
- // prev.partitions.iterator.map(p: Partition => { (currentPreferredLocations(prev, p)(0),p) }) ++
- // prev.partitions.iterator.map(p: Partition => { (currentPreferredLocations(prev, p)(1),p) }) ++
- // prev.partitions.iterator.map(p: Partition => { (currentPreferredLocations(prev, p)(2),p) })
+ val iterators = (0 to 2).map( x =>
+ prev.partitions.iterator.flatMap(p => {
+ if (currPrefLocs(prev, p).size > x) Some((currPrefLocs(prev, p)(x), p)) else None
+ } )
+ )
+ iterators.reduceLeft((x, y) => x ++ y)
}
// hasNext() is false iff there are no preferredLocations for any of the partitions of the RDD