diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-10-21 05:34:09 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-10-21 05:34:09 -0700 |
commit | 06664987990debcb4439a9dc26e1859508c601f5 (patch) | |
tree | 28641321cde005c2ea41edfe61c343bfc1ce5359 /streaming/src | |
parent | cf64f63f8a3b54dec37e991856260ac63f7e222e (diff) | |
download | spark-06664987990debcb4439a9dc26e1859508c601f5.tar.gz spark-06664987990debcb4439a9dc26e1859508c601f5.tar.bz2 spark-06664987990debcb4439a9dc26e1859508c601f5.zip |
Updated TransformDStream to allow n-ary DStream transform. Added transformWith, leftOuterJoin and rightOuterJoin operations to DStream for Scala and Java APIs. Also added n-ary union and n-ary transform operations to StreamingContext for Scala and Java APIs.
Diffstat (limited to 'streaming/src')
9 files changed, 457 insertions, 33 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala index 80da6bd30b..ee351daa60 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala @@ -500,7 +500,7 @@ abstract class DStream[T: ClassManifest] ( * on each RDD of this DStream. */ def transform[U: ClassManifest](transformFunc: RDD[T] => RDD[U]): DStream[U] = { - transform((r: RDD[T], t: Time) => transformFunc(r)) + transform((r: RDD[T], t: Time) => context.sparkContext.clean(transformFunc(r))) } /** @@ -508,7 +508,41 @@ abstract class DStream[T: ClassManifest] ( * on each RDD of this DStream. */ def transform[U: ClassManifest](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = { - new TransformedDStream(this, context.sparkContext.clean(transformFunc)) + //new TransformedDStream(this, context.sparkContext.clean(transformFunc)) + val cleanedF = context.sparkContext.clean(transformFunc) + val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => { + assert(rdds.length == 1) + cleanedF(rdds.head.asInstanceOf[RDD[T]], time) + } + new TransformedDStream[U](Seq(this), realTransformFunc) + } + + /** + * Return a new DStream in which each RDD is generated by applying a function on RDDs + * of DStreams stream1 and stream2. + */ + def transformWith[U: ClassManifest, V: ClassManifest]( + other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V] + ): DStream[V] = { + val cleanedF = ssc.sparkContext.clean(transformFunc) + transformWith(other, (rdd1: RDD[T], rdd2: RDD[U], time: Time) => cleanedF(rdd1, rdd2)) + } + + /** + * Return a new DStream in which each RDD is generated by applying a function on RDDs + * of DStreams stream1 and stream2. + */ + def transformWith[U: ClassManifest, V: ClassManifest]( + other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V] + ): DStream[V] = { + val cleanedF = ssc.sparkContext.clean(transformFunc) + val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => { + assert(rdds.length == 2) + val rdd1 = rdds(0).asInstanceOf[RDD[T]] + val rdd2 = rdds(1).asInstanceOf[RDD[U]] + cleanedF(rdd1, rdd2, time) + } + new TransformedDStream[V](Seq(this, other), realTransformFunc) } /** diff --git a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala index 757bc98981..c319433e54 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala @@ -444,27 +444,73 @@ extends Serializable { } /** - * Join `this` DStream with `other` DStream. HashPartitioner is used - * to partition each generated RDD into default number of partitions. + * Return new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.. + * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. */ def join[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (V, W))] = { join[W](other, defaultPartitioner()) } /** - * Join `this` DStream with `other` DStream, that is, each RDD of the new DStream will - * be generated by joining RDDs from `this` and other DStream. Uses the given - * Partitioner to partition each generated RDD. + * Return new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. + * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. */ def join[W: ClassManifest]( other: DStream[(K, W)], partitioner: Partitioner ): DStream[(K, (V, W))] = { - this.cogroup(other, partitioner) - .flatMapValues{ - case (vs, ws) => - for (v <- vs.iterator; w <- ws.iterator) yield (v, w) - } + self.transformWith( + other, + (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.join(rdd2, partitioner) + ) + } + + /** + * Return new DStream by applying 'left outer join' between RDDs of `this` DStream and + * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default + * number of partitions. + */ + def leftOuterJoin[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))] = { + leftOuterJoin[W](other, defaultPartitioner()) + } + + /** + * Return new DStream by applying 'left outer join' between RDDs of `this` DStream and + * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control + * the partitioning of each RDD. + */ + def leftOuterJoin[W: ClassManifest]( + other: DStream[(K, W)], + partitioner: Partitioner + ): DStream[(K, (V, Option[W]))] = { + self.transformWith( + other, + (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.leftOuterJoin(rdd2, partitioner) + ) + } + + /** + * Return new DStream by applying 'right outer join' between RDDs of `this` DStream and + * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default + * number of partitions. + */ + def rightOuterJoin[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (Option[V], W))] = { + rightOuterJoin[W](other, defaultPartitioner()) + } + + /** + * Return new DStream by applying 'right outer join' between RDDs of `this` DStream and + * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control + * the partitioning of each RDD. + */ + def rightOuterJoin[W: ClassManifest]( + other: DStream[(K, W)], + partitioner: Partitioner + ): DStream[(K, (Option[V], W))] = { + self.transformWith( + other, + (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.rightOuterJoin(rdd2, partitioner) + ) } /** diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 098081d245..3217ef4581 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -455,13 +455,24 @@ class StreamingContext private ( } /** - * Create a unified DStream from multiple DStreams of the same type and same interval + * Create a unified DStream from multiple DStreams of the same type and same slide duration. */ def union[T: ClassManifest](streams: Seq[DStream[T]]): DStream[T] = { new UnionDStream[T](streams.toArray) } /** + * Create a new DStream in which each RDD is generated by applying a function on RDDs of + * the DStreams. + */ + def transform[T: ClassManifest]( + streams: Seq[DStream[_]], + transformFunc: (Seq[RDD[_]], Time) => RDD[T] + ): DStream[T] = { + new TransformedDStream[T](streams, sparkContext.clean(transformFunc)) + } + + /** * Register an input stream that will be started (InputDStream.start() called) to get the * input data. */ diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala index 459695b7ca..1110d770c4 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala @@ -24,7 +24,8 @@ import scala.collection.JavaConversions._ import org.apache.spark.streaming._ import org.apache.spark.api.java.{JavaPairRDD, JavaRDDLike, JavaRDD} -import org.apache.spark.api.java.function.{Function2 => JFunction2, Function => JFunction, _} +import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2} +import org.apache.spark.api.java.function.{Function3 => JFunction3, _} import java.util import org.apache.spark.rdd.RDD import JavaDStream._ @@ -307,6 +308,82 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T } /** + * Return a new DStream in which each RDD is generated by applying a function + * on each RDD of this and other DStreams. + */ + def transformWith[U, W]( + other: JavaDStream[U], + transformFunc: JFunction3[R, JavaRDD[U], Time, JavaRDD[W]] + ): JavaDStream[W] = { + implicit val cmu: ClassManifest[U] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]] + implicit val cmv: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + def scalaTransform (inThis: RDD[T], inThat: RDD[U], time: Time): RDD[W] = + transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd + dstream.transformWith[U, W](other.dstream, scalaTransform(_, _, _)) + } + + /** + * Return a new DStream in which each RDD is generated by applying a function + * on each RDD of this and other DStreams. + */ + def transformWith[U, K2, V2]( + other: JavaDStream[U], + transformFunc: JFunction3[R, JavaRDD[U], Time, JavaPairRDD[K2, V2]] + ): JavaPairDStream[K2, V2] = { + implicit val cmu: ClassManifest[U] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]] + implicit val cmk2: ClassManifest[K2] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]] + implicit val cmv2: ClassManifest[V2] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]] + def scalaTransform (inThis: RDD[T], inThat: RDD[U], time: Time): RDD[(K2, V2)] = + transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd + dstream.transformWith[U, (K2, V2)](other.dstream, scalaTransform(_, _, _)) + } + + /** + * Return a new DStream in which each RDD is generated by applying a function + * on each RDD of this and other DStreams. + */ + def transformWith[K, V, W]( + other: JavaPairDStream[K, V], + transformFunc: JFunction3[R, JavaPairRDD[K, V], Time, JavaRDD[W]] + ): JavaDStream[W] = { + implicit val cmk: ClassManifest[K] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]] + implicit val cmv: ClassManifest[V] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] + implicit val cmw: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + def scalaTransform (inThis: RDD[T], inThat: RDD[(K, V)], time: Time): RDD[W] = + transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd + dstream.transformWith[(K, V), W](other.dstream, scalaTransform(_, _, _)) + } + + /** + * Return a new DStream in which each RDD is generated by applying a function + * on each RDD of this and other DStreams. + */ + def transformWith[K, V, K2, V2]( + other: JavaPairDStream[K, V], + transformFunc: JFunction3[R, JavaPairRDD[K, V], Time, JavaPairRDD[K2, V2]] + ): JavaPairDStream[K2, V2] = { + implicit val cmk: ClassManifest[K] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]] + implicit val cmv: ClassManifest[V] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] + implicit val cmk2: ClassManifest[K2] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]] + implicit val cmv2: ClassManifest[V2] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]] + def scalaTransform (inThis: RDD[T], inThat: RDD[(K, V)], time: Time): RDD[(K2, V2)] = + transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd + dstream.transformWith[(K, V), (K2, V2)](other.dstream, scalaTransform(_, _, _)) + } + + /** * Enable periodic checkpointing of RDDs of this DStream * @param interval Time interval after which generated RDD will be checkpointed */ diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala index 978fca33ad..821db46fff 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala @@ -36,7 +36,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.rdd.PairRDDFunctions class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( - implicit val kManifiest: ClassManifest[K], + implicit val kManifest: ClassManifest[K], implicit val vManifest: ClassManifest[V]) extends JavaDStreamLike[(K, V), JavaPairDStream[K, V], JavaPairRDD[K, V]] { @@ -499,8 +499,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } /** - * Join `this` DStream with `other` DStream. HashPartitioner is used - * to partition each generated RDD into default number of partitions. + * Return new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.. + * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. */ def join[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, W)] = { implicit val cm: ClassManifest[W] = @@ -509,9 +509,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } /** - * Join `this` DStream with `other` DStream, that is, each RDD of the new DStream will - * be generated by joining RDDs from `this` and other DStream. Uses the given - * Partitioner to partition each generated RDD. + * Return new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. + * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. */ def join[W](other: JavaPairDStream[K, W], partitioner: Partitioner) : JavaPairDStream[K, (V, W)] = { @@ -521,6 +520,52 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } /** + * Return new DStream by applying 'left outer join' between RDDs of `this` DStream and `other` DStream.. + * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. + */ + def leftOuterJoin[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, Optional[W])] = { + implicit val cm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + val joinResult = dstream.leftOuterJoin(other.dstream) + joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))} + } + + /** + * Return new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. + * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. + */ + def leftOuterJoin[W](other: JavaPairDStream[K, W], partitioner: Partitioner) + : JavaPairDStream[K, (V, Optional[W])] = { + implicit val cm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + val joinResult = dstream.leftOuterJoin(other.dstream, partitioner) + joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))} + } + + /** + * Return new DStream by applying 'left outer join' between RDDs of `this` DStream and `other` DStream.. + * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. + */ + def rightOuterJoin[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (Optional[V], W)] = { + implicit val cm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + val joinResult = dstream.rightOuterJoin(other.dstream) + joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)} + } + + /** + * Return new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. + * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. + */ + def rightOuterJoin[W](other: JavaPairDStream[K, W], partitioner: Partitioner) + : JavaPairDStream[K, (Optional[V], W)] = { + implicit val cm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + val joinResult = dstream.rightOuterJoin(other.dstream, partitioner) + joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)} + } + + /** * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". */ diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index 54ba3e6025..405f715d50 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -19,7 +19,7 @@ package org.apache.spark.streaming.api.java import java.lang.{Long => JLong, Integer => JInt} import java.io.InputStream -import java.util.{Map => JMap} +import java.util.{Map => JMap, List => JList} import scala.collection.JavaConversions._ @@ -585,6 +585,29 @@ class JavaStreamingContext(val ssc: StreamingContext) { } /** + * Create a unified DStream from multiple DStreams of the same type and same slide duration. + */ + def union[T](first: JavaDStream[T], rest: JList[JavaDStream[T]]): JavaDStream[T] = { + val dstreams: Seq[DStream[T]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.dstream) + implicit val cm: ClassManifest[T] = first.classManifest + ssc.union(dstreams)(cm) + } + + /** + * Create a unified DStream from multiple DStreams of the same type and same slide duration. + */ + def union[K, V]( + first: JavaPairDStream[K, V], + rest: JList[JavaPairDStream[K, V]] + ): JavaPairDStream[K, V] = { + val dstreams: Seq[DStream[(K, V)]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.dstream) + implicit val cm: ClassManifest[(K, V)] = first.classManifest + implicit val kcm: ClassManifest[K] = first.kManifest + implicit val vcm: ClassManifest[V] = first.vManifest + new JavaPairDStream[K, V](ssc.union(dstreams)(cm))(kcm, vcm) + } + + /** * Sets the context to periodically checkpoint the DStream operations for master * fault-tolerance. The graph will be checkpointed every batch interval. * @param directory HDFS-compatible directory where the checkpoint data will be reliably stored diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala index 60485adef9..71bcb2b390 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala @@ -21,16 +21,22 @@ import org.apache.spark.rdd.RDD import org.apache.spark.streaming.{Duration, DStream, Time} private[streaming] -class TransformedDStream[T: ClassManifest, U: ClassManifest] ( - parent: DStream[T], - transformFunc: (RDD[T], Time) => RDD[U] - ) extends DStream[U](parent.ssc) { +class TransformedDStream[U: ClassManifest] ( + parents: Seq[DStream[_]], + transformFunc: (Seq[RDD[_]], Time) => RDD[U] + ) extends DStream[U](parents.head.ssc) { - override def dependencies = List(parent) + require(parents.length > 0, "List of DStreams to transform is empty") + require(parents.map(_.ssc).distinct.size == 1, "Some of the DStreams have different contexts") + require(parents.map(_.slideDuration).distinct.size == 1, + "Some of the DStreams have different slide durations") - override def slideDuration: Duration = parent.slideDuration + override def dependencies = parents.toList + + override def slideDuration: Duration = parents.head.slideDuration override def compute(validTime: Time): Option[RDD[U]] = { - parent.getOrCompute(validTime).map(transformFunc(_, validTime)) + val parentRDDs = parents.map(_.getOrCompute(validTime).orNull).toSeq + Some(transformFunc(parentRDDs, validTime)) } } diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index c0d729ff87..9f885f07f2 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -223,7 +223,7 @@ public class JavaAPISuite implements Serializable { } }); JavaTestUtils.attachTestOutputStream(mapped); - List<List<List<String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2); Assert.assertEquals(expected, result); } @@ -338,6 +338,58 @@ public class JavaAPISuite implements Serializable { } @Test + public void testTransformWith() { + List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList( + Arrays.asList(new Tuple2<String, String>("california", "dodgers"), + new Tuple2<String, String>("new york", "yankees")), + Arrays.asList(new Tuple2<String, String>("california", "sharks"), + new Tuple2<String, String>("new york", "rangers"))); + + List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList( + Arrays.asList(new Tuple2<String, String>("california", "giants"), + new Tuple2<String, String>("new york", "mets")), + Arrays.asList(new Tuple2<String, String>("california", "ducks"), + new Tuple2<String, String>("new york", "islanders"))); + + + List<List<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList( + Arrays.asList( + new Tuple2<String, Tuple2<String, String>>("california", + new Tuple2<String, String>("dodgers", "giants")), + new Tuple2<String, Tuple2<String, String>>("new york", + new Tuple2<String, String>("yankees", "mets"))), + Arrays.asList( + new Tuple2<String, Tuple2<String, String>>("california", + new Tuple2<String, String>("sharks", "ducks")), + new Tuple2<String, Tuple2<String, String>>("new york", + new Tuple2<String, String>("rangers", "islanders")))); + + JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream( + ssc, stringStringKVStream1, 1); + JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1); + + JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream( + ssc, stringStringKVStream2, 1); + JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2); + + JavaPairDStream<String, Tuple2<String, String>> joined = pairStream1.transformWith( + pairStream2, + new Function3<JavaPairRDD<String, String>, JavaPairRDD<String, String>, Time, JavaPairRDD<String, Tuple2<String, String>>>() { + @Override + public JavaPairRDD<String, Tuple2<String, String>> call(JavaPairRDD<String, String> stringStringJavaPairRDD, JavaPairRDD<String, String> stringStringJavaPairRDD2, Time time) throws Exception { + return stringStringJavaPairRDD.join(stringStringJavaPairRDD2); + } + } + ); + + JavaTestUtils.attachTestOutputStream(joined); + List<List<Tuple2<String, Tuple2<String, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + + } + + @Test public void testFlatMap() { List<List<String>> inputData = Arrays.asList( Arrays.asList("go", "giants"), @@ -1099,7 +1151,7 @@ public class JavaAPISuite implements Serializable { JavaPairDStream<String, Tuple2<List<String>, List<String>>> grouped = pairStream1.cogroup(pairStream2); JavaTestUtils.attachTestOutputStream(grouped); - List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + List<List<Tuple2<String, Tuple2<List<String>, List<String>>>>> result = JavaTestUtils.runStreams(ssc, 2, 2); Assert.assertEquals(expected, result); } @@ -1142,7 +1194,38 @@ public class JavaAPISuite implements Serializable { JavaPairDStream<String, Tuple2<String, String>> joined = pairStream1.join(pairStream2); JavaTestUtils.attachTestOutputStream(joined); - List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + List<List<Tuple2<String, Tuple2<String, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testLeftOuterJoin() { + List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList( + Arrays.asList(new Tuple2<String, String>("california", "dodgers"), + new Tuple2<String, String>("new york", "yankees")), + Arrays.asList(new Tuple2<String, String>("california", "sharks") )); + + List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList( + Arrays.asList(new Tuple2<String, String>("california", "giants") ), + Arrays.asList(new Tuple2<String, String>("new york", "islanders") ) + + ); + + List<List<Long>> expected = Arrays.asList(Arrays.asList(2L), Arrays.asList(1L)); + + JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream( + ssc, stringStringKVStream1, 1); + JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1); + + JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream( + ssc, stringStringKVStream2, 1); + JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2); + + JavaPairDStream<String, Tuple2<String, Optional<String>>> joined = pairStream1.leftOuterJoin(pairStream2); + JavaDStream<Long> counted = joined.count(); + JavaTestUtils.attachTestOutputStream(counted); + List<List<Long>> result = JavaTestUtils.runStreams(ssc, 2, 2); Assert.assertEquals(expected, result); } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala index 11586f72b6..a2ac510a98 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala @@ -18,7 +18,10 @@ package org.apache.spark.streaming import org.apache.spark.streaming.StreamingContext._ -import scala.runtime.RichInt + +import org.apache.spark.rdd.RDD +import org.apache.spark.SparkContext._ + import util.ManualClock class BasicOperationsSuite extends TestSuiteBase { @@ -143,6 +146,72 @@ class BasicOperationsSuite extends TestSuiteBase { ) } + test("union") { + val input = Seq(1 to 4, 101 to 104, 201 to 204) + val output = Seq(1 to 8, 101 to 108, 201 to 208) + testOperation( + input, + (s: DStream[Int]) => s.union(s.map(_ + 4)) , + output + ) + } + + test("StreamingContext.union") { + val input = Seq(1 to 4, 101 to 104, 201 to 204) + val output = Seq(1 to 12, 101 to 112, 201 to 212) + // union over 3 DStreams + testOperation( + input, + (s: DStream[Int]) => s.context.union(Seq(s, s.map(_ + 4), s.map(_ + 8))), + output + ) + } + + test("transform") { + val input = Seq(1 to 4, 5 to 8, 9 to 12) + testOperation( + input, + (r: DStream[Int]) => r.transform(rdd => rdd.map(_.toString)), // RDD.map in transform + input.map(_.map(_.toString)) + ) + } + + test("transformWith") { + val inputData1 = Seq( Seq("a", "b"), Seq("a", ""), Seq(""), Seq() ) + val inputData2 = Seq( Seq("a", "b"), Seq("b", ""), Seq(), Seq("") ) + val outputData = Seq( + Seq( ("a", (1, "x")), ("b", (1, "x")) ), + Seq( ("", (1, "x")) ), + Seq( ), + Seq( ) + ) + val operation = (s1: DStream[String], s2: DStream[String]) => { + val t1 = s1.map(x => (x, 1)) + val t2 = s2.map(x => (x, "x")) + t1.transformWith( // RDD.join in transform + t2, + (rdd1: RDD[(String, Int)], rdd2: RDD[(String, String)]) => rdd1.join(rdd2) + ) + } + testOperation(inputData1, inputData2, operation, outputData, true) + } + + test("StreamingContext.transform") { + val input = Seq(1 to 4, 101 to 104, 201 to 204) + val output = Seq(1 to 12, 101 to 112, 201 to 212) + + // transform over 3 DStreams by doing union of the 3 RDDs + val operation = (s: DStream[Int]) => { + s.context.transform( + Seq(s, s.map(_ + 4), s.map(_ + 8)), // 3 DStreams + (rdds: Seq[RDD[_]], time: Time) => + rdds.head.context.union(rdds.map(_.asInstanceOf[RDD[Int]])) // union of RDDs + ) + } + + testOperation(input, operation, output) + } + test("cogroup") { val inputData1 = Seq( Seq("a", "a", "b"), Seq("a", ""), Seq(""), Seq() ) val inputData2 = Seq( Seq("a", "a", "b"), Seq("b", ""), Seq(), Seq() ) @@ -168,7 +237,37 @@ class BasicOperationsSuite extends TestSuiteBase { Seq( ) ) val operation = (s1: DStream[String], s2: DStream[String]) => { - s1.map(x => (x,1)).join(s2.map(x => (x,"x"))) + s1.map(x => (x, 1)).join(s2.map(x => (x, "x"))) + } + testOperation(inputData1, inputData2, operation, outputData, true) + } + + test("leftOuterJoin") { + val inputData1 = Seq( Seq("a", "b"), Seq("a", ""), Seq(""), Seq() ) + val inputData2 = Seq( Seq("a", "b"), Seq("b", ""), Seq(), Seq("") ) + val outputData = Seq( + Seq( ("a", (1, Some("x"))), ("b", (1, Some("x"))) ), + Seq( ("", (1, Some("x"))), ("a", (1, None)) ), + Seq( ("", (1, None)) ), + Seq( ) + ) + val operation = (s1: DStream[String], s2: DStream[String]) => { + s1.map(x => (x, 1)).leftOuterJoin(s2.map(x => (x, "x"))) + } + testOperation(inputData1, inputData2, operation, outputData, true) + } + + test("rightOuterJoin") { + val inputData1 = Seq( Seq("a", "b"), Seq("a", ""), Seq(""), Seq() ) + val inputData2 = Seq( Seq("a", "b"), Seq("b", ""), Seq(), Seq("") ) + val outputData = Seq( + Seq( ("a", (Some(1), "x")), ("b", (Some(1), "x")) ), + Seq( ("", (Some(1), "x")), ("b", (None, "x")) ), + Seq( ), + Seq( ("", (None, "x")) ) + ) + val operation = (s1: DStream[String], s2: DStream[String]) => { + s1.map(x => (x, 1)).rightOuterJoin(s2.map(x => (x, "x"))) } testOperation(inputData1, inputData2, operation, outputData, true) } |