author    Tathagata Das <tathagata.das1565@gmail.com>  2013-10-21 05:34:09 -0700
committer Tathagata Das <tathagata.das1565@gmail.com>  2013-10-21 05:34:09 -0700
commit    06664987990debcb4439a9dc26e1859508c601f5
tree      28641321cde005c2ea41edfe61c343bfc1ce5359 /streaming
parent    cf64f63f8a3b54dec37e991856260ac63f7e222e
Updated TransformedDStream to allow n-ary DStream transforms. Added transformWith, leftOuterJoin, and rightOuterJoin operations to DStream for the Scala and Java APIs. Also added n-ary union and n-ary transform operations to StreamingContext for the Scala and Java APIs.
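
For illustration, a minimal Scala sketch of the new binary operations (the context setup and the queue-backed input streams are hypothetical; any two pair DStreams with the same slide duration would do):

    import scala.collection.mutable.Queue
    import org.apache.spark.SparkContext._                 // pair-RDD operations such as join
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.StreamingContext._   // implicit pair-DStream operations

    val ssc = new StreamingContext("local[2]", "NaryOpsExample", Seconds(1))

    // Two hypothetical pair DStreams fed from queues of pre-built RDDs.
    val q1, q2 = new Queue[RDD[(String, Int)]]()
    val s1 = ssc.queueStream(q1)
    val s2 = ssc.queueStream(q2)

    // New in this commit: combine the per-batch RDDs of two DStreams with a custom function...
    val joined = s1.transformWith(s2,
      (rdd1: RDD[(String, Int)], rdd2: RDD[(String, Int)]) => rdd1.join(rdd2))

    // ...and the outer joins built on top of transformWith.
    val left  = s1.leftOuterJoin(s2)   // DStream[(String, (Int, Option[Int]))]
    val right = s1.rightOuterJoin(s2)  // DStream[(String, (Option[Int], Int))]
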
Diffstat (limited to 'streaming')
 streaming/src/main/scala/org/apache/spark/streaming/DStream.scala                       |  38
 streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala          |  66
 streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala              |  13
 streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala      |  79
 streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala      |  57
 streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala |  25
 streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala    |  20
 streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java                    |  89
 streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala          | 103
9 files changed, 457 insertions(+), 33 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
index 80da6bd30b..ee351daa60 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
@@ -500,7 +500,7 @@ abstract class DStream[T: ClassManifest] (
* on each RDD of this DStream.
*/
def transform[U: ClassManifest](transformFunc: RDD[T] => RDD[U]): DStream[U] = {
- transform((r: RDD[T], t: Time) => transformFunc(r))
+ val cleanedF = context.sparkContext.clean(transformFunc)
+ transform((r: RDD[T], t: Time) => cleanedF(r))
}
/**
@@ -508,7 +508,41 @@ abstract class DStream[T: ClassManifest] (
* on each RDD of this DStream.
*/
def transform[U: ClassManifest](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = {
- new TransformedDStream(this, context.sparkContext.clean(transformFunc))
+ val cleanedF = context.sparkContext.clean(transformFunc)
+ val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
+ assert(rdds.length == 1)
+ cleanedF(rdds.head.asInstanceOf[RDD[T]], time)
+ }
+ new TransformedDStream[U](Seq(this), realTransformFunc)
+ }
+
+ /**
+ * Return a new DStream in which each RDD is generated by applying a function
+ * on each RDD of `this` DStream and `other` DStream.
+ */
+ def transformWith[U: ClassManifest, V: ClassManifest](
+ other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V]
+ ): DStream[V] = {
+ val cleanedF = ssc.sparkContext.clean(transformFunc)
+ transformWith(other, (rdd1: RDD[T], rdd2: RDD[U], time: Time) => cleanedF(rdd1, rdd2))
+ }
+
+ /**
+ * Return a new DStream in which each RDD is generated by applying a function
+ * on each RDD of `this` DStream and `other` DStream.
+ */
+ def transformWith[U: ClassManifest, V: ClassManifest](
+ other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V]
+ ): DStream[V] = {
+ val cleanedF = ssc.sparkContext.clean(transformFunc)
+ val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
+ assert(rdds.length == 2)
+ val rdd1 = rdds(0).asInstanceOf[RDD[T]]
+ val rdd2 = rdds(1).asInstanceOf[RDD[U]]
+ cleanedF(rdd1, rdd2, time)
+ }
+ new TransformedDStream[V](Seq(this, other), realTransformFunc)
}
/**
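
A sketch of the Time-aware overload added above, assuming two hypothetical streams s1 and s2 of type DStream[String] already exist:

    import org.apache.spark.SparkContext._   // pair-RDD operations such as join and mapValues
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.Time

    val tagged = s1.map(x => (x, 1)).transformWith(
      s2.map(x => (x, "x")),
      // The three-argument variant also receives the batch time of the RDDs being combined.
      (rdd1: RDD[(String, Int)], rdd2: RDD[(String, String)], time: Time) =>
        rdd1.join(rdd2).mapValues { case (i, s) => (i, s, time.milliseconds) }
    )
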
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
index 757bc98981..c319433e54 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
@@ -444,27 +444,73 @@ extends Serializable {
}
/**
- * Join `this` DStream with `other` DStream. HashPartitioner is used
- * to partition each generated RDD into default number of partitions.
+ * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
+ * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
*/
def join[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (V, W))] = {
join[W](other, defaultPartitioner())
}
/**
- * Join `this` DStream with `other` DStream, that is, each RDD of the new DStream will
- * be generated by joining RDDs from `this` and other DStream. Uses the given
- * Partitioner to partition each generated RDD.
+ * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
+ * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
*/
def join[W: ClassManifest](
other: DStream[(K, W)],
partitioner: Partitioner
): DStream[(K, (V, W))] = {
- this.cogroup(other, partitioner)
- .flatMapValues{
- case (vs, ws) =>
- for (v <- vs.iterator; w <- ws.iterator) yield (v, w)
- }
+ self.transformWith(
+ other,
+ (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.join(rdd2, partitioner)
+ )
+ }
+
+ /**
+ * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
+ * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
+ * number of partitions.
+ */
+ def leftOuterJoin[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))] = {
+ leftOuterJoin[W](other, defaultPartitioner())
+ }
+
+ /**
+ * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
+ * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control
+ * the partitioning of each RDD.
+ */
+ def leftOuterJoin[W: ClassManifest](
+ other: DStream[(K, W)],
+ partitioner: Partitioner
+ ): DStream[(K, (V, Option[W]))] = {
+ self.transformWith(
+ other,
+ (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.leftOuterJoin(rdd2, partitioner)
+ )
+ }
+
+ /**
+ * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
+ * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
+ * number of partitions.
+ */
+ def rightOuterJoin[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (Option[V], W))] = {
+ rightOuterJoin[W](other, defaultPartitioner())
+ }
+
+ /**
+ * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
+ * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control
+ * the partitioning of each RDD.
+ */
+ def rightOuterJoin[W: ClassManifest](
+ other: DStream[(K, W)],
+ partitioner: Partitioner
+ ): DStream[(K, (Option[V], W))] = {
+ self.transformWith(
+ other,
+ (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.rightOuterJoin(rdd2, partitioner)
+ )
}
/**
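
A sketch of the partitioner-taking variants, with hypothetical streams ones: DStream[(String, Int)] and others: DStream[(String, String)] (and StreamingContext._ imported for the implicit pair operations):

    import org.apache.spark.HashPartitioner

    val partitioner = new HashPartitioner(8)  // partition count chosen arbitrarily
    val left  = ones.leftOuterJoin(others, partitioner)   // DStream[(String, (Int, Option[String]))]
    val right = ones.rightOuterJoin(others, partitioner)  // DStream[(String, (Option[Int], String))]
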
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 098081d245..3217ef4581 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -455,13 +455,24 @@ class StreamingContext private (
}
/**
- * Create a unified DStream from multiple DStreams of the same type and same interval
+ * Create a unified DStream from multiple DStreams of the same type and same slide duration.
*/
def union[T: ClassManifest](streams: Seq[DStream[T]]): DStream[T] = {
new UnionDStream[T](streams.toArray)
}
/**
+ * Create a new DStream in which each RDD is generated by applying a function on RDDs of
+ * the DStreams.
+ */
+ def transform[T: ClassManifest](
+ streams: Seq[DStream[_]],
+ transformFunc: (Seq[RDD[_]], Time) => RDD[T]
+ ): DStream[T] = {
+ new TransformedDStream[T](streams, sparkContext.clean(transformFunc))
+ }
+
+ /**
* Register an input stream that will be started (InputDStream.start() called) to get the
* input data.
*/
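
A sketch of the new n-ary transform, mirroring the StreamingContext.transform test in BasicOperationsSuite further down (ssc and s: DStream[Int] are hypothetical):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.Time

    val unioned = ssc.transform(
      Seq(s, s.map(_ + 4), s.map(_ + 8)),   // three DStreams
      (rdds: Seq[RDD[_]], time: Time) =>
        rdds.head.context.union(rdds.map(_.asInstanceOf[RDD[Int]]))   // union of the batch RDDs
    )
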
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index 459695b7ca..1110d770c4 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -24,7 +24,8 @@ import scala.collection.JavaConversions._
import org.apache.spark.streaming._
import org.apache.spark.api.java.{JavaPairRDD, JavaRDDLike, JavaRDD}
-import org.apache.spark.api.java.function.{Function2 => JFunction2, Function => JFunction, _}
+import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
+import org.apache.spark.api.java.function.{Function3 => JFunction3, _}
import java.util
import org.apache.spark.rdd.RDD
import JavaDStream._
@@ -307,6 +308,82 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
}
/**
+ * Return a new DStream in which each RDD is generated by applying a function
+ * on each RDD of this and other DStreams.
+ */
+ def transformWith[U, W](
+ other: JavaDStream[U],
+ transformFunc: JFunction3[R, JavaRDD[U], Time, JavaRDD[W]]
+ ): JavaDStream[W] = {
+ implicit val cmu: ClassManifest[U] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
+ implicit val cmv: ClassManifest[W] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+ def scalaTransform(inThis: RDD[T], inThat: RDD[U], time: Time): RDD[W] =
+ transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd
+ dstream.transformWith[U, W](other.dstream, scalaTransform(_, _, _))
+ }
+
+ /**
+ * Return a new DStream in which each RDD is generated by applying a function
+ * on each RDD of this and other DStreams.
+ */
+ def transformWith[U, K2, V2](
+ other: JavaDStream[U],
+ transformFunc: JFunction3[R, JavaRDD[U], Time, JavaPairRDD[K2, V2]]
+ ): JavaPairDStream[K2, V2] = {
+ implicit val cmu: ClassManifest[U] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
+ implicit val cmk2: ClassManifest[K2] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]]
+ implicit val cmv2: ClassManifest[V2] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]]
+ def scalaTransform(inThis: RDD[T], inThat: RDD[U], time: Time): RDD[(K2, V2)] =
+ transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd
+ dstream.transformWith[U, (K2, V2)](other.dstream, scalaTransform(_, _, _))
+ }
+
+ /**
+ * Return a new DStream in which each RDD is generated by applying a function
+ * on each RDD of this and other DStreams.
+ */
+ def transformWith[K, V, W](
+ other: JavaPairDStream[K, V],
+ transformFunc: JFunction3[R, JavaPairRDD[K, V], Time, JavaRDD[W]]
+ ): JavaDStream[W] = {
+ implicit val cmk: ClassManifest[K] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
+ implicit val cmv: ClassManifest[V] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
+ implicit val cmw: ClassManifest[W] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+ def scalaTransform(inThis: RDD[T], inThat: RDD[(K, V)], time: Time): RDD[W] =
+ transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd
+ dstream.transformWith[(K, V), W](other.dstream, scalaTransform(_, _, _))
+ }
+
+ /**
+ * Return a new DStream in which each RDD is generated by applying a function
+ * on each RDD of this and other DStreams.
+ */
+ def transformWith[K, V, K2, V2](
+ other: JavaPairDStream[K, V],
+ transformFunc: JFunction3[R, JavaPairRDD[K, V], Time, JavaPairRDD[K2, V2]]
+ ): JavaPairDStream[K2, V2] = {
+ implicit val cmk: ClassManifest[K] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
+ implicit val cmv: ClassManifest[V] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
+ implicit val cmk2: ClassManifest[K2] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]]
+ implicit val cmv2: ClassManifest[V2] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]]
+ def scalaTransform(inThis: RDD[T], inThat: RDD[(K, V)], time: Time): RDD[(K2, V2)] =
+ transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd
+ dstream.transformWith[(K, V), (K2, V2)](other.dstream, scalaTransform(_, _, _))
+ }
+
+ /**
* Enable periodic checkpointing of RDDs of this DStream
* @param interval Time interval after which generated RDD will be checkpointed
*/
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index 978fca33ad..821db46fff 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -36,7 +36,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.PairRDDFunctions
class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
- implicit val kManifiest: ClassManifest[K],
+ implicit val kManifest: ClassManifest[K],
implicit val vManifest: ClassManifest[V])
extends JavaDStreamLike[(K, V), JavaPairDStream[K, V], JavaPairRDD[K, V]] {
@@ -499,8 +499,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
/**
- * Join `this` DStream with `other` DStream. HashPartitioner is used
- * to partition each generated RDD into default number of partitions.
+ * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
+ * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
*/
def join[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, W)] = {
implicit val cm: ClassManifest[W] =
@@ -509,9 +509,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
/**
- * Join `this` DStream with `other` DStream, that is, each RDD of the new DStream will
- * be generated by joining RDDs from `this` and other DStream. Uses the given
- * Partitioner to partition each generated RDD.
+ * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
+ * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
*/
def join[W](other: JavaPairDStream[K, W], partitioner: Partitioner)
: JavaPairDStream[K, (V, W)] = {
@@ -521,6 +520,52 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
/**
+ * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
+ * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
+ * number of partitions.
+ */
+ def leftOuterJoin[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, Optional[W])] = {
+ implicit val cm: ClassManifest[W] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+ val joinResult = dstream.leftOuterJoin(other.dstream)
+ joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))}
+ }
+
+ /**
+ * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
+ * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control the
+ * partitioning of each RDD.
+ */
+ def leftOuterJoin[W](other: JavaPairDStream[K, W], partitioner: Partitioner)
+ : JavaPairDStream[K, (V, Optional[W])] = {
+ implicit val cm: ClassManifest[W] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+ val joinResult = dstream.leftOuterJoin(other.dstream, partitioner)
+ joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))}
+ }
+
+ /**
+ * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
+ * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
+ * number of partitions.
+ */
+ def rightOuterJoin[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (Optional[V], W)] = {
+ implicit val cm: ClassManifest[W] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+ val joinResult = dstream.rightOuterJoin(other.dstream)
+ joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)}
+ }
+
+ /**
+ * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
+ * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control the
+ * partitioning of each RDD.
+ */
+ def rightOuterJoin[W](other: JavaPairDStream[K, W], partitioner: Partitioner)
+ : JavaPairDStream[K, (Optional[V], W)] = {
+ implicit val cm: ClassManifest[W] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+ val joinResult = dstream.rightOuterJoin(other.dstream, partitioner)
+ joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)}
+ }
+
+ /**
* Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
* generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
*/
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 54ba3e6025..405f715d50 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -19,7 +19,7 @@ package org.apache.spark.streaming.api.java
import java.lang.{Long => JLong, Integer => JInt}
import java.io.InputStream
-import java.util.{Map => JMap}
+import java.util.{Map => JMap, List => JList}
import scala.collection.JavaConversions._
@@ -585,6 +585,29 @@ class JavaStreamingContext(val ssc: StreamingContext) {
}
/**
+ * Create a unified DStream from multiple DStreams of the same type and same slide duration.
+ */
+ def union[T](first: JavaDStream[T], rest: JList[JavaDStream[T]]): JavaDStream[T] = {
+ val dstreams: Seq[DStream[T]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.dstream)
+ implicit val cm: ClassManifest[T] = first.classManifest
+ ssc.union(dstreams)(cm)
+ }
+
+ /**
+ * Create a unified DStream from multiple DStreams of the same type and same slide duration.
+ */
+ def union[K, V](
+ first: JavaPairDStream[K, V],
+ rest: JList[JavaPairDStream[K, V]]
+ ): JavaPairDStream[K, V] = {
+ val dstreams: Seq[DStream[(K, V)]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.dstream)
+ implicit val cm: ClassManifest[(K, V)] = first.classManifest
+ implicit val kcm: ClassManifest[K] = first.kManifest
+ implicit val vcm: ClassManifest[V] = first.vManifest
+ new JavaPairDStream[K, V](ssc.union(dstreams)(cm))(kcm, vcm)
+ }
+
+ /**
* Sets the context to periodically checkpoint the DStream operations for master
* fault-tolerance. The graph will be checkpointed every batch interval.
* @param directory HDFS-compatible directory where the checkpoint data will be reliably stored
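
The Java union overloads above mirror the Scala API; in Scala, with hypothetical streams s1, s2, s3 of the same type and slide duration, the equivalent is simply:

    val all: DStream[Int] = ssc.union(Seq(s1, s2, s3))
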
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
index 60485adef9..71bcb2b390 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
@@ -21,16 +21,22 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Duration, DStream, Time}
private[streaming]
-class TransformedDStream[T: ClassManifest, U: ClassManifest] (
- parent: DStream[T],
- transformFunc: (RDD[T], Time) => RDD[U]
- ) extends DStream[U](parent.ssc) {
+class TransformedDStream[U: ClassManifest] (
+ parents: Seq[DStream[_]],
+ transformFunc: (Seq[RDD[_]], Time) => RDD[U]
+ ) extends DStream[U](parents.head.ssc) {
- override def dependencies = List(parent)
+ require(parents.length > 0, "List of DStreams to transform is empty")
+ require(parents.map(_.ssc).distinct.size == 1, "Some of the DStreams have different contexts")
+ require(parents.map(_.slideDuration).distinct.size == 1,
+ "Some of the DStreams have different slide durations")
- override def slideDuration: Duration = parent.slideDuration
+ override def dependencies = parents.toList
+
+ override def slideDuration: Duration = parents.head.slideDuration
override def compute(validTime: Time): Option[RDD[U]] = {
- parent.getOrCompute(validTime).map(transformFunc(_, validTime))
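+ // Note: a parent with no RDD for this batch time contributes null here (via orNull),
+ // so the supplied transform function must be able to tolerate missing inputs.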
+ val parentRDDs = parents.map(_.getOrCompute(validTime).orNull).toSeq
+ Some(transformFunc(parentRDDs, validTime))
}
}
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index c0d729ff87..9f885f07f2 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -223,7 +223,7 @@ public class JavaAPISuite implements Serializable {
}
});
JavaTestUtils.attachTestOutputStream(mapped);
- List<List<List<String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+ List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2);
Assert.assertEquals(expected, result);
}
@@ -338,6 +338,58 @@ public class JavaAPISuite implements Serializable {
}
@Test
+ public void testTransformWith() {
+ List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
+ Arrays.asList(new Tuple2<String, String>("california", "dodgers"),
+ new Tuple2<String, String>("new york", "yankees")),
+ Arrays.asList(new Tuple2<String, String>("california", "sharks"),
+ new Tuple2<String, String>("new york", "rangers")));
+
+ List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList(
+ Arrays.asList(new Tuple2<String, String>("california", "giants"),
+ new Tuple2<String, String>("new york", "mets")),
+ Arrays.asList(new Tuple2<String, String>("california", "ducks"),
+ new Tuple2<String, String>("new york", "islanders")));
+
+ List<List<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<String, Tuple2<String, String>>("california",
+ new Tuple2<String, String>("dodgers", "giants")),
+ new Tuple2<String, Tuple2<String, String>>("new york",
+ new Tuple2<String, String>("yankees", "mets"))),
+ Arrays.asList(
+ new Tuple2<String, Tuple2<String, String>>("california",
+ new Tuple2<String, String>("sharks", "ducks")),
+ new Tuple2<String, Tuple2<String, String>>("new york",
+ new Tuple2<String, String>("rangers", "islanders"))));
+
+ JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream(
+ ssc, stringStringKVStream1, 1);
+ JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1);
+
+ JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream(
+ ssc, stringStringKVStream2, 1);
+ JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2);
+
+ JavaPairDStream<String, Tuple2<String, String>> joined = pairStream1.transformWith(
+ pairStream2,
+ new Function3<JavaPairRDD<String, String>, JavaPairRDD<String, String>, Time,
+ JavaPairRDD<String, Tuple2<String, String>>>() {
+ @Override
+ public JavaPairRDD<String, Tuple2<String, String>> call(
+ JavaPairRDD<String, String> rdd1, JavaPairRDD<String, String> rdd2, Time time)
+ throws Exception {
+ return rdd1.join(rdd2);
+ }
+ }
+ );
+
+ JavaTestUtils.attachTestOutputStream(joined);
+ List<List<Tuple2<String, Tuple2<String, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
public void testFlatMap() {
List<List<String>> inputData = Arrays.asList(
Arrays.asList("go", "giants"),
@@ -1099,7 +1151,7 @@ public class JavaAPISuite implements Serializable {
JavaPairDStream<String, Tuple2<List<String>, List<String>>> grouped = pairStream1.cogroup(pairStream2);
JavaTestUtils.attachTestOutputStream(grouped);
- List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+ List<List<Tuple2<String, Tuple2<List<String>, List<String>>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
Assert.assertEquals(expected, result);
}
@@ -1142,7 +1194,38 @@ public class JavaAPISuite implements Serializable {
JavaPairDStream<String, Tuple2<String, String>> joined = pairStream1.join(pairStream2);
JavaTestUtils.attachTestOutputStream(joined);
- List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+ List<List<Tuple2<String, Tuple2<String, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testLeftOuterJoin() {
+ List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
+ Arrays.asList(new Tuple2<String, String>("california", "dodgers"),
+ new Tuple2<String, String>("new york", "yankees")),
+ Arrays.asList(new Tuple2<String, String>("california", "sharks")));
+
+ List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList(
+ Arrays.asList(new Tuple2<String, String>("california", "giants")),
+ Arrays.asList(new Tuple2<String, String>("new york", "islanders")));
+
+ List<List<Long>> expected = Arrays.asList(Arrays.asList(2L), Arrays.asList(1L));
+
+ JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream(
+ ssc, stringStringKVStream1, 1);
+ JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1);
+
+ JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream(
+ ssc, stringStringKVStream2, 1);
+ JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2);
+
+ JavaPairDStream<String, Tuple2<String, Optional<String>>> joined = pairStream1.leftOuterJoin(pairStream2);
+ JavaDStream<Long> counted = joined.count();
+ JavaTestUtils.attachTestOutputStream(counted);
+ List<List<Long>> result = JavaTestUtils.runStreams(ssc, 2, 2);
Assert.assertEquals(expected, result);
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index 11586f72b6..a2ac510a98 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -18,7 +18,10 @@
package org.apache.spark.streaming
import org.apache.spark.streaming.StreamingContext._
-import scala.runtime.RichInt
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.SparkContext._
+
import util.ManualClock
class BasicOperationsSuite extends TestSuiteBase {
@@ -143,6 +146,72 @@ class BasicOperationsSuite extends TestSuiteBase {
)
}
+ test("union") {
+ val input = Seq(1 to 4, 101 to 104, 201 to 204)
+ val output = Seq(1 to 8, 101 to 108, 201 to 208)
+ testOperation(
+ input,
+ (s: DStream[Int]) => s.union(s.map(_ + 4)),
+ output
+ )
+ }
+
+ test("StreamingContext.union") {
+ val input = Seq(1 to 4, 101 to 104, 201 to 204)
+ val output = Seq(1 to 12, 101 to 112, 201 to 212)
+ // union over 3 DStreams
+ testOperation(
+ input,
+ (s: DStream[Int]) => s.context.union(Seq(s, s.map(_ + 4), s.map(_ + 8))),
+ output
+ )
+ }
+
+ test("transform") {
+ val input = Seq(1 to 4, 5 to 8, 9 to 12)
+ testOperation(
+ input,
+ (r: DStream[Int]) => r.transform(rdd => rdd.map(_.toString)), // RDD.map in transform
+ input.map(_.map(_.toString))
+ )
+ }
+
+ test("transformWith") {
+ val inputData1 = Seq( Seq("a", "b"), Seq("a", ""), Seq(""), Seq() )
+ val inputData2 = Seq( Seq("a", "b"), Seq("b", ""), Seq(), Seq("") )
+ val outputData = Seq(
+ Seq( ("a", (1, "x")), ("b", (1, "x")) ),
+ Seq( ("", (1, "x")) ),
+ Seq( ),
+ Seq( )
+ )
+ val operation = (s1: DStream[String], s2: DStream[String]) => {
+ val t1 = s1.map(x => (x, 1))
+ val t2 = s2.map(x => (x, "x"))
+ t1.transformWith( // RDD.join in transformWith
+ t2,
+ (rdd1: RDD[(String, Int)], rdd2: RDD[(String, String)]) => rdd1.join(rdd2)
+ )
+ }
+ testOperation(inputData1, inputData2, operation, outputData, true)
+ }
+
+ test("StreamingContext.transform") {
+ val input = Seq(1 to 4, 101 to 104, 201 to 204)
+ val output = Seq(1 to 12, 101 to 112, 201 to 212)
+
+ // transform over 3 DStreams by doing union of the 3 RDDs
+ val operation = (s: DStream[Int]) => {
+ s.context.transform(
+ Seq(s, s.map(_ + 4), s.map(_ + 8)), // 3 DStreams
+ (rdds: Seq[RDD[_]], time: Time) =>
+ rdds.head.context.union(rdds.map(_.asInstanceOf[RDD[Int]])) // union of RDDs
+ )
+ }
+
+ testOperation(input, operation, output)
+ }
+
test("cogroup") {
val inputData1 = Seq( Seq("a", "a", "b"), Seq("a", ""), Seq(""), Seq() )
val inputData2 = Seq( Seq("a", "a", "b"), Seq("b", ""), Seq(), Seq() )
@@ -168,7 +237,37 @@ class BasicOperationsSuite extends TestSuiteBase {
Seq( )
)
val operation = (s1: DStream[String], s2: DStream[String]) => {
- s1.map(x => (x,1)).join(s2.map(x => (x,"x")))
+ s1.map(x => (x, 1)).join(s2.map(x => (x, "x")))
+ }
+ testOperation(inputData1, inputData2, operation, outputData, true)
+ }
+
+ test("leftOuterJoin") {
+ val inputData1 = Seq( Seq("a", "b"), Seq("a", ""), Seq(""), Seq() )
+ val inputData2 = Seq( Seq("a", "b"), Seq("b", ""), Seq(), Seq("") )
+ val outputData = Seq(
+ Seq( ("a", (1, Some("x"))), ("b", (1, Some("x"))) ),
+ Seq( ("", (1, Some("x"))), ("a", (1, None)) ),
+ Seq( ("", (1, None)) ),
+ Seq( )
+ )
+ val operation = (s1: DStream[String], s2: DStream[String]) => {
+ s1.map(x => (x, 1)).leftOuterJoin(s2.map(x => (x, "x")))
+ }
+ testOperation(inputData1, inputData2, operation, outputData, true)
+ }
+
+ test("rightOuterJoin") {
+ val inputData1 = Seq( Seq("a", "b"), Seq("a", ""), Seq(""), Seq() )
+ val inputData2 = Seq( Seq("a", "b"), Seq("b", ""), Seq(), Seq("") )
+ val outputData = Seq(
+ Seq( ("a", (Some(1), "x")), ("b", (Some(1), "x")) ),
+ Seq( ("", (Some(1), "x")), ("b", (None, "x")) ),
+ Seq( ),
+ Seq( ("", (None, "x")) )
+ )
+ val operation = (s1: DStream[String], s2: DStream[String]) => {
+ s1.map(x => (x, 1)).rightOuterJoin(s2.map(x => (x, "x")))
}
testOperation(inputData1, inputData2, operation, outputData, true)
}