diff options
Diffstat (limited to 'core')
5 files changed, 109 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala index 880f61c497..0846225e4f 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala @@ -470,6 +470,22 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) } /** + * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the + * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or + * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each + * element (k, w) in `other`, the resulting RDD will either contain all pairs + * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements + * in `this` have key k. Uses the given Partitioner to partition the output RDD. + */ + def fullOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner) + : JavaPairRDD[K, (Optional[V], Optional[W])] = { + val joinResult = rdd.fullOuterJoin(other, partitioner) + fromRDD(joinResult.mapValues{ case (v, w) => + (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w)) + }) + } + + /** * Simplified version of combineByKey that hash-partitions the resulting RDD using the existing * partitioner/parallelism level. */ @@ -564,6 +580,38 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) } /** + * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the + * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or + * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each + * element (k, w) in `other`, the resulting RDD will either contain all pairs + * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements + * in `this` have key k. Hash-partitions the resulting RDD using the existing partitioner/ + * parallelism level. + */ + def fullOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (Optional[V], Optional[W])] = { + val joinResult = rdd.fullOuterJoin(other) + fromRDD(joinResult.mapValues{ case (v, w) => + (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w)) + }) + } + + /** + * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the + * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or + * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each + * element (k, w) in `other`, the resulting RDD will either contain all pairs + * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements + * in `this` have key k. Hash-partitions the resulting RDD into the given number of partitions. + */ + def fullOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int) + : JavaPairRDD[K, (Optional[V], Optional[W])] = { + val joinResult = rdd.fullOuterJoin(other, numPartitions) + fromRDD(joinResult.mapValues{ case (v, w) => + (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w)) + }) + } + + /** * Return the key-value pairs in this RDD to the master as a Map. */ def collectAsMap(): java.util.Map[K, V] = mapAsJavaMap(rdd.collectAsMap()) diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 51ba8c2d17..7f578bc5da 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -507,6 +507,23 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) } /** + * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the + * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or + * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each + * element (k, w) in `other`, the resulting RDD will either contain all pairs + * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements + * in `this` have key k. Uses the given Partitioner to partition the output RDD. + */ + def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner) + : RDD[(K, (Option[V], Option[W]))] = { + this.cogroup(other, partitioner).flatMapValues { + case (vs, Seq()) => vs.map(v => (Some(v), None)) + case (Seq(), ws) => ws.map(w => (None, Some(w))) + case (vs, ws) => for (v <- vs; w <- ws) yield (Some(v), Some(w)) + } + } + + /** * Simplified version of combineByKey that hash-partitions the resulting RDD using the * existing partitioner/parallelism level. */ @@ -586,6 +603,31 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) } /** + * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the + * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or + * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each + * element (k, w) in `other`, the resulting RDD will either contain all pairs + * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements + * in `this` have key k. Hash-partitions the resulting RDD using the existing partitioner/ + * parallelism level. + */ + def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))] = { + fullOuterJoin(other, defaultPartitioner(self, other)) + } + + /** + * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the + * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or + * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each + * element (k, w) in `other`, the resulting RDD will either contain all pairs + * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements + * in `this` have key k. Hash-partitions the resulting RDD into the given number of partitions. + */ + def fullOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], Option[W]))] = { + fullOuterJoin(other, new HashPartitioner(numPartitions)) + } + + /** * Return the key-value pairs in this RDD to the master as a Map. * * Warning: this doesn't return a multimap (so if you have multiple values to the same key, only diff --git a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala index fc0cee3e87..646ede30ae 100644 --- a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala +++ b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala @@ -193,11 +193,13 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMet assert(grouped2.join(grouped4).partitioner === grouped4.partitioner) assert(grouped2.leftOuterJoin(grouped4).partitioner === grouped4.partitioner) assert(grouped2.rightOuterJoin(grouped4).partitioner === grouped4.partitioner) + assert(grouped2.fullOuterJoin(grouped4).partitioner === grouped4.partitioner) assert(grouped2.cogroup(grouped4).partitioner === grouped4.partitioner) assert(grouped2.join(reduced2).partitioner === grouped2.partitioner) assert(grouped2.leftOuterJoin(reduced2).partitioner === grouped2.partitioner) assert(grouped2.rightOuterJoin(reduced2).partitioner === grouped2.partitioner) + assert(grouped2.fullOuterJoin(reduced2).partitioner === grouped2.partitioner) assert(grouped2.cogroup(reduced2).partitioner === grouped2.partitioner) assert(grouped2.map(_ => 1).partitioner === None) @@ -218,6 +220,7 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMet assert(intercept[SparkException]{ arrPairs.join(arrPairs) }.getMessage.contains("array")) assert(intercept[SparkException]{ arrPairs.leftOuterJoin(arrPairs) }.getMessage.contains("array")) assert(intercept[SparkException]{ arrPairs.rightOuterJoin(arrPairs) }.getMessage.contains("array")) + assert(intercept[SparkException]{ arrPairs.fullOuterJoin(arrPairs) }.getMessage.contains("array")) assert(intercept[SparkException]{ arrPairs.groupByKey() }.getMessage.contains("array")) assert(intercept[SparkException]{ arrPairs.countByKey() }.getMessage.contains("array")) assert(intercept[SparkException]{ arrPairs.countByKeyApprox(1) }.getMessage.contains("array")) diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala index e84cc69592..75b0119190 100644 --- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala @@ -298,6 +298,21 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext { )) } + test("fullOuterJoin") { + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))) + val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) + val joined = rdd1.fullOuterJoin(rdd2).collect() + assert(joined.size === 6) + assert(joined.toSet === Set( + (1, (Some(1), Some('x'))), + (1, (Some(2), Some('x'))), + (2, (Some(1), Some('y'))), + (2, (Some(1), Some('z'))), + (3, (Some(1), None)), + (4, (None, Some('w'))) + )) + } + test("join with no matches") { val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))) val rdd2 = sc.parallelize(Array((4, 'x'), (5, 'y'), (5, 'z'), (6, 'w'))) diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index c1b501a75c..465c1a8a43 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -193,6 +193,7 @@ class RDDSuite extends FunSuite with SharedSparkContext { assert(rdd.join(emptyKv).collect().size === 0) assert(rdd.rightOuterJoin(emptyKv).collect().size === 0) assert(rdd.leftOuterJoin(emptyKv).collect().size === 2) + assert(rdd.fullOuterJoin(emptyKv).collect().size === 2) assert(rdd.cogroup(emptyKv).collect().size === 2) assert(rdd.union(emptyKv).collect().size === 2) } |