author     Aaron Staple <aaron.staple@gmail.com>    2014-09-24 20:39:09 -0700
committer  Patrick Wendell <pwendell@gmail.com>     2014-09-24 20:39:09 -0700
commit     8ca4ecb6a56b96bae21b33e27f6abdb53676683a (patch)
tree       fce8a6b30398815c0b3f7d03db38824d6af9e1a3 /streaming
parent     74fb2ecf7afc2d314f6477f8f2e6134614387453 (diff)
[SPARK-546] Add full outer join to RDD and DStream.
leftOuterJoin and rightOuterJoin are already implemented. This patch adds fullOuterJoin.

Author: Aaron Staple <aaron.staple@gmail.com>

Closes #1395 from staple/SPARK-546 and squashes the following commits:

1f5595c [Aaron Staple] Fix python style
7ac0aa9 [Aaron Staple] [SPARK-546] Add full outer join to RDD and DStream.
3b5d137 [Aaron Staple] In JavaPairDStream, make class tag specification in rightOuterJoin consistent with other functions.
31f2956 [Aaron Staple] Fix left outer join documentation comments.
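For a quick sense of the new Scala API, here is a minimal, hedged sketch (the object name and queue-backed inputs are illustrative only; it assumes a Spark 1.x StreamingContext, where the pair-DStream implicits come from StreamingContext._):

    import scala.collection.mutable.Queue

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.StreamingContext._  // pair-DStream implicits (Spark 1.x)
    import org.apache.spark.streaming.dstream.DStream

    object FullOuterJoinExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName("FullOuterJoinExample")
        val ssc = new StreamingContext(conf, Seconds(1))
        val sc = ssc.sparkContext

        // Queue-backed streams stand in for real sources; each RDD is one batch.
        val left: DStream[(String, Int)] =
          ssc.queueStream(Queue(sc.parallelize(Seq(("a", 1), ("b", 2)))))
        val right: DStream[(String, String)] =
          ssc.queueStream(Queue(sc.parallelize(Seq(("b", "x"), ("c", "y")))))

        // Keys from either side survive the join; the absent side is None:
        //   ("a", (Some(1), None)), ("b", (Some(2), Some("x"))), ("c", (None, Some("y")))
        val joined: DStream[(String, (Option[Int], Option[String]))] = left.fullOuterJoin(right)
        joined.print()

        ssc.start()
        ssc.awaitTermination()
      }
    }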
Diffstat (limited to 'streaming')

 -rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala      | 54
 -rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala  | 36
 -rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala          | 15

 3 files changed, 101 insertions(+), 4 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index c00e11d119..59d4423086 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -606,8 +606,9 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
   }
 
   /**
-   * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
-   * The supplied org.apache.spark.Partitioner is used to control the partitioning of each RDD.
+   * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
+   * `other` DStream. The supplied org.apache.spark.Partitioner is used to control
+   * the partitioning of each RDD.
    */
   def leftOuterJoin[W](
       other: JavaPairDStream[K, W],
@@ -624,8 +625,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    * number of partitions.
    */
   def rightOuterJoin[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (Optional[V], W)] = {
-    implicit val cm: ClassTag[W] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+    implicit val cm: ClassTag[W] = fakeClassTag
     val joinResult = dstream.rightOuterJoin(other.dstream)
     joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)}
   }
@@ -659,6 +659,52 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
   }
 
   /**
+   * Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
+   * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
+   * number of partitions.
+   */
+  def fullOuterJoin[W](other: JavaPairDStream[K, W])
+      : JavaPairDStream[K, (Optional[V], Optional[W])] = {
+    implicit val cm: ClassTag[W] = fakeClassTag
+    val joinResult = dstream.fullOuterJoin(other.dstream)
+    joinResult.mapValues{ case (v, w) =>
+      (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
+    }
+  }
+
+  /**
+   * Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
+   * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
+   * partitions.
+   */
+  def fullOuterJoin[W](
+      other: JavaPairDStream[K, W],
+      numPartitions: Int
+    ): JavaPairDStream[K, (Optional[V], Optional[W])] = {
+    implicit val cm: ClassTag[W] = fakeClassTag
+    val joinResult = dstream.fullOuterJoin(other.dstream, numPartitions)
+    joinResult.mapValues{ case (v, w) =>
+      (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
+    }
+  }
+
+  /**
+   * Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
+   * `other` DStream. The supplied org.apache.spark.Partitioner is used to control
+   * the partitioning of each RDD.
+   */
+  def fullOuterJoin[W](
+      other: JavaPairDStream[K, W],
+      partitioner: Partitioner
+    ): JavaPairDStream[K, (Optional[V], Optional[W])] = {
+    implicit val cm: ClassTag[W] = fakeClassTag
+    val joinResult = dstream.fullOuterJoin(other.dstream, partitioner)
+    joinResult.mapValues{ case (v, w) =>
+      (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
+    }
+  }
+
+  /**
    * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
    * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
    */
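One detail worth noting in the JavaPairDStream changes: commit 3b5d137 replaces the inline `implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]` cast in rightOuterJoin with the shared `fakeClassTag` helper, which the new fullOuterJoin overloads use as well. A minimal sketch of the pattern (the wrapper object below is illustrative; in Spark the helper lives in org.apache.spark.api.java.JavaSparkContext):

    import scala.reflect.ClassTag

    object ClassTagSketch {
      // Java callers cannot materialize a Scala ClassTag for W, so the
      // Java-facing API fabricates one that erases to AnyRef. This is safe
      // on the JVM because generic type parameters are erased at runtime.
      def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]
    }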
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index 826bf39e86..9467595d30 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -569,6 +569,42 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
   }
 
   /**
+   * Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
+   * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
+   * number of partitions.
+   */
+  def fullOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], Option[W]))] = {
+    fullOuterJoin[W](other, defaultPartitioner())
+  }
+
+  /**
+   * Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
+   * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
+   * partitions.
+   */
+  def fullOuterJoin[W: ClassTag](
+      other: DStream[(K, W)],
+      numPartitions: Int
+    ): DStream[(K, (Option[V], Option[W]))] = {
+    fullOuterJoin[W](other, defaultPartitioner(numPartitions))
+  }
+
+  /**
+   * Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
+   * `other` DStream. The supplied org.apache.spark.Partitioner is used to control
+   * the partitioning of each RDD.
+   */
+  def fullOuterJoin[W: ClassTag](
+      other: DStream[(K, W)],
+      partitioner: Partitioner
+    ): DStream[(K, (Option[V], Option[W]))] = {
+    self.transformWith(
+      other,
+      (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.fullOuterJoin(rdd2, partitioner)
+    )
+  }
+
+  /**
    * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval
    * is generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix"
    */
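Because transformWith simply applies the RDD-level fullOuterJoin (added to RDDs by this same patch) to each pair of batch RDDs, the per-batch semantics can be sketched with plain RDDs. A minimal illustration, assuming a Spark 1.x shell with a SparkContext named `sc`:

    import org.apache.spark.SparkContext._  // pair-RDD implicits (Spark 1.x)

    val left  = sc.parallelize(Seq(("a", 1), ("b", 2)))
    val right = sc.parallelize(Seq(("b", "x"), ("c", "y")))

    // Every key from either side appears in the result; the side that
    // lacks a key contributes None for it.
    left.fullOuterJoin(right).collect().toSet
    // Set(("a", (Some(1), None)),
    //     ("b", (Some(2), Some("x"))),
    //     ("c", (None, Some("y"))))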
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index 059ac6c2db..6c8bb50145 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -303,6 +303,21 @@ class BasicOperationsSuite extends TestSuiteBase {
     testOperation(inputData1, inputData2, operation, outputData, true)
   }
 
+  test("fullOuterJoin") {
+    val inputData1 = Seq( Seq("a", "b"), Seq("a", ""), Seq(""), Seq() )
+    val inputData2 = Seq( Seq("a", "b"), Seq("b", ""), Seq(), Seq("") )
+    val outputData = Seq(
+      Seq( ("a", (Some(1), Some("x"))), ("b", (Some(1), Some("x"))) ),
+      Seq( ("", (Some(1), Some("x"))), ("a", (Some(1), None)), ("b", (None, Some("x"))) ),
+      Seq( ("", (Some(1), None)) ),
+      Seq( ("", (None, Some("x"))) )
+    )
+    val operation = (s1: DStream[String], s2: DStream[String]) => {
+      s1.map(x => (x, 1)).fullOuterJoin(s2.map(x => (x, "x")))
+    }
+    testOperation(inputData1, inputData2, operation, outputData, true)
+  }
+
   test("updateStateByKey") {
     val inputData =
       Seq(