aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorAli Ghodsi <alig@cs.berkeley.edu>2013-08-19 18:52:43 -0700
committerAli Ghodsi <alig@cs.berkeley.edu>2013-08-20 16:16:05 -0700
commit7b123b3126d555237f31a0787411c4bbc1abd39a (patch)
treefccc0d02a5bda653d66b543ebdbd561191ebf920 /core
parent9192c358e40f2b3954d9939d7e153e3dd4d4ba75 (diff)
downloadspark-7b123b3126d555237f31a0787411c4bbc1abd39a.tar.gz
spark-7b123b3126d555237f31a0787411c4bbc1abd39a.tar.bz2
spark-7b123b3126d555237f31a0787411c4bbc1abd39a.zip
Simpler code
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/rdd/CoalescedRDD.scala6
-rw-r--r--core/src/test/scala/spark/RDDSuite.scala3
2 files changed, 4 insertions, 5 deletions
diff --git a/core/src/main/scala/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
index 25c48caad3..b83d443de3 100644
--- a/core/src/main/scala/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
@@ -167,8 +167,8 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
var noLocality = true // if true if no preferredLocations exists for parent RDD
// gets the *current* preferred locations from the DAGScheduler (as opposed to the static ones)
- def currPrefLocs(rdd: RDD[_], part: Partition): Seq[String] =
- rdd.context.getPreferredLocs(rdd, part.index)
+ def currPrefLocs(part: Partition): Seq[String] =
+ prev.context.getPreferredLocs(prev, part.index)
// this class just keeps iterating and rotating infinitely over the partitions of the RDD
// next() returns the next preferred machine that a partition is replicated on
@@ -184,7 +184,7 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
def resetIterator() = {
val iterators = (0 to 2).map( x =>
prev.partitions.iterator.flatMap(p => {
- if (currPrefLocs(prev, p).size > x) Some((currPrefLocs(prev, p)(x), p)) else None
+ if (currPrefLocs(p).size > x) Some((currPrefLocs(p)(x), p)) else None
} )
)
iterators.reduceLeft((x, y) => x ++ y)
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index 64e2c0605b..9e73703371 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -183,8 +183,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
val splits = coalesced1.glom().collect().map(_.toList).toList
assert(splits.length === 3, "Supposed to coalesce to 3 but got " + splits.length)
- assert(splits.foldLeft(true)
- ((x,y) => if (!x) false else y.length >= 1) === true, "Some partitions were empty")
+ assert(splits.forall(_.length >= 1) === true, "Some partitions were empty")
// If we try to coalesce into more partitions than the original RDD, it should just
// keep the original number of partitions.