aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala61
1 files changed, 33 insertions, 28 deletions
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 25973348a7..1901330d8b 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -274,37 +274,42 @@ class RDDSuite extends FunSuite with SharedSparkContext {
test("coalesced RDDs with locality, large scale (10K partitions)") {
// large scale experiment
import collection.mutable
- val rnd = scala.util.Random
val partitions = 10000
val numMachines = 50
val machines = mutable.ListBuffer[String]()
- (1 to numMachines).foreach(machines += "m"+_)
-
- val blocks = (1 to partitions).map(i =>
- { (i, Array.fill(3)(machines(rnd.nextInt(machines.size))).toList) } )
-
- val data2 = sc.makeRDD(blocks)
- val coalesced2 = data2.coalesce(numMachines*2)
-
- // test that you get over 90% locality in each group
- val minLocality = coalesced2.partitions
- .map(part => part.asInstanceOf[CoalescedRDDPartition].localFraction)
- .foldLeft(1.0)((perc, loc) => math.min(perc,loc))
- assert(minLocality >= 0.90, "Expected 90% locality but got " + (minLocality*100.0).toInt + "%")
-
- // test that the groups are load balanced with 100 +/- 20 elements in each
- val maxImbalance = coalesced2.partitions
- .map(part => part.asInstanceOf[CoalescedRDDPartition].parents.size)
- .foldLeft(0)((dev, curr) => math.max(math.abs(100-curr),dev))
- assert(maxImbalance <= 20, "Expected 100 +/- 20 per partition, but got " + maxImbalance)
-
- val data3 = sc.makeRDD(blocks).map(i => i*2) // derived RDD to test *current* pref locs
- val coalesced3 = data3.coalesce(numMachines*2)
- val minLocality2 = coalesced3.partitions
- .map(part => part.asInstanceOf[CoalescedRDDPartition].localFraction)
- .foldLeft(1.0)((perc, loc) => math.min(perc,loc))
- assert(minLocality2 >= 0.90, "Expected 90% locality for derived RDD but got " +
- (minLocality2*100.0).toInt + "%")
+ (1 to numMachines).foreach(machines += "m" + _)
+ val rnd = scala.util.Random
+ for (seed <- 1 to 5) {
+ rnd.setSeed(seed)
+
+ val blocks = (1 to partitions).map { i =>
+ (i, Array.fill(3)(machines(rnd.nextInt(machines.size))).toList)
+ }
+
+ val data2 = sc.makeRDD(blocks)
+ val coalesced2 = data2.coalesce(numMachines * 2)
+
+ // test that you get over 90% locality in each group
+ val minLocality = coalesced2.partitions
+ .map(part => part.asInstanceOf[CoalescedRDDPartition].localFraction)
+ .foldLeft(1.0)((perc, loc) => math.min(perc, loc))
+ assert(minLocality >= 0.90, "Expected 90% locality but got " +
+ (minLocality * 100.0).toInt + "%")
+
+ // test that the groups are load balanced with 100 +/- 20 elements in each
+ val maxImbalance = coalesced2.partitions
+ .map(part => part.asInstanceOf[CoalescedRDDPartition].parents.size)
+ .foldLeft(0)((dev, curr) => math.max(math.abs(100 - curr), dev))
+ assert(maxImbalance <= 20, "Expected 100 +/- 20 per partition, but got " + maxImbalance)
+
+ val data3 = sc.makeRDD(blocks).map(i => i * 2) // derived RDD to test *current* pref locs
+ val coalesced3 = data3.coalesce(numMachines * 2)
+ val minLocality2 = coalesced3.partitions
+ .map(part => part.asInstanceOf[CoalescedRDDPartition].localFraction)
+ .foldLeft(1.0)((perc, loc) => math.min(perc, loc))
+ assert(minLocality2 >= 0.90, "Expected 90% locality for derived RDD but got " +
+ (minLocality2 * 100.0).toInt + "%")
+ }
}

test("zipped RDDs") {