author     Aaron Davidson <aaron@databricks.com>  2014-07-17 01:01:14 -0700
committer  Patrick Wendell <pwendell@gmail.com>  2014-07-17 01:01:14 -0700
commit     7c23c0dc3ed721c95690fc49f435d9de6952523c (patch)
tree       24916b22933995a60f0f14b4ebf58ca28fa0ad2f /core
parent     9c249743eaabe5fc8d961c7aa581cc0197f6e950 (diff)
[SPARK-2412] CoalescedRDD throws exception with certain pref locs
If the first pass of CoalescedRDD does not find the target number of locations AND the second pass finds new locations, an exception is thrown, as "groupHash.get(nxt_replica).get" is not valid. The fix is just to add an ArrayBuffer to groupHash for that replica if it didn't already exist.

Author: Aaron Davidson <aaron@databricks.com>

Closes #1337 from aarondav/2412 and squashes the following commits:

f587b5d [Aaron Davidson] getOrElseUpdate
3ad8a3c [Aaron Davidson] [SPARK-2412] CoalescedRDD throws exception with certain pref locs
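
For context on the one-line fix: on a mutable Map, "groupHash.get(nxt_replica).get" only works when the key is already present. If the second pass encounters a replica that the first pass never registered, get returns None and the trailing .get throws NoSuchElementException; getOrElseUpdate instead inserts an empty buffer for the new key and returns it. A minimal standalone sketch of the old and new behaviour, using a plain scala.collection.mutable.HashMap rather than Spark's internal PartitionCoalescer state:

    import scala.collection.mutable.{ArrayBuffer, HashMap}

    val groupHash = HashMap[String, ArrayBuffer[Int]]()

    // Old code path: assumes the first pass already created a buffer for this key.
    // groupHash.get("new-host").get += 1   // throws NoSuchElementException when "new-host" is absent

    // Fixed code path: create an empty buffer on first sight of a new key, then append to it.
    groupHash.getOrElseUpdate("new-host", ArrayBuffer()) += 1
    assert(groupHash("new-host") == ArrayBuffer(1))
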
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala |  4
-rw-r--r--  core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala     | 14
2 files changed, 16 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
index c45b759f00..e7221e3032 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -258,7 +258,7 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
         val pgroup = PartitionGroup(nxt_replica)
         groupArr += pgroup
         addPartToPGroup(nxt_part, pgroup)
-        groupHash += (nxt_replica -> (ArrayBuffer(pgroup))) // list in case we have multiple
+        groupHash.put(nxt_replica, ArrayBuffer(pgroup)) // list in case we have multiple
         numCreated += 1
       }
     }
@@ -267,7 +267,7 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
       var (nxt_replica, nxt_part) = rotIt.next()
       val pgroup = PartitionGroup(nxt_replica)
       groupArr += pgroup
-      groupHash.get(nxt_replica).get += pgroup
+      groupHash.getOrElseUpdate(nxt_replica, ArrayBuffer()) += pgroup
       var tries = 0
       while (!addPartToPGroup(nxt_part, pgroup) && tries < targetLen) { // ensure at least one part
         nxt_part = rotIt.next()._2
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 6ea045198e..2924de1129 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -351,6 +351,20 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     }
   }
 
+  // Test for SPARK-2412 -- ensure that the second pass of the algorithm does not throw an exception
+  test("coalesced RDDs with locality, fail first pass") {
+    val initialPartitions = 1000
+    val targetLen = 50
+    val couponCount = 2 * (math.log(targetLen)*targetLen + targetLen + 0.5).toInt // = 492
+
+    val blocks = (1 to initialPartitions).map { i =>
+      (i, List(if (i > couponCount) "m2" else "m1"))
+    }
+    val data = sc.makeRDD(blocks)
+    val coalesced = data.coalesce(targetLen)
+    assert(coalesced.partitions.length == targetLen)
+  }
+
   test("zipped RDDs") {
     val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
     val zipped = nums.zip(nums.map(_ + 1.0))
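
A note on the constant in the new test (an interpretation of the test setup, not part of the patch itself): couponCount reuses the same coupon-collector bound that PartitionCoalescer uses to cap its first pass, and with targetLen = 50 it evaluates to 2 * (math.log(50)*50 + 50 + 0.5).toInt = 2 * 246 = 492. Partitions 1 through 492 all prefer "m1", so the first pass runs out of tries having seen only that one location, and the second pass is the first to encounter "m2" -- exactly the case that used to hit the groupHash.get(...).get exception. The arithmetic can be checked on its own, without Spark:

    // Standalone check of the bound used in the test (same expression as in the patch).
    val targetLen = 50
    val couponCount = 2 * (math.log(targetLen) * targetLen + targetLen + 0.5).toInt
    assert(couponCount == 492)  // math.log(50) ≈ 3.912; inner sum ≈ 246.1, truncated to 246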