aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorAli Ghodsi <alig@cs.berkeley.edu>2013-08-19 13:13:24 -0700
committerAli Ghodsi <alig@cs.berkeley.edu>2013-08-20 16:16:05 -0700
commit9192c358e40f2b3954d9939d7e153e3dd4d4ba75 (patch)
treeeaa0cc7dc6d77fc8b5d991004427ecc34bba3ba0 /core
parenta75a64eade3f540e72fc0bc646ba74a689f03f3e (diff)
downloadspark-9192c358e40f2b3954d9939d7e153e3dd4d4ba75.tar.gz
spark-9192c358e40f2b3954d9939d7e153e3dd4d4ba75.tar.bz2
spark-9192c358e40f2b3954d9939d7e153e3dd4d4ba75.zip
simpler code
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/rdd/CoalescedRDD.scala23
1 files changed, 7 insertions, 16 deletions
diff --git a/core/src/main/scala/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
index 04bb8089a4..25c48caad3 100644
--- a/core/src/main/scala/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
@@ -167,7 +167,7 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
var noLocality = true // if true if no preferredLocations exists for parent RDD
// gets the *current* preferred locations from the DAGScheduler (as opposed to the static ones)
- def currentPreferredLocations(rdd: RDD[_], part: Partition): Seq[String] =
+ def currPrefLocs(rdd: RDD[_], part: Partition): Seq[String] =
rdd.context.getPreferredLocs(rdd, part.index)
// this class just keeps iterating and rotating infinitely over the partitions of the RDD
@@ -182,21 +182,12 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
// initializes/resets to start iterating from the beginning
def resetIterator() = {
- val i1 = prev.partitions.view.map{ p: Partition =>
- { if (currentPreferredLocations(prev, p).length > 0)
- Some((currentPreferredLocations(prev, p)(0),p)) else None } }
- val i2 = prev.partitions.view.map{ p: Partition =>
- { if (currentPreferredLocations(prev, p).length > 1)
- Some((currentPreferredLocations(prev, p)(1),p)) else None } }
- val i3 = prev.partitions.view.map{ p: Partition =>
- { if (currentPreferredLocations(prev, p).length > 2)
- Some((currentPreferredLocations(prev, p)(2),p)) else None } }
- val res = List(i1,i2,i3)
- res.view.flatMap(x => x).flatten.iterator // fuses the 3 iterators (1st replica, 2nd, 3rd)
-
- // prev.partitions.iterator.map(p: Partition => { (currentPreferredLocations(prev, p)(0),p) }) ++
- // prev.partitions.iterator.map(p: Partition => { (currentPreferredLocations(prev, p)(1),p) }) ++
- // prev.partitions.iterator.map(p: Partition => { (currentPreferredLocations(prev, p)(2),p) })
+ val iterators = (0 to 2).map( x =>
+ prev.partitions.iterator.flatMap(p => {
+ if (currPrefLocs(prev, p).size > x) Some((currPrefLocs(prev, p)(x), p)) else None
+ } )
+ )
+ iterators.reduceLeft((x, y) => x ++ y)
}
// hasNext() is false iff there are no preferredLocations for any of the partitions of the RDD