aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorFrançois Garillot <francois@garillot.net>2015-07-24 15:41:13 +0100
committerSean Owen <sowen@cloudera.com>2015-07-24 15:41:13 +0100
commit6cd28cc21ed585ab8d1e0e7147a1a48b044c9c8e (patch)
tree6c81549eff46101fe226b0625729102b2772cf90 /core
parent6a7e537f3a4fd5e99a905f9842dc0ad4c348e4fd (diff)
downloadspark-6cd28cc21ed585ab8d1e0e7147a1a48b044c9c8e.tar.gz
spark-6cd28cc21ed585ab8d1e0e7147a1a48b044c9c8e.tar.bz2
spark-6cd28cc21ed585ab8d1e0e7147a1a48b044c9c8e.zip
[SPARK-9236] [CORE] Make defaultPartitioner not reuse a parent RDD's partitioner if it has 0 partitions
See also comments on https://issues.apache.org/jira/browse/SPARK-9236 Author: François Garillot <francois@garillot.net> Closes #7616 from huitseeker/issue/SPARK-9236 and squashes the following commits: 217f902 [François Garillot] [SPARK-9236] Make defaultPartitioner not reuse a parent RDD's partitioner if it has 0 partitions
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/Partitioner.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala23
2 files changed, 24 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index ad68512dcc..4b9d59975b 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -56,7 +56,7 @@ object Partitioner {
*/
def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
- for (r <- bySize if r.partitioner.isDefined) {
+ for (r <- bySize if r.partitioner.isDefined && r.partitioner.get.numPartitions > 0) {
return r.partitioner.get
}
if (rdd.context.conf.contains("spark.default.parallelism")) {
diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index dfa102f432..1321ec8473 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -282,6 +282,29 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
))
}
+ // See SPARK-9236
+ test("cogroup with empty RDD") {
+ import scala.reflect.classTag
+ val intPairCT = classTag[(Int, Int)]
+
+ val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
+ val rdd2 = sc.emptyRDD[(Int, Int)](intPairCT)
+
+ val joined = rdd1.cogroup(rdd2).collect()
+ assert(joined.size > 0)
+ }
+
+ // See SPARK-9236
+ test("cogroup with groupByed RDD having 0 partitions") {
+ import scala.reflect.classTag
+ val intCT = classTag[Int]
+
+ val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
+ val rdd2 = sc.emptyRDD[Int](intCT).groupBy((x) => 5)
+ val joined = rdd1.cogroup(rdd2).collect()
+ assert(joined.size > 0)
+ }
+
test("rightOuterJoin") {
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))