-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala                        | 48
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala                        | 42
-rw-r--r--  core/src/test/scala/org/apache/spark/PartitioningSuite.scala                           |  3
-rw-r--r--  core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala                   | 15
-rw-r--r--  core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala                                |  1
-rw-r--r--  docs/programming-guide.md                                                               |  2
-rw-r--r--  python/pyspark/join.py                                                                  | 16
-rw-r--r--  python/pyspark/rdd.py                                                                   | 25
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala     | 54
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala | 36
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala         | 15
11 files changed, 250 insertions(+), 7 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index 880f61c497..0846225e4f 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -470,6 +470,22 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
}
/**
+ * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
+ * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
+ * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
+ * element (k, w) in `other`, the resulting RDD will either contain all pairs
+ * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
+ * in `this` have key k. Uses the given Partitioner to partition the output RDD.
+ */
+ def fullOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
+ : JavaPairRDD[K, (Optional[V], Optional[W])] = {
+ val joinResult = rdd.fullOuterJoin(other, partitioner)
+ fromRDD(joinResult.mapValues{ case (v, w) =>
+ (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
+ })
+ }
+
+ /**
* Simplified version of combineByKey that hash-partitions the resulting RDD using the existing
* partitioner/parallelism level.
*/
@@ -564,6 +580,38 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
}
/**
+ * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
+ * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
+ * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
+ * element (k, w) in `other`, the resulting RDD will either contain all pairs
+ * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
+ * in `this` have key k. Hash-partitions the resulting RDD using the existing partitioner/
+ * parallelism level.
+ */
+ def fullOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (Optional[V], Optional[W])] = {
+ val joinResult = rdd.fullOuterJoin(other)
+ fromRDD(joinResult.mapValues{ case (v, w) =>
+ (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
+ })
+ }
+
+ /**
+ * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
+ * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
+ * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
+ * element (k, w) in `other`, the resulting RDD will either contain all pairs
+ * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
+ * in `this` have key k. Hash-partitions the resulting RDD into the given number of partitions.
+ */
+ def fullOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int)
+ : JavaPairRDD[K, (Optional[V], Optional[W])] = {
+ val joinResult = rdd.fullOuterJoin(other, numPartitions)
+ fromRDD(joinResult.mapValues{ case (v, w) =>
+ (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
+ })
+ }
+
+ /**
* Return the key-value pairs in this RDD to the master as a Map.
*/
def collectAsMap(): java.util.Map[K, V] = mapAsJavaMap(rdd.collectAsMap())
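
The Java variants above return Guava Optional values rather than Scala Options. As a point of reference, JavaUtils.optionToOptional amounts to the following minimal Scala sketch (a stand-in for illustration, not code from this patch; the real helper lives in the org.apache.spark.api.java package):

import com.google.common.base.Optional

// Stand-in sketch: Some(v) becomes Optional.of(v) and None becomes
// Optional.absent(), so Java callers never have to handle null.
def optionToOptional[T](option: Option[T]): Optional[T] = option match {
  case Some(value) => Optional.of(value)
  case None        => Optional.absent()
}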
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 51ba8c2d17..7f578bc5da 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -507,6 +507,23 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
}
/**
+ * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
+ * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
+ * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
+ * element (k, w) in `other`, the resulting RDD will either contain all pairs
+ * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
+ * in `this` have key k. Uses the given Partitioner to partition the output RDD.
+ */
+ def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
+ : RDD[(K, (Option[V], Option[W]))] = {
+ this.cogroup(other, partitioner).flatMapValues {
+ case (vs, Seq()) => vs.map(v => (Some(v), None))
+ case (Seq(), ws) => ws.map(w => (None, Some(w)))
+ case (vs, ws) => for (v <- vs; w <- ws) yield (Some(v), Some(w))
+ }
+ }
+
+ /**
* Simplified version of combineByKey that hash-partitions the resulting RDD using the
* existing partitioner/parallelism level.
*/
@@ -586,6 +603,31 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
}
/**
+ * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
+ * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
+ * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
+ * element (k, w) in `other`, the resulting RDD will either contain all pairs
+ * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
+ * in `this` have key k. Hash-partitions the resulting RDD using the existing partitioner/
+ * parallelism level.
+ */
+ def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))] = {
+ fullOuterJoin(other, defaultPartitioner(self, other))
+ }
+
+ /**
+ * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
+ * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
+ * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
+ * element (k, w) in `other`, the resulting RDD will either contain all pairs
+ * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
+ * in `this` have key k. Hash-partitions the resulting RDD into the given number of partitions.
+ */
+ def fullOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], Option[W]))] = {
+ fullOuterJoin(other, new HashPartitioner(numPartitions))
+ }
+
+ /**
* Return the key-value pairs in this RDD to the master as a Map.
*
* Warning: this doesn't return a multimap (so if you have multiple values to the same key, only
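
To make the cogroup-based semantics concrete, the three flatMapValues cases above can be exercised for a single key with plain Scala collections; this is an illustrative sketch, not part of the patch:

// Mirrors the three fullOuterJoin cases for one key's grouped values.
def fullOuterForOneKey[V, W](vs: Seq[V], ws: Seq[W]): Seq[(Option[V], Option[W])] =
  (vs, ws) match {
    case (vs, Seq()) => vs.map(v => (Some(v), None))                      // key only in `this`
    case (Seq(), ws) => ws.map(w => (None, Some(w)))                      // key only in `other`
    case (vs, ws)    => for (v <- vs; w <- ws) yield (Some(v), Some(w))   // key in both
  }

// For example:
// fullOuterForOneKey(Seq(1, 2), Seq('x'))       == Seq((Some(1), Some('x')), (Some(2), Some('x')))
// fullOuterForOneKey(Seq(1), Seq.empty[Char])   == Seq((Some(1), None))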
diff --git a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
index fc0cee3e87..646ede30ae 100644
--- a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
+++ b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
@@ -193,11 +193,13 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMet
assert(grouped2.join(grouped4).partitioner === grouped4.partitioner)
assert(grouped2.leftOuterJoin(grouped4).partitioner === grouped4.partitioner)
assert(grouped2.rightOuterJoin(grouped4).partitioner === grouped4.partitioner)
+ assert(grouped2.fullOuterJoin(grouped4).partitioner === grouped4.partitioner)
assert(grouped2.cogroup(grouped4).partitioner === grouped4.partitioner)
assert(grouped2.join(reduced2).partitioner === grouped2.partitioner)
assert(grouped2.leftOuterJoin(reduced2).partitioner === grouped2.partitioner)
assert(grouped2.rightOuterJoin(reduced2).partitioner === grouped2.partitioner)
+ assert(grouped2.fullOuterJoin(reduced2).partitioner === grouped2.partitioner)
assert(grouped2.cogroup(reduced2).partitioner === grouped2.partitioner)
assert(grouped2.map(_ => 1).partitioner === None)
@@ -218,6 +220,7 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMet
assert(intercept[SparkException]{ arrPairs.join(arrPairs) }.getMessage.contains("array"))
assert(intercept[SparkException]{ arrPairs.leftOuterJoin(arrPairs) }.getMessage.contains("array"))
assert(intercept[SparkException]{ arrPairs.rightOuterJoin(arrPairs) }.getMessage.contains("array"))
+ assert(intercept[SparkException]{ arrPairs.fullOuterJoin(arrPairs) }.getMessage.contains("array"))
assert(intercept[SparkException]{ arrPairs.groupByKey() }.getMessage.contains("array"))
assert(intercept[SparkException]{ arrPairs.countByKey() }.getMessage.contains("array"))
assert(intercept[SparkException]{ arrPairs.countByKeyApprox(1) }.getMessage.contains("array"))
diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index e84cc69592..75b0119190 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -298,6 +298,21 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
))
}
+ test("fullOuterJoin") {
+ val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
+ val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
+ val joined = rdd1.fullOuterJoin(rdd2).collect()
+ assert(joined.size === 6)
+ assert(joined.toSet === Set(
+ (1, (Some(1), Some('x'))),
+ (1, (Some(2), Some('x'))),
+ (2, (Some(1), Some('y'))),
+ (2, (Some(1), Some('z'))),
+ (3, (Some(1), None)),
+ (4, (None, Some('w')))
+ ))
+ }
+
test("join with no matches") {
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
val rdd2 = sc.parallelize(Array((4, 'x'), (5, 'y'), (5, 'z'), (6, 'w')))
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index c1b501a75c..465c1a8a43 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -193,6 +193,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
assert(rdd.join(emptyKv).collect().size === 0)
assert(rdd.rightOuterJoin(emptyKv).collect().size === 0)
assert(rdd.leftOuterJoin(emptyKv).collect().size === 2)
+ assert(rdd.fullOuterJoin(emptyKv).collect().size === 2)
assert(rdd.cogroup(emptyKv).collect().size === 2)
assert(rdd.union(emptyKv).collect().size === 2)
}
diff --git a/docs/programming-guide.md b/docs/programming-guide.md
index 01d378af57..510b47a2aa 100644
--- a/docs/programming-guide.md
+++ b/docs/programming-guide.md
@@ -906,7 +906,7 @@ for details.
<tr>
<td> <b>join</b>(<i>otherDataset</i>, [<i>numTasks</i>]) </td>
<td> When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key.
- Outer joins are also supported through <code>leftOuterJoin</code> and <code>rightOuterJoin</code>.
+ Outer joins are supported through <code>leftOuterJoin</code>, <code>rightOuterJoin</code>, and <code>fullOuterJoin</code>.
</td>
</tr>
<tr>
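
A minimal Scala sketch of the three outer-join flavors the updated table row mentions (illustrative only; assumes a live SparkContext named sc and, outside the shell, the pair-RDD implicits from org.apache.spark.SparkContext._):

import org.apache.spark.SparkContext._

val left  = sc.parallelize(Seq(("a", 1), ("b", 2)))
val right = sc.parallelize(Seq(("a", "x"), ("c", "y")))

left.leftOuterJoin(right).collect()   // "a" and "b" kept; "b" paired with None
left.rightOuterJoin(right).collect()  // "a" and "c" kept; "c" paired with None
left.fullOuterJoin(right).collect()   // "a", "b", and "c" all kept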
diff --git a/python/pyspark/join.py b/python/pyspark/join.py
index b0f1cc1927..b4a8447137 100644
--- a/python/pyspark/join.py
+++ b/python/pyspark/join.py
@@ -80,6 +80,22 @@ def python_left_outer_join(rdd, other, numPartitions):
return _do_python_join(rdd, other, numPartitions, dispatch)
+def python_full_outer_join(rdd, other, numPartitions):
+ def dispatch(seq):
+ vbuf, wbuf = [], []
+ for (n, v) in seq:
+ if n == 1:
+ vbuf.append(v)
+ elif n == 2:
+ wbuf.append(v)
+ if not vbuf:
+ vbuf.append(None)
+ if not wbuf:
+ wbuf.append(None)
+ return [(v, w) for v in vbuf for w in wbuf]
+ return _do_python_join(rdd, other, numPartitions, dispatch)
+
+
def python_cogroup(rdds, numPartitions):
def make_mapper(i):
return lambda (k, v): (k, (i, v))
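
The dispatch above pads whichever side is empty with a single None and then crosses the two buffers. The same per-key idea, expressed as a hedged Scala sketch with illustrative names (not part of the patch):

// Pad an empty side with None, then take the cross product of the two buffers.
def padAndCross[V, W](vbuf: Seq[V], wbuf: Seq[W]): Seq[(Option[V], Option[W])] = {
  val vs = if (vbuf.isEmpty) Seq(None) else vbuf.map(Some(_))
  val ws = if (wbuf.isEmpty) Seq(None) else wbuf.map(Some(_))
  for (v <- vs; w <- ws) yield (v, w)
}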
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 8ef233bc80..680140d72d 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -36,7 +36,7 @@ from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
BatchedSerializer, CloudPickleSerializer, PairDeserializer, \
PickleSerializer, pack_long, AutoBatchedSerializer
from pyspark.join import python_join, python_left_outer_join, \
- python_right_outer_join, python_cogroup
+ python_right_outer_join, python_full_outer_join, python_cogroup
from pyspark.statcounter import StatCounter
from pyspark.rddsampler import RDDSampler, RDDStratifiedSampler
from pyspark.storagelevel import StorageLevel
@@ -1375,7 +1375,7 @@ class RDD(object):
For each element (k, v) in C{self}, the resulting RDD will either
contain all pairs (k, (v, w)) for w in C{other}, or the pair
- (k, (v, None)) if no elements in other have key k.
+ (k, (v, None)) if no elements in C{other} have key k.
Hash-partitions the resulting RDD into the given number of partitions.
@@ -1403,6 +1403,27 @@ class RDD(object):
"""
return python_right_outer_join(self, other, numPartitions)
+ def fullOuterJoin(self, other, numPartitions=None):
+ """
+ Perform a full outer join of C{self} and C{other}.
+
+ For each element (k, v) in C{self}, the resulting RDD will either
+ contain all pairs (k, (v, w)) for w in C{other}, or the pair
+ (k, (v, None)) if no elements in C{other} have key k.
+
+ Similarly, for each element (k, w) in C{other}, the resulting RDD will
+ either contain all pairs (k, (v, w)) for v in C{self}, or the pair
+ (k, (None, w)) if no elements in C{self} have key k.
+
+ Hash-partitions the resulting RDD into the given number of partitions.
+
+ >>> x = sc.parallelize([("a", 1), ("b", 4)])
+ >>> y = sc.parallelize([("a", 2), ("c", 8)])
+ >>> sorted(x.fullOuterJoin(y).collect())
+ [('a', (1, 2)), ('b', (4, None)), ('c', (None, 8))]
+ """
+ return python_full_outer_join(self, other, numPartitions)
+
# TODO: add option to control map-side combining
# portable_hash is used as default, because builtin hash of None is different
# cross machines.
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index c00e11d119..59d4423086 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -606,8 +606,9 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
/**
- * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
- * The supplied org.apache.spark.Partitioner is used to control the partitioning of each RDD.
+ * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
+ * `other` DStream. The supplied org.apache.spark.Partitioner is used to control
+ * the partitioning of each RDD.
*/
def leftOuterJoin[W](
other: JavaPairDStream[K, W],
@@ -624,8 +625,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* number of partitions.
*/
def rightOuterJoin[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (Optional[V], W)] = {
- implicit val cm: ClassTag[W] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+ implicit val cm: ClassTag[W] = fakeClassTag
val joinResult = dstream.rightOuterJoin(other.dstream)
joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)}
}
@@ -659,6 +659,52 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
/**
+ * Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
+ * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
+ * number of partitions.
+ */
+ def fullOuterJoin[W](other: JavaPairDStream[K, W])
+ : JavaPairDStream[K, (Optional[V], Optional[W])] = {
+ implicit val cm: ClassTag[W] = fakeClassTag
+ val joinResult = dstream.fullOuterJoin(other.dstream)
+ joinResult.mapValues{ case (v, w) =>
+ (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
+ }
+ }
+
+ /**
+ * Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
+ * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
+ * partitions.
+ */
+ def fullOuterJoin[W](
+ other: JavaPairDStream[K, W],
+ numPartitions: Int
+ ): JavaPairDStream[K, (Optional[V], Optional[W])] = {
+ implicit val cm: ClassTag[W] = fakeClassTag
+ val joinResult = dstream.fullOuterJoin(other.dstream, numPartitions)
+ joinResult.mapValues{ case (v, w) =>
+ (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
+ }
+ }
+
+ /**
+ * Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
+ * `other` DStream. The supplied org.apache.spark.Partitioner is used to control
+ * the partitioning of each RDD.
+ */
+ def fullOuterJoin[W](
+ other: JavaPairDStream[K, W],
+ partitioner: Partitioner
+ ): JavaPairDStream[K, (Optional[V], Optional[W])] = {
+ implicit val cm: ClassTag[W] = fakeClassTag
+ val joinResult = dstream.fullOuterJoin(other.dstream, partitioner)
+ joinResult.mapValues{ case (v, w) =>
+ (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
+ }
+ }
+
+ /**
* Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
* generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
*/
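
One small cleanup in this file swaps the hand-rolled ClassTag cast for fakeClassTag. For reference, that helper boils down to the sketch below; treat its exact home (org.apache.spark.api.java.JavaSparkContext) as an assumption of this note rather than something shown in the diff:

import scala.reflect.ClassTag

// The Java API has no real ClassTag[W] available, so it reuses the AnyRef tag
// and casts it; this is safe at runtime because generics are erased on the JVM.
def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]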
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index 826bf39e86..9467595d30 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -569,6 +569,42 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
}
/**
+ * Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
+ * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
+ * number of partitions.
+ */
+ def fullOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], Option[W]))] = {
+ fullOuterJoin[W](other, defaultPartitioner())
+ }
+
+ /**
+ * Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
+ * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
+ * partitions.
+ */
+ def fullOuterJoin[W: ClassTag](
+ other: DStream[(K, W)],
+ numPartitions: Int
+ ): DStream[(K, (Option[V], Option[W]))] = {
+ fullOuterJoin[W](other, defaultPartitioner(numPartitions))
+ }
+
+ /**
+ * Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
+ * `other` DStream. The supplied org.apache.spark.Partitioner is used to control
+ * the partitioning of each RDD.
+ */
+ def fullOuterJoin[W: ClassTag](
+ other: DStream[(K, W)],
+ partitioner: Partitioner
+ ): DStream[(K, (Option[V], Option[W]))] = {
+ self.transformWith(
+ other,
+ (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.fullOuterJoin(rdd2, partitioner)
+ )
+ }
+
+ /**
* Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval
* is generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix"
*/
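
A minimal usage sketch of the new DStream operation (illustrative only; lines1 and lines2 are assumed DStream[String] inputs from an existing StreamingContext). Under the hood, transformWith applies rdd1.fullOuterJoin(rdd2, partitioner) to each pair of batch RDDs, as shown above:

import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream

val left  = lines1.map(word => (word, 1))
val right = lines2.map(word => (word, "x"))

// One full outer join per batch interval.
val joined: DStream[(String, (Option[Int], Option[String]))] = left.fullOuterJoin(right)
joined.print()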
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index 059ac6c2db..6c8bb50145 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -303,6 +303,21 @@ class BasicOperationsSuite extends TestSuiteBase {
testOperation(inputData1, inputData2, operation, outputData, true)
}
+ test("fullOuterJoin") {
+ val inputData1 = Seq( Seq("a", "b"), Seq("a", ""), Seq(""), Seq() )
+ val inputData2 = Seq( Seq("a", "b"), Seq("b", ""), Seq(), Seq("") )
+ val outputData = Seq(
+ Seq( ("a", (Some(1), Some("x"))), ("b", (Some(1), Some("x"))) ),
+ Seq( ("", (Some(1), Some("x"))), ("a", (Some(1), None)), ("b", (None, Some("x"))) ),
+ Seq( ("", (Some(1), None)) ),
+ Seq( ("", (None, Some("x"))) )
+ )
+ val operation = (s1: DStream[String], s2: DStream[String]) => {
+ s1.map(x => (x, 1)).fullOuterJoin(s2.map(x => (x, "x")))
+ }
+ testOperation(inputData1, inputData2, operation, outputData, true)
+ }
+
test("updateStateByKey") {
val inputData =
Seq(