aboutsummaryrefslogtreecommitdiff
path: root/core/src/test
diff options
context:
space:
mode:
author: Aaron Davidson <aaron@databricks.com> 2014-07-17 01:01:14 -0700
committer: Patrick Wendell <pwendell@gmail.com> 2014-07-17 01:01:14 -0700
commit7c23c0dc3ed721c95690fc49f435d9de6952523c (patch)
tree24916b22933995a60f0f14b4ebf58ca28fa0ad2f /core/src/test
parent9c249743eaabe5fc8d961c7aa581cc0197f6e950 (diff)
downloadspark-7c23c0dc3ed721c95690fc49f435d9de6952523c.tar.gz
spark-7c23c0dc3ed721c95690fc49f435d9de6952523c.tar.bz2
spark-7c23c0dc3ed721c95690fc49f435d9de6952523c.zip
[SPARK-2412] CoalescedRDD throws exception with certain pref locs
If the first pass of CoalescedRDD does not find the target number of locations AND the second pass finds new locations, an exception is thrown, as "groupHash.get(nxt_replica).get" is not valid. The fix is just to add an ArrayBuffer to groupHash for that replica if it didn't already exist. Author: Aaron Davidson <aaron@databricks.com> Closes #1337 from aarondav/2412 and squashes the following commits: f587b5d [Aaron Davidson] getOrElseUpdate 3ad8a3c [Aaron Davidson] [SPARK-2412] CoalescedRDD throws exception with certain pref locs
Diffstat (limited to 'core/src/test')
-rw-r--r-- core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala | 14
1 file changed, 14 insertions, 0 deletions
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 6ea045198e..2924de1129 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -351,6 +351,20 @@ class RDDSuite extends FunSuite with SharedSparkContext {
}
}
+ // Test for SPARK-2412 -- ensure that the second pass of the algorithm does not throw an exception
+ test("coalesced RDDs with locality, fail first pass") {
+ val initialPartitions = 1000
+ val targetLen = 50
+ val couponCount = 2 * (math.log(targetLen)*targetLen + targetLen + 0.5).toInt // = 492
+
+ val blocks = (1 to initialPartitions).map { i =>
+ (i, List(if (i > couponCount) "m2" else "m1"))
+ }
+ val data = sc.makeRDD(blocks)
+ val coalesced = data.coalesce(targetLen)
+ assert(coalesced.partitions.length == targetLen)
+ }
+
test("zipped RDDs") {
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
val zipped = nums.zip(nums.map(_ + 1.0))