diff options
author | François Garillot <francois@garillot.net> | 2015-07-24 15:41:13 +0100 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2015-07-24 15:41:13 +0100 |
commit | 6cd28cc21ed585ab8d1e0e7147a1a48b044c9c8e (patch) | |
tree | 6c81549eff46101fe226b0625729102b2772cf90 /core/src | |
parent | 6a7e537f3a4fd5e99a905f9842dc0ad4c348e4fd (diff) | |
download | spark-6cd28cc21ed585ab8d1e0e7147a1a48b044c9c8e.tar.gz spark-6cd28cc21ed585ab8d1e0e7147a1a48b044c9c8e.tar.bz2 spark-6cd28cc21ed585ab8d1e0e7147a1a48b044c9c8e.zip |
[SPARK-9236] [CORE] Make defaultPartitioner not reuse a parent RDD's partitioner if it has 0 partitions
See also comments on https://issues.apache.org/jira/browse/SPARK-9236
Author: François Garillot <francois@garillot.net>
Closes #7616 from huitseeker/issue/SPARK-9236 and squashes the following commits:
217f902 [François Garillot] [SPARK-9236] Make defaultPartitioner not reuse a parent RDD's partitioner if it has 0 partitions
Diffstat (limited to 'core/src')
-rw-r--r-- | core/src/main/scala/org/apache/spark/Partitioner.scala | 2 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala | 23 |
2 files changed, 24 insertions, 1 deletion
diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala index ad68512dcc..4b9d59975b 100644 --- a/core/src/main/scala/org/apache/spark/Partitioner.scala +++ b/core/src/main/scala/org/apache/spark/Partitioner.scala @@ -56,7 +56,7 @@ object Partitioner { */ def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = { val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse - for (r <- bySize if r.partitioner.isDefined) { + for (r <- bySize if r.partitioner.isDefined && r.partitioner.get.numPartitions > 0) { return r.partitioner.get } if (rdd.context.conf.contains("spark.default.parallelism")) { diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala index dfa102f432..1321ec8473 100644 --- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala @@ -282,6 +282,29 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext { )) } + // See SPARK-9236 + test("cogroup with empty RDD") { + import scala.reflect.classTag + val intPairCT = classTag[(Int, Int)] + + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))) + val rdd2 = sc.emptyRDD[(Int, Int)](intPairCT) + + val joined = rdd1.cogroup(rdd2).collect() + assert(joined.size > 0) + } + + // See SPARK-9236 + test("cogroup with groupByed RDD having 0 partitions") { + import scala.reflect.classTag + val intCT = classTag[Int] + + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))) + val rdd2 = sc.emptyRDD[Int](intCT).groupBy((x) => 5) + val joined = rdd1.cogroup(rdd2).collect() + assert(joined.size > 0) + } + test("rightOuterJoin") { val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))) val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) |