diff options
author | Aaron Staple <aaron.staple@gmail.com> | 2014-09-24 20:39:09 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-09-24 20:39:09 -0700 |
commit | 8ca4ecb6a56b96bae21b33e27f6abdb53676683a (patch) | |
tree | fce8a6b30398815c0b3f7d03db38824d6af9e1a3 /streaming/src | |
parent | 74fb2ecf7afc2d314f6477f8f2e6134614387453 (diff) | |
download | spark-8ca4ecb6a56b96bae21b33e27f6abdb53676683a.tar.gz spark-8ca4ecb6a56b96bae21b33e27f6abdb53676683a.tar.bz2 spark-8ca4ecb6a56b96bae21b33e27f6abdb53676683a.zip |
[SPARK-546] Add full outer join to RDD and DStream.
leftOuterJoin and rightOuterJoin are already implemented. This patch adds fullOuterJoin.
Author: Aaron Staple <aaron.staple@gmail.com>
Closes #1395 from staple/SPARK-546 and squashes the following commits:
1f5595c [Aaron Staple] Fix python style
7ac0aa9 [Aaron Staple] [SPARK-546] Add full outer join to RDD and DStream.
3b5d137 [Aaron Staple] In JavaPairDStream, make class tag specification in rightOuterJoin consistent with other functions.
31f2956 [Aaron Staple] Fix left outer join documentation comments.
Diffstat (limited to 'streaming/src')
3 files changed, 101 insertions, 4 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala index c00e11d119..59d4423086 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala @@ -606,8 +606,9 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } /** - * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. - * The supplied org.apache.spark.Partitioner is used to control the partitioning of each RDD. + * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and + * `other` DStream. The supplied org.apache.spark.Partitioner is used to control + * the partitioning of each RDD. */ def leftOuterJoin[W]( other: JavaPairDStream[K, W], @@ -624,8 +625,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * number of partitions. */ def rightOuterJoin[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (Optional[V], W)] = { - implicit val cm: ClassTag[W] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] + implicit val cm: ClassTag[W] = fakeClassTag val joinResult = dstream.rightOuterJoin(other.dstream) joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)} } @@ -659,6 +659,52 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } /** + * Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and + * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default + * number of partitions. + */ + def fullOuterJoin[W](other: JavaPairDStream[K, W]) + : JavaPairDStream[K, (Optional[V], Optional[W])] = { + implicit val cm: ClassTag[W] = fakeClassTag + val joinResult = dstream.fullOuterJoin(other.dstream) + joinResult.mapValues{ case (v, w) => + (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w)) + } + } + + /** + * Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and + * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions` + * partitions. + */ + def fullOuterJoin[W]( + other: JavaPairDStream[K, W], + numPartitions: Int + ): JavaPairDStream[K, (Optional[V], Optional[W])] = { + implicit val cm: ClassTag[W] = fakeClassTag + val joinResult = dstream.fullOuterJoin(other.dstream, numPartitions) + joinResult.mapValues{ case (v, w) => + (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w)) + } + } + + /** + * Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and + * `other` DStream. The supplied org.apache.spark.Partitioner is used to control + * the partitioning of each RDD. + */ + def fullOuterJoin[W]( + other: JavaPairDStream[K, W], + partitioner: Partitioner + ): JavaPairDStream[K, (Optional[V], Optional[W])] = { + implicit val cm: ClassTag[W] = fakeClassTag + val joinResult = dstream.fullOuterJoin(other.dstream, partitioner) + joinResult.mapValues{ case (v, w) => + (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w)) + } + } + + /** * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". */ diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala index 826bf39e86..9467595d30 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala @@ -569,6 +569,42 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) } /** + * Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and + * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default + * number of partitions. + */ + def fullOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], Option[W]))] = { + fullOuterJoin[W](other, defaultPartitioner()) + } + + /** + * Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and + * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions` + * partitions. + */ + def fullOuterJoin[W: ClassTag]( + other: DStream[(K, W)], + numPartitions: Int + ): DStream[(K, (Option[V], Option[W]))] = { + fullOuterJoin[W](other, defaultPartitioner(numPartitions)) + } + + /** + * Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and + * `other` DStream. The supplied org.apache.spark.Partitioner is used to control + * the partitioning of each RDD. + */ + def fullOuterJoin[W: ClassTag]( + other: DStream[(K, W)], + partitioner: Partitioner + ): DStream[(K, (Option[V], Option[W]))] = { + self.transformWith( + other, + (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.fullOuterJoin(rdd2, partitioner) + ) + } + + /** * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval * is generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix" */ diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala index 059ac6c2db..6c8bb50145 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala @@ -303,6 +303,21 @@ class BasicOperationsSuite extends TestSuiteBase { testOperation(inputData1, inputData2, operation, outputData, true) } + test("fullOuterJoin") { + val inputData1 = Seq( Seq("a", "b"), Seq("a", ""), Seq(""), Seq() ) + val inputData2 = Seq( Seq("a", "b"), Seq("b", ""), Seq(), Seq("") ) + val outputData = Seq( + Seq( ("a", (Some(1), Some("x"))), ("b", (Some(1), Some("x"))) ), + Seq( ("", (Some(1), Some("x"))), ("a", (Some(1), None)), ("b", (None, Some("x"))) ), + Seq( ("", (Some(1), None)) ), + Seq( ("", (None, Some("x"))) ) + ) + val operation = (s1: DStream[String], s2: DStream[String]) => { + s1.map(x => (x, 1)).fullOuterJoin(s2.map(x => (x, "x"))) + } + testOperation(inputData1, inputData2, operation, outputData, true) + } + test("updateStateByKey") { val inputData = Seq( |