author    Aaron Staple <aaron.staple@gmail.com>    2014-09-24 20:39:09 -0700
committer Patrick Wendell <pwendell@gmail.com>    2014-09-24 20:39:09 -0700
commit    8ca4ecb6a56b96bae21b33e27f6abdb53676683a (patch)
tree      fce8a6b30398815c0b3f7d03db38824d6af9e1a3 /core
parent    74fb2ecf7afc2d314f6477f8f2e6134614387453 (diff)
[SPARK-546] Add full outer join to RDD and DStream.
leftOuterJoin and rightOuterJoin are already implemented. This patch adds fullOuterJoin.

Author: Aaron Staple <aaron.staple@gmail.com>

Closes #1395 from staple/SPARK-546 and squashes the following commits:

1f5595c [Aaron Staple] Fix python style
7ac0aa9 [Aaron Staple] [SPARK-546] Add full outer join to RDD and DStream.
3b5d137 [Aaron Staple] In JavaPairDStream, make class tag specification in rightOuterJoin consistent with other functions.
31f2956 [Aaron Staple] Fix left outer join documentation comments.
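To illustrate the semantics of the new operation, here is a minimal usage sketch. The data is made up for illustration and does not come from the patch; `sc` is assumed to be an existing SparkContext:

    // Hypothetical inputs, for illustration only.
    val left  = sc.parallelize(Seq((1, "a"), (2, "b")))
    val right = sc.parallelize(Seq((2, "x"), (3, "y")))

    // fullOuterJoin keeps keys from both sides; the missing side is None.
    left.fullOuterJoin(right).collect()
    // => (up to ordering)
    // Array((1, (Some("a"), None)),
    //       (2, (Some("b"), Some("x"))),
    //       (3, (None, Some("y"))))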
Diffstat (limited to 'core')
-rw-r--r--   core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala       48
-rw-r--r--   core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala       42
-rw-r--r--   core/src/test/scala/org/apache/spark/PartitioningSuite.scala           3
-rw-r--r--   core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala  15
-rw-r--r--   core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala                1
5 files changed, 109 insertions(+), 0 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index 880f61c497..0846225e4f 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -470,6 +470,22 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
}
/**
+ * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
+ * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
+ * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
+ * element (k, w) in `other`, the resulting RDD will either contain all pairs
+ * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
+ * in `this` have key k. Uses the given Partitioner to partition the output RDD.
+ */
+ def fullOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
+ : JavaPairRDD[K, (Optional[V], Optional[W])] = {
+ val joinResult = rdd.fullOuterJoin(other, partitioner)
+ fromRDD(joinResult.mapValues{ case (v, w) =>
+ (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
+ })
+ }
+
+ /**
* Simplified version of combineByKey that hash-partitions the resulting RDD using the existing
* partitioner/parallelism level.
*/
@@ -564,6 +580,38 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
}
/**
+ * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
+ * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
+ * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
+ * element (k, w) in `other`, the resulting RDD will either contain all pairs
+ * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
+ * in `this` have key k. Hash-partitions the resulting RDD using the existing partitioner/
+ * parallelism level.
+ */
+ def fullOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (Optional[V], Optional[W])] = {
+ val joinResult = rdd.fullOuterJoin(other)
+ fromRDD(joinResult.mapValues{ case (v, w) =>
+ (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
+ })
+ }
+
+ /**
+ * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
+ * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
+ * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
+ * element (k, w) in `other`, the resulting RDD will either contain all pairs
+ * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
+ * in `this` have key k. Hash-partitions the resulting RDD into the given number of partitions.
+ */
+ def fullOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int)
+ : JavaPairRDD[K, (Optional[V], Optional[W])] = {
+ val joinResult = rdd.fullOuterJoin(other, numPartitions)
+ fromRDD(joinResult.mapValues{ case (v, w) =>
+ (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
+ })
+ }
+
+ /**
* Return the key-value pairs in this RDD to the master as a Map.
*/
def collectAsMap(): java.util.Map[K, V] = mapAsJavaMap(rdd.collectAsMap())
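The Java-facing overloads above all delegate to the Scala implementation and then convert each Scala Option into a Guava Optional via JavaUtils.optionToOptional. As a minimal sketch of what that conversion amounts to (written out here for illustration, not copied from the patch):

    import com.google.common.base.Optional

    // Maps Scala's Option onto Guava's Optional: Some -> of, None -> absent.
    def optionToOptional[T](option: Option[T]): Optional[T] = option match {
      case Some(value) => Optional.of(value)
      case None        => Optional.absent()
    }

    optionToOptional(Some(1))            // Optional.of(1)
    optionToOptional(None: Option[Int])  // Optional.absent()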
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 51ba8c2d17..7f578bc5da 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -507,6 +507,23 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
}
/**
+ * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
+ * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
+ * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
+ * element (k, w) in `other`, the resulting RDD will either contain all pairs
+ * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
+ * in `this` have key k. Uses the given Partitioner to partition the output RDD.
+ */
+ def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
+ : RDD[(K, (Option[V], Option[W]))] = {
+ this.cogroup(other, partitioner).flatMapValues {
+ case (vs, Seq()) => vs.map(v => (Some(v), None))
+ case (Seq(), ws) => ws.map(w => (None, Some(w)))
+ case (vs, ws) => for (v <- vs; w <- ws) yield (Some(v), Some(w))
+ }
+ }
+
+ /**
* Simplified version of combineByKey that hash-partitions the resulting RDD using the
* existing partitioner/parallelism level.
*/
@@ -586,6 +603,31 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
}
/**
+ * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
+ * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
+ * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
+ * element (k, w) in `other`, the resulting RDD will either contain all pairs
+ * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
+ * in `this` have key k. Hash-partitions the resulting RDD using the existing partitioner/
+ * parallelism level.
+ */
+ def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))] = {
+ fullOuterJoin(other, defaultPartitioner(self, other))
+ }
+
+ /**
+ * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
+ * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
+ * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
+ * element (k, w) in `other`, the resulting RDD will either contain all pairs
+ * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
+ * in `this` have key k. Hash-partitions the resulting RDD into the given number of partitions.
+ */
+ def fullOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], Option[W]))] = {
+ fullOuterJoin(other, new HashPartitioner(numPartitions))
+ }
+
+ /**
* Return the key-value pairs in this RDD to the master as a Map.
*
* Warning: this doesn't return a multimap (so if you have multiple values to the same key, only
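The core of the patch is the cogroup-based implementation above: for each key, cogroup yields the values from both sides, and flatMapValues expands them into Option pairs. A minimal sketch of the same case analysis on plain Scala collections (illustrative only; fullOuterValues is a made-up name):

    // Mirrors the flatMapValues cases: left side only, right side only,
    // or a cross product when the key appears on both sides.
    def fullOuterValues[V, W](vs: Seq[V], ws: Seq[W]): Seq[(Option[V], Option[W])] =
      (vs, ws) match {
        case (vs, Seq()) => vs.map(v => (Some(v), None))
        case (Seq(), ws) => ws.map(w => (None, Some(w)))
        case (vs, ws)    => for (v <- vs; w <- ws) yield (Some(v), Some(w))
      }

    fullOuterValues(Seq(1, 2), Seq("x"))  // Seq((Some(1),Some("x")), (Some(2),Some("x")))
    fullOuterValues(Seq(1), Nil)          // Seq((Some(1),None))
    fullOuterValues(Nil, Seq("y"))        // Seq((None,Some("y")))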
diff --git a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
index fc0cee3e87..646ede30ae 100644
--- a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
+++ b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
@@ -193,11 +193,13 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMethodTester
assert(grouped2.join(grouped4).partitioner === grouped4.partitioner)
assert(grouped2.leftOuterJoin(grouped4).partitioner === grouped4.partitioner)
assert(grouped2.rightOuterJoin(grouped4).partitioner === grouped4.partitioner)
+ assert(grouped2.fullOuterJoin(grouped4).partitioner === grouped4.partitioner)
assert(grouped2.cogroup(grouped4).partitioner === grouped4.partitioner)
assert(grouped2.join(reduced2).partitioner === grouped2.partitioner)
assert(grouped2.leftOuterJoin(reduced2).partitioner === grouped2.partitioner)
assert(grouped2.rightOuterJoin(reduced2).partitioner === grouped2.partitioner)
+ assert(grouped2.fullOuterJoin(reduced2).partitioner === grouped2.partitioner)
assert(grouped2.cogroup(reduced2).partitioner === grouped2.partitioner)
assert(grouped2.map(_ => 1).partitioner === None)
@@ -218,6 +220,7 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMethodTester
assert(intercept[SparkException]{ arrPairs.join(arrPairs) }.getMessage.contains("array"))
assert(intercept[SparkException]{ arrPairs.leftOuterJoin(arrPairs) }.getMessage.contains("array"))
assert(intercept[SparkException]{ arrPairs.rightOuterJoin(arrPairs) }.getMessage.contains("array"))
+ assert(intercept[SparkException]{ arrPairs.fullOuterJoin(arrPairs) }.getMessage.contains("array"))
assert(intercept[SparkException]{ arrPairs.groupByKey() }.getMessage.contains("array"))
assert(intercept[SparkException]{ arrPairs.countByKey() }.getMessage.contains("array"))
assert(intercept[SparkException]{ arrPairs.countByKeyApprox(1) }.getMessage.contains("array"))
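The new assertions follow the pattern of the existing joins: because fullOuterJoin is built on cogroup, its output reuses the partitioner of an input that already has one (the one with more partitions when both do) rather than introducing a fresh shuffle. A minimal sketch of that behavior, with hypothetical RDDs and an assumed SparkContext `sc`:

    import org.apache.spark.HashPartitioner

    // Two pre-partitioned RDDs with different partition counts.
    val grouped4 = sc.parallelize(Seq(1 -> "a")).groupByKey(new HashPartitioner(4))
    val grouped2 = sc.parallelize(Seq(1 -> "b")).groupByKey(new HashPartitioner(2))

    // The result is expected to adopt the larger existing partitioner.
    grouped2.fullOuterJoin(grouped4).partitioner  // Some(HashPartitioner(4))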
diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index e84cc69592..75b0119190 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -298,6 +298,21 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
))
}
+ test("fullOuterJoin") {
+ val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
+ val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
+ val joined = rdd1.fullOuterJoin(rdd2).collect()
+ assert(joined.size === 6)
+ assert(joined.toSet === Set(
+ (1, (Some(1), Some('x'))),
+ (1, (Some(2), Some('x'))),
+ (2, (Some(1), Some('y'))),
+ (2, (Some(1), Some('z'))),
+ (3, (Some(1), None)),
+ (4, (None, Some('w')))
+ ))
+ }
+
test("join with no matches") {
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
val rdd2 = sc.parallelize(Array((4, 'x'), (5, 'y'), (5, 'z'), (6, 'w')))
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index c1b501a75c..465c1a8a43 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -193,6 +193,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
assert(rdd.join(emptyKv).collect().size === 0)
assert(rdd.rightOuterJoin(emptyKv).collect().size === 0)
assert(rdd.leftOuterJoin(emptyKv).collect().size === 2)
+ assert(rdd.fullOuterJoin(emptyKv).collect().size === 2)
assert(rdd.cogroup(emptyKv).collect().size === 2)
assert(rdd.union(emptyKv).collect().size === 2)
}
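The new RDDSuite assertion follows directly from the semantics: joining against an empty RDD pairs every element of the non-empty side with None, so the result has one record per element of `rdd`. A minimal sketch with made-up data of the same shape (assuming a two-element pair RDD, as in the test):

    // Hypothetical two-element pair RDD joined against an empty one.
    val rdd     = sc.parallelize(Seq(1 -> "a", 2 -> "b"))
    val emptyKv = sc.parallelize(Seq.empty[(Int, String)])

    rdd.fullOuterJoin(emptyKv).collect()
    // => Array((1, (Some("a"), None)), (2, (Some("b"), None)))  -- size 2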