11 files changed, 250 insertions(+), 7 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index 880f61c497..0846225e4f 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -470,6 +470,22 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
   }
 
   /**
+   * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
+   * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
+   * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
+   * element (k, w) in `other`, the resulting RDD will either contain all pairs
+   * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
+   * in `this` have key k. Uses the given Partitioner to partition the output RDD.
+   */
+  def fullOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
+  : JavaPairRDD[K, (Optional[V], Optional[W])] = {
+    val joinResult = rdd.fullOuterJoin(other, partitioner)
+    fromRDD(joinResult.mapValues{ case (v, w) =>
+      (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
+    })
+  }
+
+  /**
    * Simplified version of combineByKey that hash-partitions the resulting RDD using the existing
    * partitioner/parallelism level.
    */
@@ -564,6 +580,38 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
   }
 
   /**
+   * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
+   * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
+   * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
+   * element (k, w) in `other`, the resulting RDD will either contain all pairs
+   * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
+   * in `this` have key k. Hash-partitions the resulting RDD using the existing partitioner/
+   * parallelism level.
+   */
+  def fullOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (Optional[V], Optional[W])] = {
+    val joinResult = rdd.fullOuterJoin(other)
+    fromRDD(joinResult.mapValues{ case (v, w) =>
+      (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
+    })
+  }
+
+  /**
+   * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
+   * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
+   * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
+   * element (k, w) in `other`, the resulting RDD will either contain all pairs
+   * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
+   * in `this` have key k. Hash-partitions the resulting RDD into the given number of partitions.
+   */
+  def fullOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int)
+  : JavaPairRDD[K, (Optional[V], Optional[W])] = {
+    val joinResult = rdd.fullOuterJoin(other, numPartitions)
+    fromRDD(joinResult.mapValues{ case (v, w) =>
+      (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
+    })
+  }
+
+  /**
    * Return the key-value pairs in this RDD to the master as a Map.
    */
   def collectAsMap(): java.util.Map[K, V] = mapAsJavaMap(rdd.collectAsMap())
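Note: each Java wrapper above converts the Scala `Option` values produced by the underlying join into Guava `Optional`s via `JavaUtils.optionToOptional`, since Java callers cannot pattern-match on `Option`. As a rough guide to what that helper has to do, here is a minimal standalone sketch; the real method lives in org.apache.spark.api.java.JavaUtils, and this version is an assumption rather than a copy of it:

    import com.google.common.base.Optional

    // Bridge a Scala Option to a Guava Optional: a present value maps to
    // Optional.of, and the missing side of an outer join maps to Optional.absent.
    def optionToOptional[T](option: Option[T]): Optional[T] = option match {
      case Some(value) => Optional.of(value)
      case None => Optional.absent()
    }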
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 51ba8c2d17..7f578bc5da 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -507,6 +507,23 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
   }
 
   /**
+   * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
+   * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
+   * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
+   * element (k, w) in `other`, the resulting RDD will either contain all pairs
+   * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
+   * in `this` have key k. Uses the given Partitioner to partition the output RDD.
+   */
+  def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
+  : RDD[(K, (Option[V], Option[W]))] = {
+    this.cogroup(other, partitioner).flatMapValues {
+      case (vs, Seq()) => vs.map(v => (Some(v), None))
+      case (Seq(), ws) => ws.map(w => (None, Some(w)))
+      case (vs, ws) => for (v <- vs; w <- ws) yield (Some(v), Some(w))
+    }
+  }
+
+  /**
    * Simplified version of combineByKey that hash-partitions the resulting RDD using the
    * existing partitioner/parallelism level.
    */
@@ -586,6 +603,31 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
   }
 
   /**
+   * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
+   * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
+   * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
+   * element (k, w) in `other`, the resulting RDD will either contain all pairs
+   * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
+   * in `this` have key k. Hash-partitions the resulting RDD using the existing partitioner/
+   * parallelism level.
+   */
+  def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))] = {
+    fullOuterJoin(other, defaultPartitioner(self, other))
+  }
+
+  /**
+   * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
+   * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
+   * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
+   * element (k, w) in `other`, the resulting RDD will either contain all pairs
+   * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
+   * in `this` have key k. Hash-partitions the resulting RDD into the given number of partitions.
+   */
+  def fullOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], Option[W]))] = {
+    fullOuterJoin(other, new HashPartitioner(numPartitions))
+  }
+
+  /**
    * Return the key-value pairs in this RDD to the master as a Map.
    *
    * Warning: this doesn't return a multimap (so if you have multiple values to the same key, only
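Note: the cogroup-based implementation above produces one output pair per (v, w) combination for keys present on both sides, and a single None-padded pair for keys present on only one side. A quick illustration of that contract, as a sketch assuming an active SparkContext named sc:

    val left = sc.parallelize(Seq((1, "a"), (1, "b"), (2, "c")))
    val right = sc.parallelize(Seq((2, "x"), (3, "y")))

    // Key 1 exists only on the left, key 3 only on the right, key 2 on both.
    left.fullOuterJoin(right).collect().toSet
    // Set((1, (Some("a"), None)),
    //     (1, (Some("b"), None)),
    //     (2, (Some("c"), Some("x"))),
    //     (3, (None, Some("y"))))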
diff --git a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
index fc0cee3e87..646ede30ae 100644
--- a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
+++ b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
@@ -193,11 +193,13 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMet
     assert(grouped2.join(grouped4).partitioner === grouped4.partitioner)
     assert(grouped2.leftOuterJoin(grouped4).partitioner === grouped4.partitioner)
     assert(grouped2.rightOuterJoin(grouped4).partitioner === grouped4.partitioner)
+    assert(grouped2.fullOuterJoin(grouped4).partitioner === grouped4.partitioner)
     assert(grouped2.cogroup(grouped4).partitioner === grouped4.partitioner)
 
     assert(grouped2.join(reduced2).partitioner === grouped2.partitioner)
     assert(grouped2.leftOuterJoin(reduced2).partitioner === grouped2.partitioner)
     assert(grouped2.rightOuterJoin(reduced2).partitioner === grouped2.partitioner)
+    assert(grouped2.fullOuterJoin(reduced2).partitioner === grouped2.partitioner)
     assert(grouped2.cogroup(reduced2).partitioner === grouped2.partitioner)
 
     assert(grouped2.map(_ => 1).partitioner === None)
@@ -218,6 +220,7 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMet
     assert(intercept[SparkException]{ arrPairs.join(arrPairs) }.getMessage.contains("array"))
     assert(intercept[SparkException]{ arrPairs.leftOuterJoin(arrPairs) }.getMessage.contains("array"))
     assert(intercept[SparkException]{ arrPairs.rightOuterJoin(arrPairs) }.getMessage.contains("array"))
+    assert(intercept[SparkException]{ arrPairs.fullOuterJoin(arrPairs) }.getMessage.contains("array"))
     assert(intercept[SparkException]{ arrPairs.groupByKey() }.getMessage.contains("array"))
     assert(intercept[SparkException]{ arrPairs.countByKey() }.getMessage.contains("array"))
     assert(intercept[SparkException]{ arrPairs.countByKeyApprox(1) }.getMessage.contains("array"))
diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index e84cc69592..75b0119190 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -298,6 +298,21 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
     ))
   }
 
+  test("fullOuterJoin") {
+    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
+    val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
+    val joined = rdd1.fullOuterJoin(rdd2).collect()
+    assert(joined.size === 6)
+    assert(joined.toSet === Set(
+      (1, (Some(1), Some('x'))),
+      (1, (Some(2), Some('x'))),
+      (2, (Some(1), Some('y'))),
+      (2, (Some(1), Some('z'))),
+      (3, (Some(1), None)),
+      (4, (None, Some('w')))
+    ))
+  }
+
   test("join with no matches") {
     val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
     val rdd2 = sc.parallelize(Array((4, 'x'), (5, 'y'), (5, 'z'), (6, 'w')))
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index c1b501a75c..465c1a8a43 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -193,6 +193,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     assert(rdd.join(emptyKv).collect().size === 0)
     assert(rdd.rightOuterJoin(emptyKv).collect().size === 0)
     assert(rdd.leftOuterJoin(emptyKv).collect().size === 2)
+    assert(rdd.fullOuterJoin(emptyKv).collect().size === 2)
     assert(rdd.cogroup(emptyKv).collect().size === 2)
     assert(rdd.union(emptyKv).collect().size === 2)
   }
diff --git a/docs/programming-guide.md b/docs/programming-guide.md
index 01d378af57..510b47a2aa 100644
--- a/docs/programming-guide.md
+++ b/docs/programming-guide.md
@@ -906,7 +906,7 @@ for details.
 <tr>
   <td> <b>join</b>(<i>otherDataset</i>, [<i>numTasks</i>]) </td>
   <td> When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key.
-    Outer joins are also supported through <code>leftOuterJoin</code> and <code>rightOuterJoin</code>.
+    Outer joins are supported through <code>leftOuterJoin</code>, <code>rightOuterJoin</code>, and <code>fullOuterJoin</code>.
   </td>
 </tr>
 <tr>
diff --git a/python/pyspark/join.py b/python/pyspark/join.py
index b0f1cc1927..b4a8447137 100644
--- a/python/pyspark/join.py
+++ b/python/pyspark/join.py
@@ -80,6 +80,22 @@ def python_left_outer_join(rdd, other, numPartitions):
     return _do_python_join(rdd, other, numPartitions, dispatch)
 
 
+def python_full_outer_join(rdd, other, numPartitions):
+    def dispatch(seq):
+        vbuf, wbuf = [], []
+        for (n, v) in seq:
+            if n == 1:
+                vbuf.append(v)
+            elif n == 2:
+                wbuf.append(v)
+        if not vbuf:
+            vbuf.append(None)
+        if not wbuf:
+            wbuf.append(None)
+        return [(v, w) for v in vbuf for w in wbuf]
+    return _do_python_join(rdd, other, numPartitions, dispatch)
+
+
 def python_cogroup(rdds, numPartitions):
     def make_mapper(i):
         return lambda (k, v): (k, (i, v))
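Note: python_full_outer_join reuses the tagged-union encoding of _do_python_join: values from rdd reach dispatch as (1, v) and values from other as (2, w); each side is buffered, an empty side is padded with a single None, and the cross product is emitted. The same pad-and-cross idea written out as a standalone Scala function, purely for clarity (a sketch, not code from this patch):

    // All values for one key from each side -> full-outer-join rows. An empty
    // side contributes a single None so the other side still appears.
    def padAndCross[V, W](vs: Seq[V], ws: Seq[W]): Seq[(Option[V], Option[W])] = {
      val vOpts: Seq[Option[V]] = if (vs.isEmpty) Seq(None) else vs.map(Some(_))
      val wOpts: Seq[Option[W]] = if (ws.isEmpty) Seq(None) else ws.map(Some(_))
      for (v <- vOpts; w <- wOpts) yield (v, w)
    }

    padAndCross(Seq(1, 2), Seq.empty[String]) // Seq((Some(1), None), (Some(2), None))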
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 8ef233bc80..680140d72d 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -36,7 +36,7 @@ from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
     BatchedSerializer, CloudPickleSerializer, PairDeserializer, \
     PickleSerializer, pack_long, AutoBatchedSerializer
 from pyspark.join import python_join, python_left_outer_join, \
-    python_right_outer_join, python_cogroup
+    python_right_outer_join, python_full_outer_join, python_cogroup
 from pyspark.statcounter import StatCounter
 from pyspark.rddsampler import RDDSampler, RDDStratifiedSampler
 from pyspark.storagelevel import StorageLevel
@@ -1375,7 +1375,7 @@ class RDD(object):
 
         For each element (k, v) in C{self}, the resulting RDD will either
         contain all pairs (k, (v, w)) for w in C{other}, or the pair
-        (k, (v, None)) if no elements in other have key k.
+        (k, (v, None)) if no elements in C{other} have key k.
 
         Hash-partitions the resulting RDD into the given number of
         partitions.
@@ -1403,6 +1403,27 @@ class RDD(object):
         """
         return python_right_outer_join(self, other, numPartitions)
 
+    def fullOuterJoin(self, other, numPartitions=None):
+        """
+        Perform a full outer join of C{self} and C{other}.
+
+        For each element (k, v) in C{self}, the resulting RDD will either
+        contain all pairs (k, (v, w)) for w in C{other}, or the pair
+        (k, (v, None)) if no elements in C{other} have key k.
+
+        Similarly, for each element (k, w) in C{other}, the resulting RDD will
+        either contain all pairs (k, (v, w)) for v in C{self}, or the pair
+        (k, (None, w)) if no elements in C{self} have key k.
+
+        Hash-partitions the resulting RDD into the given number of partitions.
+
+        >>> x = sc.parallelize([("a", 1), ("b", 4)])
+        >>> y = sc.parallelize([("a", 2), ("c", 8)])
+        >>> sorted(x.fullOuterJoin(y).collect())
+        [('a', (1, 2)), ('b', (4, None)), ('c', (None, 8))]
+        """
+        return python_full_outer_join(self, other, numPartitions)
+
     # TODO: add option to control map-side combining
     # portable_hash is used as default, because builtin hash of None is different
     # cross machines.
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index c00e11d119..59d4423086 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -606,8 +606,9 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
   }
 
   /**
-   * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
-   * The supplied org.apache.spark.Partitioner is used to control the partitioning of each RDD.
+   * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
+   * `other` DStream. The supplied org.apache.spark.Partitioner is used to control
+   * the partitioning of each RDD.
    */
   def leftOuterJoin[W](
       other: JavaPairDStream[K, W],
@@ -624,8 +625,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    * number of partitions.
    */
   def rightOuterJoin[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (Optional[V], W)] = {
-    implicit val cm: ClassTag[W] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+    implicit val cm: ClassTag[W] = fakeClassTag
     val joinResult = dstream.rightOuterJoin(other.dstream)
     joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)}
   }
@@ -659,6 +659,52 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
   }
 
   /**
+   * Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
+   * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
+   * number of partitions.
+   */
+  def fullOuterJoin[W](other: JavaPairDStream[K, W])
+  : JavaPairDStream[K, (Optional[V], Optional[W])] = {
+    implicit val cm: ClassTag[W] = fakeClassTag
+    val joinResult = dstream.fullOuterJoin(other.dstream)
+    joinResult.mapValues{ case (v, w) =>
+      (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
+    }
+  }
+
+  /**
+   * Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
+   * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
+   * partitions.
+   */
+  def fullOuterJoin[W](
+      other: JavaPairDStream[K, W],
+      numPartitions: Int
+    ): JavaPairDStream[K, (Optional[V], Optional[W])] = {
+    implicit val cm: ClassTag[W] = fakeClassTag
+    val joinResult = dstream.fullOuterJoin(other.dstream, numPartitions)
+    joinResult.mapValues{ case (v, w) =>
+      (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
+    }
+  }
+
+  /**
+   * Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
+   * `other` DStream. The supplied org.apache.spark.Partitioner is used to control
+   * the partitioning of each RDD.
+   */
+  def fullOuterJoin[W](
+      other: JavaPairDStream[K, W],
+      partitioner: Partitioner
+    ): JavaPairDStream[K, (Optional[V], Optional[W])] = {
+    implicit val cm: ClassTag[W] = fakeClassTag
+    val joinResult = dstream.fullOuterJoin(other.dstream, partitioner)
+    joinResult.mapValues{ case (v, w) =>
+      (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
+    }
+  }
+
+  /**
    * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
    * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
    */
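Note: this file also swaps the inline `implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]` in rightOuterJoin for the `fakeClassTag` helper, which the new fullOuterJoin wrappers use as well. Roughly, such a helper boils down to the following sketch (Spark keeps it on JavaSparkContext; treat the exact definition here as an assumption):

    import scala.reflect.ClassTag

    // Java callers cannot supply a real ClassTag[W], and these wrappers never
    // inspect the element type, so an AnyRef tag is cast to the needed type.
    def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]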
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index 826bf39e86..9467595d30 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -569,6 +569,42 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
   }
 
   /**
+   * Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
+   * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
+   * number of partitions.
+   */
+  def fullOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], Option[W]))] = {
+    fullOuterJoin[W](other, defaultPartitioner())
+  }
+
+  /**
+   * Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
+   * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
+   * partitions.
+   */
+  def fullOuterJoin[W: ClassTag](
+      other: DStream[(K, W)],
+      numPartitions: Int
+    ): DStream[(K, (Option[V], Option[W]))] = {
+    fullOuterJoin[W](other, defaultPartitioner(numPartitions))
+  }
+
+  /**
+   * Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
+   * `other` DStream. The supplied org.apache.spark.Partitioner is used to control
+   * the partitioning of each RDD.
+   */
+  def fullOuterJoin[W: ClassTag](
+      other: DStream[(K, W)],
+      partitioner: Partitioner
+    ): DStream[(K, (Option[V], Option[W]))] = {
+    self.transformWith(
+      other,
+      (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.fullOuterJoin(rdd2, partitioner)
+    )
+  }
+
+  /**
    * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval
    * is generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix"
    */
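Note: because the DStream variant is just transformWith over the RDD-level join, the semantics are per batch: the two RDDs of each interval are joined independently, which is what the streaming test below exercises. A minimal sketch of the API shape, with illustrative stream names that are not part of this patch:

    import org.apache.spark.streaming.dstream.DStream

    // Per-batch full outer join of two keyed streams. A key missing from one
    // stream in a given interval shows up with None on that side.
    def joinViews(
        impressions: DStream[(String, Long)],
        clicks: DStream[(String, Long)]): DStream[(String, (Option[Long], Option[Long]))] = {
      impressions.fullOuterJoin(clicks)
    }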
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index 059ac6c2db..6c8bb50145 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -303,6 +303,21 @@ class BasicOperationsSuite extends TestSuiteBase {
     testOperation(inputData1, inputData2, operation, outputData, true)
   }
 
+  test("fullOuterJoin") {
+    val inputData1 = Seq( Seq("a", "b"), Seq("a", ""), Seq(""), Seq() )
+    val inputData2 = Seq( Seq("a", "b"), Seq("b", ""), Seq(), Seq("") )
+    val outputData = Seq(
+      Seq( ("a", (Some(1), Some("x"))), ("b", (Some(1), Some("x"))) ),
+      Seq( ("", (Some(1), Some("x"))), ("a", (Some(1), None)), ("b", (None, Some("x"))) ),
+      Seq( ("", (Some(1), None)) ),
+      Seq( ("", (None, Some("x"))) )
+    )
+    val operation = (s1: DStream[String], s2: DStream[String]) => {
+      s1.map(x => (x, 1)).fullOuterJoin(s2.map(x => (x, "x")))
+    }
+    testOperation(inputData1, inputData2, operation, outputData, true)
+  }
+
   test("updateStateByKey") {
     val inputData = Seq(