about | summary | refs | log | tree | commit | diff
path: root/core/src
diff options: context | space | mode
authorSteven She <steven@canopylabs.com>2015-04-27 18:55:02 -0400
committerSean Owen <sowen@cloudera.com>2015-04-27 18:55:02 -0400
commitb9de9e040aff371c6acf9b3f3d1ff8b360c0cd56 (patch)
tree0f7fa83196d607983f92bfbfc496e7308f240d8e /core/src
parentca9f4ebb8e510e521bf4df0331375ddb385fb9d2 (diff)
downloadspark-b9de9e040aff371c6acf9b3f3d1ff8b360c0cd56.tar.gz
spark-b9de9e040aff371c6acf9b3f3d1ff8b360c0cd56.tar.bz2
spark-b9de9e040aff371c6acf9b3f3d1ff8b360c0cd56.zip
[SPARK-7103] Fix crash with SparkContext.union when RDD has no partitioner
Added a check to the SparkContext.union method to verify that a partitioner is defined on all RDDs when instantiating a PartitionerAwareUnionRDD.

Author: Steven She <steven@canopylabs.com>

Closes #5679 from stevencanopy/SPARK-7103 and squashes the following commits:

5a3d846 [Steven She] SPARK-7103: Fix crash with SparkContext.union when at least one RDD has no partitioner
Diffstat (limited to 'core/src')
-rw-r--r-- core/src/main/scala/org/apache/spark/SparkContext.scala                  |  2
-rw-r--r-- core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala  |  1
-rw-r--r-- core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala                  | 21
3 files changed, 23 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 86269eac52..ea4ddcc2e2 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1055,7 +1055,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
/** Build the union of a list of RDDs. */
def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = {
val partitioners = rdds.flatMap(_.partitioner).toSet
- if (partitioners.size == 1) {
+ if (rdds.forall(_.partitioner.isDefined) && partitioners.size == 1) {
new PartitionerAwareUnionRDD(this, rdds)
} else {
new UnionRDD(this, rdds)
diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
index 92b0641d0f..7598ff617b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
@@ -60,6 +60,7 @@ class PartitionerAwareUnionRDD[T: ClassTag](
var rdds: Seq[RDD[T]]
) extends RDD[T](sc, rdds.map(x => new OneToOneDependency(x))) {
require(rdds.length > 0)
+ require(rdds.forall(_.partitioner.isDefined))
require(rdds.flatMap(_.partitioner).toSet.size == 1,
"Parent RDDs have different partitioners: " + rdds.flatMap(_.partitioner))
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index df42faab64..ef8c36a286 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -99,6 +99,27 @@ class RDDSuite extends FunSuite with SharedSparkContext {
assert(sc.union(Seq(nums, nums)).collect().toList === List(1, 2, 3, 4, 1, 2, 3, 4))
}
+ test("SparkContext.union creates UnionRDD if at least one RDD has no partitioner") {
+ val rddWithPartitioner = sc.parallelize(Seq(1->true)).partitionBy(new HashPartitioner(1))
+ val rddWithNoPartitioner = sc.parallelize(Seq(2->true))
+ val unionRdd = sc.union(rddWithNoPartitioner, rddWithPartitioner)
+ assert(unionRdd.isInstanceOf[UnionRDD[_]])
+ }
+
+ test("SparkContext.union creates PartitionAwareUnionRDD if all RDDs have partitioners") {
+ val rddWithPartitioner = sc.parallelize(Seq(1->true)).partitionBy(new HashPartitioner(1))
+ val unionRdd = sc.union(rddWithPartitioner, rddWithPartitioner)
+ assert(unionRdd.isInstanceOf[PartitionerAwareUnionRDD[_]])
+ }
+
+ test("PartitionAwareUnionRDD raises exception if at least one RDD has no partitioner") {
+ val rddWithPartitioner = sc.parallelize(Seq(1->true)).partitionBy(new HashPartitioner(1))
+ val rddWithNoPartitioner = sc.parallelize(Seq(2->true))
+ intercept[IllegalArgumentException] {
+ new PartitionerAwareUnionRDD(sc, Seq(rddWithNoPartitioner, rddWithPartitioner))
+ }
+ }
+
test("partitioner aware union") {
def makeRDDWithPartitioner(seq: Seq[Int]): RDD[Int] = {
sc.makeRDD(seq, 1)