aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala')
-rw-r--r--core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala48
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala42
2 files changed, 90 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index 880f61c497..0846225e4f 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -470,6 +470,22 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
}
/**
+ * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
+ * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
+ * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
+ * element (k, w) in `other`, the resulting RDD will either contain all pairs
+ * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
+ * in `this` have key k. Uses the given Partitioner to partition the output RDD.
+ */
+ def fullOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
+ : JavaPairRDD[K, (Optional[V], Optional[W])] = {
+ val joinResult = rdd.fullOuterJoin(other, partitioner)
+ fromRDD(joinResult.mapValues{ case (v, w) =>
+ (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
+ })
+ }
+
+ /**
* Simplified version of combineByKey that hash-partitions the resulting RDD using the existing
* partitioner/parallelism level.
*/
@@ -564,6 +580,38 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
}
/**
+ * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
+ * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
+ * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
+ * element (k, w) in `other`, the resulting RDD will either contain all pairs
+ * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
+ * in `this` have key k. Hash-partitions the resulting RDD using the existing partitioner/
+ * parallelism level.
+ */
+ def fullOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (Optional[V], Optional[W])] = {
+ val joinResult = rdd.fullOuterJoin(other)
+ fromRDD(joinResult.mapValues{ case (v, w) =>
+ (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
+ })
+ }
+
+ /**
+ * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
+ * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
+ * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
+ * element (k, w) in `other`, the resulting RDD will either contain all pairs
+ * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
+ * in `this` have key k. Hash-partitions the resulting RDD into the given number of partitions.
+ */
+ def fullOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int)
+ : JavaPairRDD[K, (Optional[V], Optional[W])] = {
+ val joinResult = rdd.fullOuterJoin(other, numPartitions)
+ fromRDD(joinResult.mapValues{ case (v, w) =>
+ (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
+ })
+ }
+
+ /**
* Return the key-value pairs in this RDD to the master as a Map.
*/
def collectAsMap(): java.util.Map[K, V] = mapAsJavaMap(rdd.collectAsMap())
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 51ba8c2d17..7f578bc5da 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -507,6 +507,23 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
}
/**
+ * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
+ * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
+ * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
+ * element (k, w) in `other`, the resulting RDD will either contain all pairs
+ * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
+ * in `this` have key k. Uses the given Partitioner to partition the output RDD.
+ */
+ def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
+ : RDD[(K, (Option[V], Option[W]))] = {
+ this.cogroup(other, partitioner).flatMapValues {
+ case (vs, Seq()) => vs.map(v => (Some(v), None))
+ case (Seq(), ws) => ws.map(w => (None, Some(w)))
+ case (vs, ws) => for (v <- vs; w <- ws) yield (Some(v), Some(w))
+ }
+ }
+
+ /**
* Simplified version of combineByKey that hash-partitions the resulting RDD using the
* existing partitioner/parallelism level.
*/
@@ -586,6 +603,31 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
}
/**
+ * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
+ * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
+ * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
+ * element (k, w) in `other`, the resulting RDD will either contain all pairs
+ * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
+ * in `this` have key k. Hash-partitions the resulting RDD using the existing partitioner/
+ * parallelism level.
+ */
+ def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))] = {
+ fullOuterJoin(other, defaultPartitioner(self, other))
+ }
+
+ /**
+ * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
+ * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
+ * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
+ * element (k, w) in `other`, the resulting RDD will either contain all pairs
+ * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
+ * in `this` have key k. Hash-partitions the resulting RDD into the given number of partitions.
+ */
+ def fullOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], Option[W]))] = {
+ fullOuterJoin(other, new HashPartitioner(numPartitions))
+ }
+
+ /**
* Return the key-value pairs in this RDD to the master as a Map.
*
* Warning: this doesn't return a multimap (so if you have multiple values to the same key, only