aboutsummaryrefslogtreecommitdiff
path: root/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala')
-rw-r--r-- core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala | 17
1 files changed, 17 insertions, 0 deletions
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index a663dab772..979fb426c9 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -116,6 +116,23 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
assert(sc.union(Seq(nums, nums)).collect().toList === List(1, 2, 3, 4, 1, 2, 3, 4))
}
+ test("SparkContext.union parallel partition listing") {
+   // Union results must not depend on serial vs. parallel partition listing.
+   val nums1 = sc.makeRDD(Array(1, 2, 3, 4), 2)
+   val nums2 = sc.makeRDD(Array(5, 6, 7, 8), 2)
+   val serialUnion = sc.union(nums1, nums2)
+   val expected = serialUnion.collect().toList
+   assert(serialUnion.asInstanceOf[UnionRDD[Int]].isPartitionListingParallel === false)
+   // The SparkContext is shared across tests: always undo the override, even on failure.
+   sc.conf.set("spark.rdd.parallelListingThreshold", "1")
+   val (parallelUnion, actual) = try {
+     val rdd = sc.union(nums1, nums2)
+     (rdd, rdd.collect().toList)
+   } finally { sc.conf.remove("spark.rdd.parallelListingThreshold") }
+   assert(parallelUnion.asInstanceOf[UnionRDD[Int]].isPartitionListingParallel === true)
+   assert(expected === actual)
+ }
+
test("SparkContext.union creates UnionRDD if at least one RDD has no partitioner") {
val rddWithPartitioner = sc.parallelize(Seq(1 -> true)).partitionBy(new HashPartitioner(1))
val rddWithNoPartitioner = sc.parallelize(Seq(2 -> true))