diff options
Diffstat (limited to 'core/src')
-rw-r--r-- | core/src/main/scala/spark/rdd/CoalescedRDD.scala | 6 | ||||
-rw-r--r-- | core/src/test/scala/spark/RDDSuite.scala | 3 |
2 files changed, 4 insertions, 5 deletions
diff --git a/core/src/main/scala/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/spark/rdd/CoalescedRDD.scala index 25c48caad3..b83d443de3 100644 --- a/core/src/main/scala/spark/rdd/CoalescedRDD.scala +++ b/core/src/main/scala/spark/rdd/CoalescedRDD.scala @@ -167,8 +167,8 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc var noLocality = true // if true if no preferredLocations exists for parent RDD // gets the *current* preferred locations from the DAGScheduler (as opposed to the static ones) - def currPrefLocs(rdd: RDD[_], part: Partition): Seq[String] = - rdd.context.getPreferredLocs(rdd, part.index) + def currPrefLocs(part: Partition): Seq[String] = + prev.context.getPreferredLocs(prev, part.index) // this class just keeps iterating and rotating infinitely over the partitions of the RDD // next() returns the next preferred machine that a partition is replicated on @@ -184,7 +184,7 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc def resetIterator() = { val iterators = (0 to 2).map( x => prev.partitions.iterator.flatMap(p => { - if (currPrefLocs(prev, p).size > x) Some((currPrefLocs(prev, p)(x), p)) else None + if (currPrefLocs(p).size > x) Some((currPrefLocs(p)(x), p)) else None } ) ) iterators.reduceLeft((x, y) => x ++ y) diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala index 64e2c0605b..9e73703371 100644 --- a/core/src/test/scala/spark/RDDSuite.scala +++ b/core/src/test/scala/spark/RDDSuite.scala @@ -183,8 +183,7 @@ class RDDSuite extends FunSuite with SharedSparkContext { val splits = coalesced1.glom().collect().map(_.toList).toList assert(splits.length === 3, "Supposed to coalesce to 3 but got " + splits.length) - assert(splits.foldLeft(true) - ((x,y) => if (!x) false else y.length >= 1) === true, "Some partitions were empty") + assert(splits.forall(_.length >= 1) === true, "Some partitions were empty") // If we try to coalesce into more partitions than the original RDD, it should just // keep the original number of partitions. |