aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala
diff options
context:
space:
mode:
authorAaron Staple <aaron.staple@gmail.com>2014-09-24 20:39:09 -0700
committerPatrick Wendell <pwendell@gmail.com>2014-09-24 20:39:09 -0700
commit8ca4ecb6a56b96bae21b33e27f6abdb53676683a (patch)
treefce8a6b30398815c0b3f7d03db38824d6af9e1a3 /core/src/main/scala
parent74fb2ecf7afc2d314f6477f8f2e6134614387453 (diff)
downloadspark-8ca4ecb6a56b96bae21b33e27f6abdb53676683a.tar.gz
spark-8ca4ecb6a56b96bae21b33e27f6abdb53676683a.tar.bz2
spark-8ca4ecb6a56b96bae21b33e27f6abdb53676683a.zip
[SPARK-546] Add full outer join to RDD and DStream.
leftOuterJoin and rightOuterJoin are already implemented. This patch adds fullOuterJoin. Author: Aaron Staple <aaron.staple@gmail.com> Closes #1395 from staple/SPARK-546 and squashes the following commits: 1f5595c [Aaron Staple] Fix python style 7ac0aa9 [Aaron Staple] [SPARK-546] Add full outer join to RDD and DStream. 3b5d137 [Aaron Staple] In JavaPairDStream, make class tag specification in rightOuterJoin consistent with other functions. 31f2956 [Aaron Staple] Fix left outer join documentation comments.
Diffstat (limited to 'core/src/main/scala')
-rw-r--r--core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala48
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala42
2 files changed, 90 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index 880f61c497..0846225e4f 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -470,6 +470,22 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
}
/**
+ * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
+ * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
+ * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
+ * element (k, w) in `other`, the resulting RDD will either contain all pairs
+ * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
+ * in `this` have key k. Uses the given Partitioner to partition the output RDD.
+ */
+ def fullOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
+ : JavaPairRDD[K, (Optional[V], Optional[W])] = {
+ val joinResult = rdd.fullOuterJoin(other, partitioner)
+ fromRDD(joinResult.mapValues{ case (v, w) =>
+ (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
+ })
+ }
+
+ /**
* Simplified version of combineByKey that hash-partitions the resulting RDD using the existing
* partitioner/parallelism level.
*/
@@ -564,6 +580,38 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
}
/**
+ * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
+ * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
+ * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
+ * element (k, w) in `other`, the resulting RDD will either contain all pairs
+ * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
+ * in `this` have key k. Hash-partitions the resulting RDD using the existing partitioner/
+ * parallelism level.
+ */
+ def fullOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (Optional[V], Optional[W])] = {
+ val joinResult = rdd.fullOuterJoin(other)
+ fromRDD(joinResult.mapValues{ case (v, w) =>
+ (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
+ })
+ }
+
+ /**
+ * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
+ * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
+ * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
+ * element (k, w) in `other`, the resulting RDD will either contain all pairs
+ * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
+ * in `this` have key k. Hash-partitions the resulting RDD into the given number of partitions.
+ */
+ def fullOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int)
+ : JavaPairRDD[K, (Optional[V], Optional[W])] = {
+ val joinResult = rdd.fullOuterJoin(other, numPartitions)
+ fromRDD(joinResult.mapValues{ case (v, w) =>
+ (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
+ })
+ }
+
+ /**
* Return the key-value pairs in this RDD to the master as a Map.
*/
def collectAsMap(): java.util.Map[K, V] = mapAsJavaMap(rdd.collectAsMap())
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 51ba8c2d17..7f578bc5da 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -507,6 +507,23 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
}
/**
+ * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
+ * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
+ * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
+ * element (k, w) in `other`, the resulting RDD will either contain all pairs
+ * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
+ * in `this` have key k. Uses the given Partitioner to partition the output RDD.
+ */
+ def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
+ : RDD[(K, (Option[V], Option[W]))] = {
+ this.cogroup(other, partitioner).flatMapValues {
+ case (vs, Seq()) => vs.map(v => (Some(v), None))
+ case (Seq(), ws) => ws.map(w => (None, Some(w)))
+ case (vs, ws) => for (v <- vs; w <- ws) yield (Some(v), Some(w))
+ }
+ }
+
+ /**
* Simplified version of combineByKey that hash-partitions the resulting RDD using the
* existing partitioner/parallelism level.
*/
@@ -586,6 +603,31 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
}
/**
+ * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
+ * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
+ * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
+ * element (k, w) in `other`, the resulting RDD will either contain all pairs
+ * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
+ * in `this` have key k. Hash-partitions the resulting RDD using the existing partitioner/
+ * parallelism level.
+ */
+ def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))] = {
+ fullOuterJoin(other, defaultPartitioner(self, other))
+ }
+
+ /**
+ * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
+ * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
+ * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
+ * element (k, w) in `other`, the resulting RDD will either contain all pairs
+ * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
+ * in `this` have key k. Hash-partitions the resulting RDD into the given number of partitions.
+ */
+ def fullOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], Option[W]))] = {
+ fullOuterJoin(other, new HashPartitioner(numPartitions))
+ }
+
+ /**
* Return the key-value pairs in this RDD to the master as a Map.
*
* Warning: this doesn't return a multimap (so if you have multiple values to the same key, only