aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala11
-rw-r--r--core/src/main/scala/org/apache/spark/api/java/function/Function3.java36
-rw-r--r--core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction3.scala34
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/DStream.scala46
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala152
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala13
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala97
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala178
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala75
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala58
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala20
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java376
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala103
13 files changed, 1037 insertions, 162 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index 39f408b8c8..2142fd7327 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -622,4 +622,15 @@ object JavaPairRDD {
new JavaPairRDD[K, V](rdd)
implicit def toRDD[K, V](rdd: JavaPairRDD[K, V]): RDD[(K, V)] = rdd.rdd
+
+
+ /** Convert a JavaRDD of key-value pairs to JavaPairRDD. */
+ def fromJavaRDD[K, V](rdd: JavaRDD[(K, V)]): JavaPairRDD[K, V] = {
+ implicit val cmk: ClassManifest[K] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
+ implicit val cmv: ClassManifest[V] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
+ new JavaPairRDD[K, V](rdd.rdd)
+ }
+
}
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/Function3.java b/core/src/main/scala/org/apache/spark/api/java/function/Function3.java
new file mode 100644
index 0000000000..ac6178924a
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/java/function/Function3.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function;
+
+import scala.reflect.ClassManifest;
+import scala.reflect.ClassManifest$;
+import scala.runtime.AbstractFunction2;
+
+import java.io.Serializable;
+
+/**
+ * A three-argument function that takes arguments of type T1, T2 and T3 and returns an R.
+ */
+public abstract class Function3<T1, T2, T3, R> extends WrappedFunction3<T1, T2, T3, R>
+ implements Serializable {
+
+ public ClassManifest<R> returnType() {
+ return (ClassManifest<R>) ClassManifest$.MODULE$.fromClass(Object.class);
+ }
+}
+
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction3.scala b/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction3.scala
new file mode 100644
index 0000000000..d314dbdf1d
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction3.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function
+
+import scala.runtime.AbstractFunction3
+
+/**
+ * Subclass of Function3 for ease of calling from Java. The main thing it does is re-expose the
+ * apply() method as call() and declare that it can throw Exception (since AbstractFunction3.apply
+ * isn't marked to allow that).
+ */
+private[spark] abstract class WrappedFunction3[T1, T2, T3, R]
+ extends AbstractFunction3[T1, T2, T3, R] {
+ @throws(classOf[Exception])
+ def call(t1: T1, t2: T2, t3: T3): R
+
+ final def apply(t1: T1, t2: T2, t3: T3): R = call(t1, t2, t3)
+}
+
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
index 6da2261f06..9ceff754c4 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
@@ -486,7 +486,7 @@ abstract class DStream[T: ClassManifest] (
/**
* Apply a function to each RDD in this DStream. This is an output operator, so
- * this DStream will be registered as an output stream and therefore materialized.
+ * 'this' DStream will be registered as an output stream and therefore materialized.
*/
def foreach(foreachFunc: RDD[T] => Unit) {
this.foreach((r: RDD[T], t: Time) => foreachFunc(r))
@@ -494,7 +494,7 @@ abstract class DStream[T: ClassManifest] (
/**
* Apply a function to each RDD in this DStream. This is an output operator, so
- * this DStream will be registered as an output stream and therefore materialized.
+ * 'this' DStream will be registered as an output stream and therefore materialized.
*/
def foreach(foreachFunc: (RDD[T], Time) => Unit) {
val newStream = new ForEachDStream(this, context.sparkContext.clean(foreachFunc))
@@ -504,18 +504,52 @@ abstract class DStream[T: ClassManifest] (
/**
* Return a new DStream in which each RDD is generated by applying a function
- * on each RDD of this DStream.
+ * on each RDD of 'this' DStream.
*/
def transform[U: ClassManifest](transformFunc: RDD[T] => RDD[U]): DStream[U] = {
- transform((r: RDD[T], t: Time) => transformFunc(r))
+ transform((r: RDD[T], t: Time) => context.sparkContext.clean(transformFunc(r)))
}
/**
* Return a new DStream in which each RDD is generated by applying a function
- * on each RDD of this DStream.
+ * on each RDD of 'this' DStream.
*/
def transform[U: ClassManifest](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = {
- new TransformedDStream(this, context.sparkContext.clean(transformFunc))
+ //new TransformedDStream(this, context.sparkContext.clean(transformFunc))
+ val cleanedF = context.sparkContext.clean(transformFunc)
+ val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
+ assert(rdds.length == 1)
+ cleanedF(rdds.head.asInstanceOf[RDD[T]], time)
+ }
+ new TransformedDStream[U](Seq(this), realTransformFunc)
+ }
+
+ /**
+ * Return a new DStream in which each RDD is generated by applying a function
+ * on each RDD of 'this' DStream and 'other' DStream.
+ */
+ def transformWith[U: ClassManifest, V: ClassManifest](
+ other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V]
+ ): DStream[V] = {
+ val cleanedF = ssc.sparkContext.clean(transformFunc)
+ transformWith(other, (rdd1: RDD[T], rdd2: RDD[U], time: Time) => cleanedF(rdd1, rdd2))
+ }
+
+ /**
+ * Return a new DStream in which each RDD is generated by applying a function
+ * on each RDD of 'this' DStream and 'other' DStream.
+ */
+ def transformWith[U: ClassManifest, V: ClassManifest](
+ other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V]
+ ): DStream[V] = {
+ val cleanedF = ssc.sparkContext.clean(transformFunc)
+ val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
+ assert(rdds.length == 2)
+ val rdd1 = rdds(0).asInstanceOf[RDD[T]]
+ val rdd2 = rdds(1).asInstanceOf[RDD[U]]
+ cleanedF(rdd1, rdd2, time)
+ }
+ new TransformedDStream[V](Seq(this, other), realTransformFunc)
}
/**
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
index 757bc98981..8c12fd11ef 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
@@ -19,7 +19,7 @@ package org.apache.spark.streaming
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.{ReducedWindowedDStream, StateDStream}
-import org.apache.spark.streaming.dstream.{CoGroupedDStream, ShuffledDStream}
+import org.apache.spark.streaming.dstream.{ShuffledDStream}
import org.apache.spark.streaming.dstream.{MapValuedDStream, FlatMapValuedDStream}
import org.apache.spark.{Partitioner, HashPartitioner}
@@ -359,7 +359,7 @@ extends Serializable {
}
/**
- * Create a new "state" DStream where the state for each key is updated by applying
+ * Return a new "state" DStream where the state for each key is updated by applying
* the given function on the previous state of the key and the new values of the key.
* [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
* @param updateFunc State update function. If `this` function returns None, then
@@ -398,11 +398,18 @@ extends Serializable {
new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner)
}
-
+ /**
+ * Return a new DStream by applying a map function to the value of each key-value pairs in
+ * 'this' DStream without changing the key.
+ */
def mapValues[U: ClassManifest](mapValuesFunc: V => U): DStream[(K, U)] = {
new MapValuedDStream[K, V, U](self, mapValuesFunc)
}
+ /**
+ * Return a new DStream by applying a flatmap function to the value of each key-value pairs in
+ * 'this' DStream without changing the key.
+ */
def flatMapValues[U: ClassManifest](
flatMapValuesFunc: V => TraversableOnce[U]
): DStream[(K, U)] = {
@@ -410,9 +417,8 @@ extends Serializable {
}
/**
- * Cogroup `this` DStream with `other` DStream. For each key k in corresponding RDDs of `this`
- * or `other` DStreams, the generated RDD will contains a tuple with the list of values for that
- * key in both RDDs. HashPartitioner is used to partition each generated RDD into default number
+ * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
+ * Hash partitioning is used to generate the RDDs with Spark's default number
* of partitions.
*/
def cogroup[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (Seq[V], Seq[W]))] = {
@@ -420,56 +426,132 @@ extends Serializable {
}
/**
- * Cogroup `this` DStream with `other` DStream using a partitioner. For each key k in corresponding RDDs of `this`
- * or `other` DStreams, the generated RDD will contains a tuple with the list of values for that
- * key in both RDDs. Partitioner is used to partition each generated RDD.
+ * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
+ * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+ */
+ def cogroup[W: ClassManifest](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (Seq[V], Seq[W]))] = {
+ cogroup(other, defaultPartitioner(numPartitions))
+ }
+
+ /**
+ * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
+ * The supplied [[org.apache.spark.Partitioner]] is used to partition the generated RDDs.
*/
def cogroup[W: ClassManifest](
other: DStream[(K, W)],
partitioner: Partitioner
): DStream[(K, (Seq[V], Seq[W]))] = {
-
- val cgd = new CoGroupedDStream[K](
- Seq(self.asInstanceOf[DStream[(K, _)]], other.asInstanceOf[DStream[(K, _)]]),
- partitioner
- )
- val pdfs = new PairDStreamFunctions[K, Seq[Seq[_]]](cgd)(
- classManifest[K],
- Manifests.seqSeqManifest
+ self.transformWith(
+ other,
+ (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.cogroup(rdd2, partitioner)
)
- pdfs.mapValues {
- case Seq(vs, ws) =>
- (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]])
- }
}
/**
- * Join `this` DStream with `other` DStream. HashPartitioner is used
- * to partition each generated RDD into default number of partitions.
+ * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
+ * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
*/
def join[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (V, W))] = {
join[W](other, defaultPartitioner())
}
/**
- * Join `this` DStream with `other` DStream, that is, each RDD of the new DStream will
- * be generated by joining RDDs from `this` and other DStream. Uses the given
- * Partitioner to partition each generated RDD.
+ * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
+ * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+ */
+ def join[W: ClassManifest](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (V, W))] = {
+ join[W](other, defaultPartitioner(numPartitions))
+ }
+
+ /**
+ * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
+ * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
*/
def join[W: ClassManifest](
other: DStream[(K, W)],
partitioner: Partitioner
): DStream[(K, (V, W))] = {
- this.cogroup(other, partitioner)
- .flatMapValues{
- case (vs, ws) =>
- for (v <- vs.iterator; w <- ws.iterator) yield (v, w)
- }
+ self.transformWith(
+ other,
+ (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.join(rdd2, partitioner)
+ )
+ }
+
+ /**
+ * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
+ * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
+ * number of partitions.
+ */
+ def leftOuterJoin[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))] = {
+ leftOuterJoin[W](other, defaultPartitioner())
+ }
+
+ /**
+ * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
+ * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
+ * partitions.
+ */
+ def leftOuterJoin[W: ClassManifest](
+ other: DStream[(K, W)],
+ numPartitions: Int
+ ): DStream[(K, (V, Option[W]))] = {
+ leftOuterJoin[W](other, defaultPartitioner(numPartitions))
+ }
+
+ /**
+ * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
+ * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control
+ * the partitioning of each RDD.
+ */
+ def leftOuterJoin[W: ClassManifest](
+ other: DStream[(K, W)],
+ partitioner: Partitioner
+ ): DStream[(K, (V, Option[W]))] = {
+ self.transformWith(
+ other,
+ (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.leftOuterJoin(rdd2, partitioner)
+ )
+ }
+
+ /**
+ * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
+ * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
+ * number of partitions.
+ */
+ def rightOuterJoin[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (Option[V], W))] = {
+ rightOuterJoin[W](other, defaultPartitioner())
+ }
+
+ /**
+ * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
+ * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
+ * partitions.
+ */
+ def rightOuterJoin[W: ClassManifest](
+ other: DStream[(K, W)],
+ numPartitions: Int
+ ): DStream[(K, (Option[V], W))] = {
+ rightOuterJoin[W](other, defaultPartitioner(numPartitions))
+ }
+
+ /**
+ * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
+ * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control
+ * the partitioning of each RDD.
+ */
+ def rightOuterJoin[W: ClassManifest](
+ other: DStream[(K, W)],
+ partitioner: Partitioner
+ ): DStream[(K, (Option[V], W))] = {
+ self.transformWith(
+ other,
+ (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.rightOuterJoin(rdd2, partitioner)
+ )
}
/**
- * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated
- * based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix"
+ * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval
+ * is generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix"
*/
def saveAsHadoopFiles[F <: OutputFormat[K, V]](
prefix: String,
@@ -479,8 +561,8 @@ extends Serializable {
}
/**
- * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated
- * based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix"
+ * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval
+ * is generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix"
*/
def saveAsHadoopFiles(
prefix: String,
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 09c2f7fd8e..70bf902143 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -478,13 +478,24 @@ class StreamingContext private (
inputStream
}
/**
- * Create a unified DStream from multiple DStreams of the same type and same interval
+ * Create a unified DStream from multiple DStreams of the same type and same slide duration.
*/
def union[T: ClassManifest](streams: Seq[DStream[T]]): DStream[T] = {
new UnionDStream[T](streams.toArray)
}
/**
+ * Create a new DStream in which each RDD is generated by applying a function on RDDs of
+ * the DStreams.
+ */
+ def transform[T: ClassManifest](
+ dstreams: Seq[DStream[_]],
+ transformFunc: (Seq[RDD[_]], Time) => RDD[T]
+ ): DStream[T] = {
+ new TransformedDStream[T](dstreams, sparkContext.clean(transformFunc))
+ }
+
+ /**
* Register an input stream that will be started (InputDStream.start() called) to get the
* input data.
*/
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index 459695b7ca..09189eadd8 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -24,7 +24,8 @@ import scala.collection.JavaConversions._
import org.apache.spark.streaming._
import org.apache.spark.api.java.{JavaPairRDD, JavaRDDLike, JavaRDD}
-import org.apache.spark.api.java.function.{Function2 => JFunction2, Function => JFunction, _}
+import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
+import org.apache.spark.api.java.function.{Function3 => JFunction3, _}
import java.util
import org.apache.spark.rdd.RDD
import JavaDStream._
@@ -120,10 +121,12 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* this DStream. Applying glom() to an RDD coalesces all elements within each partition into
* an array.
*/
- def glom(): JavaDStream[JList[T]] =
+ def glom(): JavaDStream[JList[T]] = {
new JavaDStream(dstream.glom().map(x => new java.util.ArrayList[T](x.toSeq)))
+ }
+
- /** Return the StreamingContext associated with this DStream */
+ /** Return the [[org.apache.spark.streaming.StreamingContext]] associated with this DStream */
def context(): StreamingContext = dstream.context()
/** Return a new DStream by applying a function to all elements of this DStream. */
@@ -238,7 +241,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
/**
* Apply a function to each RDD in this DStream. This is an output operator, so
- * this DStream will be registered as an output stream and therefore materialized.
+ * 'this' DStream will be registered as an output stream and therefore materialized.
*/
def foreach(foreachFunc: JFunction[R, Void]) {
dstream.foreach(rdd => foreachFunc.call(wrapRDD(rdd)))
@@ -246,7 +249,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
/**
* Apply a function to each RDD in this DStream. This is an output operator, so
- * this DStream will be registered as an output stream and therefore materialized.
+ * 'this' DStream will be registered as an output stream and therefore materialized.
*/
def foreach(foreachFunc: JFunction2[R, Time, Void]) {
dstream.foreach((rdd, time) => foreachFunc.call(wrapRDD(rdd), time))
@@ -254,7 +257,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
/**
* Return a new DStream in which each RDD is generated by applying a function
- * on each RDD of this DStream.
+ * on each RDD of 'this' DStream.
*/
def transform[U](transformFunc: JFunction[R, JavaRDD[U]]): JavaDStream[U] = {
implicit val cm: ClassManifest[U] =
@@ -266,7 +269,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
/**
* Return a new DStream in which each RDD is generated by applying a function
- * on each RDD of this DStream.
+ * on each RDD of 'this' DStream.
*/
def transform[U](transformFunc: JFunction2[R, Time, JavaRDD[U]]): JavaDStream[U] = {
implicit val cm: ClassManifest[U] =
@@ -278,7 +281,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
/**
* Return a new DStream in which each RDD is generated by applying a function
- * on each RDD of this DStream.
+ * on each RDD of 'this' DStream.
*/
def transform[K2, V2](transformFunc: JFunction[R, JavaPairRDD[K2, V2]]):
JavaPairDStream[K2, V2] = {
@@ -293,7 +296,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
/**
* Return a new DStream in which each RDD is generated by applying a function
- * on each RDD of this DStream.
+ * on each RDD of 'this' DStream.
*/
def transform[K2, V2](transformFunc: JFunction2[R, Time, JavaPairRDD[K2, V2]]):
JavaPairDStream[K2, V2] = {
@@ -307,6 +310,82 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
}
/**
+ * Return a new DStream in which each RDD is generated by applying a function
+ * on each RDD of 'this' DStream and 'other' DStream.
+ */
+ def transformWith[U, W](
+ other: JavaDStream[U],
+ transformFunc: JFunction3[R, JavaRDD[U], Time, JavaRDD[W]]
+ ): JavaDStream[W] = {
+ implicit val cmu: ClassManifest[U] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
+ implicit val cmv: ClassManifest[W] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+ def scalaTransform (inThis: RDD[T], inThat: RDD[U], time: Time): RDD[W] =
+ transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd
+ dstream.transformWith[U, W](other.dstream, scalaTransform(_, _, _))
+ }
+
+ /**
+ * Return a new DStream in which each RDD is generated by applying a function
+ * on each RDD of 'this' DStream and 'other' DStream.
+ */
+ def transformWith[U, K2, V2](
+ other: JavaDStream[U],
+ transformFunc: JFunction3[R, JavaRDD[U], Time, JavaPairRDD[K2, V2]]
+ ): JavaPairDStream[K2, V2] = {
+ implicit val cmu: ClassManifest[U] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
+ implicit val cmk2: ClassManifest[K2] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]]
+ implicit val cmv2: ClassManifest[V2] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]]
+ def scalaTransform (inThis: RDD[T], inThat: RDD[U], time: Time): RDD[(K2, V2)] =
+ transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd
+ dstream.transformWith[U, (K2, V2)](other.dstream, scalaTransform(_, _, _))
+ }
+
+ /**
+ * Return a new DStream in which each RDD is generated by applying a function
+ * on each RDD of 'this' DStream and 'other' DStream.
+ */
+ def transformWith[K2, V2, W](
+ other: JavaPairDStream[K2, V2],
+ transformFunc: JFunction3[R, JavaPairRDD[K2, V2], Time, JavaRDD[W]]
+ ): JavaDStream[W] = {
+ implicit val cmk2: ClassManifest[K2] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]]
+ implicit val cmv2: ClassManifest[V2] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]]
+ implicit val cmw: ClassManifest[W] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+ def scalaTransform (inThis: RDD[T], inThat: RDD[(K2, V2)], time: Time): RDD[W] =
+ transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd
+ dstream.transformWith[(K2, V2), W](other.dstream, scalaTransform(_, _, _))
+ }
+
+ /**
+ * Return a new DStream in which each RDD is generated by applying a function
+ * on each RDD of 'this' DStream and 'other' DStream.
+ */
+ def transformWith[K2, V2, K3, V3](
+ other: JavaPairDStream[K2, V2],
+ transformFunc: JFunction3[R, JavaPairRDD[K2, V2], Time, JavaPairRDD[K3, V3]]
+ ): JavaPairDStream[K3, V3] = {
+ implicit val cmk2: ClassManifest[K2] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]]
+ implicit val cmv2: ClassManifest[V2] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]]
+ implicit val cmk3: ClassManifest[K3] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K3]]
+ implicit val cmv3: ClassManifest[V3] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V3]]
+ def scalaTransform (inThis: RDD[T], inThat: RDD[(K2, V2)], time: Time): RDD[(K3, V3)] =
+ transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd
+ dstream.transformWith[(K2, V2), (K3, V3)](other.dstream, scalaTransform(_, _, _))
+ }
+
+ /**
* Enable periodic checkpointing of RDDs of this DStream
* @param interval Time interval after which generated RDD will be checkpointed
*/
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index faf8f36182..c6cd635afa 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConversions._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
-import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
+import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, Function3 => JFunction3}
import org.apache.spark.Partitioner
import org.apache.hadoop.mapred.{JobConf, OutputFormat}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
@@ -36,7 +36,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.PairRDDFunctions
class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
- implicit val kManifiest: ClassManifest[K],
+ implicit val kManifest: ClassManifest[K],
implicit val vManifest: ClassManifest[V])
extends JavaDStreamLike[(K, V), JavaPairDStream[K, V], JavaPairRDD[K, V]] {
@@ -154,7 +154,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
/**
* Combine elements of each key in DStream's RDDs using custom function. This is similar to the
- * combineByKey for RDDs. Please refer to combineByKey in [[PairRDDFunctions]] for more
+ * combineByKey for RDDs. Please refer to combineByKey in [[org.apache.spark.PairRDDFunctions]] for more
* information.
*/
def combineByKey[C](createCombiner: JFunction[V, C],
@@ -419,7 +419,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
/**
- * Create a new "state" DStream where the state for each key is updated by applying
+ * Return a new "state" DStream where the state for each key is updated by applying
* the given function on the previous state of the key and the new values of each key.
* Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
* @param updateFunc State update function. If `this` function returns None, then
@@ -434,7 +434,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
/**
- * Create a new "state" DStream where the state for each key is updated by applying
+ * Return a new "state" DStream where the state for each key is updated by applying
* the given function on the previous state of the key and the new values of each key.
* Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
* @param updateFunc State update function. If `this` function returns None, then
@@ -442,15 +442,17 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* @param numPartitions Number of partitions of each RDD in the new DStream.
* @tparam S State type
*/
- def updateStateByKey[S: ClassManifest](
+ def updateStateByKey[S](
updateFunc: JFunction2[JList[V], Optional[S], Optional[S]],
numPartitions: Int)
: JavaPairDStream[K, S] = {
+ implicit val cm: ClassManifest[S] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[S]]
dstream.updateStateByKey(convertUpdateStateFunction(updateFunc), numPartitions)
}
/**
- * Create a new "state" DStream where the state for each key is updated by applying
+ * Return a new "state" DStream where the state for each key is updated by applying
* the given function on the previous state of the key and the new values of the key.
* [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
* @param updateFunc State update function. If `this` function returns None, then
@@ -458,19 +460,30 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
* @tparam S State type
*/
- def updateStateByKey[S: ClassManifest](
+ def updateStateByKey[S](
updateFunc: JFunction2[JList[V], Optional[S], Optional[S]],
partitioner: Partitioner
): JavaPairDStream[K, S] = {
+ implicit val cm: ClassManifest[S] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[S]]
dstream.updateStateByKey(convertUpdateStateFunction(updateFunc), partitioner)
}
+
+ /**
+ * Return a new DStream by applying a map function to the value of each key-value pairs in
+ * 'this' DStream without changing the key.
+ */
def mapValues[U](f: JFunction[V, U]): JavaPairDStream[K, U] = {
implicit val cm: ClassManifest[U] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
dstream.mapValues(f)
}
+ /**
+ * Return a new DStream by applying a flatmap function to the value of each key-value pairs in
+ * 'this' DStream without changing the key.
+ */
def flatMapValues[U](f: JFunction[V, java.lang.Iterable[U]]): JavaPairDStream[K, U] = {
import scala.collection.JavaConverters._
def fn = (x: V) => f.apply(x).asScala
@@ -480,9 +493,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
/**
- * Cogroup `this` DStream with `other` DStream. For each key k in corresponding RDDs of `this`
- * or `other` DStreams, the generated RDD will contains a tuple with the list of values for that
- * key in both RDDs. HashPartitioner is used to partition each generated RDD into default number
+ * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
+ * Hash partitioning is used to generate the RDDs with Spark's default number
* of partitions.
*/
def cogroup[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (JList[V], JList[W])] = {
@@ -492,21 +504,36 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
/**
- * Cogroup `this` DStream with `other` DStream. For each key k in corresponding RDDs of `this`
- * or `other` DStreams, the generated RDD will contains a tuple with the list of values for that
- * key in both RDDs. Partitioner is used to partition each generated RDD.
+ * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
+ * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+ */
+ def cogroup[W](
+ other: JavaPairDStream[K, W],
+ numPartitions: Int
+ ): JavaPairDStream[K, (JList[V], JList[W])] = {
+ implicit val cm: ClassManifest[W] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+ dstream.cogroup(other.dstream, numPartitions)
+ .mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2))))
+ }
+
+ /**
+ * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
+ * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
*/
- def cogroup[W](other: JavaPairDStream[K, W], partitioner: Partitioner)
- : JavaPairDStream[K, (JList[V], JList[W])] = {
+ def cogroup[W](
+ other: JavaPairDStream[K, W],
+ partitioner: Partitioner
+ ): JavaPairDStream[K, (JList[V], JList[W])] = {
implicit val cm: ClassManifest[W] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
dstream.cogroup(other.dstream, partitioner)
- .mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2))))
+ .mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2))))
}
/**
- * Join `this` DStream with `other` DStream. HashPartitioner is used
- * to partition each generated RDD into default number of partitions.
+ * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
+ * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
*/
def join[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, W)] = {
implicit val cm: ClassManifest[W] =
@@ -515,18 +542,112 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
/**
- * Join `this` DStream with `other` DStream, that is, each RDD of the new DStream will
- * be generated by joining RDDs from `this` and other DStream. Uses the given
- * Partitioner to partition each generated RDD.
+ * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
+ * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+ */
+ def join[W](other: JavaPairDStream[K, W], numPartitions: Int): JavaPairDStream[K, (V, W)] = {
+ implicit val cm: ClassManifest[W] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+ dstream.join(other.dstream, numPartitions)
+ }
+
+ /**
+ * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
+ * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
*/
- def join[W](other: JavaPairDStream[K, W], partitioner: Partitioner)
- : JavaPairDStream[K, (V, W)] = {
+ def join[W](
+ other: JavaPairDStream[K, W],
+ partitioner: Partitioner
+ ): JavaPairDStream[K, (V, W)] = {
implicit val cm: ClassManifest[W] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
dstream.join(other.dstream, partitioner)
}
/**
+ * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
+ * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
+ * number of partitions.
+ */
+ def leftOuterJoin[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, Optional[W])] = {
+ implicit val cm: ClassManifest[W] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+ val joinResult = dstream.leftOuterJoin(other.dstream)
+ joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))}
+ }
+
+ /**
+ * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
+ * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
+ * partitions.
+ */
+ def leftOuterJoin[W](
+ other: JavaPairDStream[K, W],
+ numPartitions: Int
+ ): JavaPairDStream[K, (V, Optional[W])] = {
+ implicit val cm: ClassManifest[W] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+ val joinResult = dstream.leftOuterJoin(other.dstream, numPartitions)
+ joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))}
+ }
+
+ /**
+ * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
+ * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
+ */
+ def leftOuterJoin[W](
+ other: JavaPairDStream[K, W],
+ partitioner: Partitioner
+ ): JavaPairDStream[K, (V, Optional[W])] = {
+ implicit val cm: ClassManifest[W] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+ val joinResult = dstream.leftOuterJoin(other.dstream, partitioner)
+ joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))}
+ }
+
+ /**
+ * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
+ * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
+ * number of partitions.
+ */
+ def rightOuterJoin[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (Optional[V], W)] = {
+ implicit val cm: ClassManifest[W] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+ val joinResult = dstream.rightOuterJoin(other.dstream)
+ joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)}
+ }
+
+ /**
+ * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
+ * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
+ * partitions.
+ */
+ def rightOuterJoin[W](
+ other: JavaPairDStream[K, W],
+ numPartitions: Int
+ ): JavaPairDStream[K, (Optional[V], W)] = {
+ implicit val cm: ClassManifest[W] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+ val joinResult = dstream.rightOuterJoin(other.dstream, numPartitions)
+ joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)}
+ }
+
+ /**
+ * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
+ * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control
+ * the partitioning of each RDD.
+ */
+ def rightOuterJoin[W](
+ other: JavaPairDStream[K, W],
+ partitioner: Partitioner
+ ): JavaPairDStream[K, (Optional[V], W)] = {
+ implicit val cm: ClassManifest[W] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+ val joinResult = dstream.rightOuterJoin(other.dstream, partitioner)
+ joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)}
+ }
+
+ /**
* Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
* generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
*/
@@ -596,14 +717,19 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf)
}
+ /** Convert to a JavaDStream */
+ def toJavaDStream(): JavaDStream[(K, V)] = {
+ new JavaDStream[(K, V)](dstream)
+ }
+
override val classManifest: ClassManifest[(K, V)] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]]
}
object JavaPairDStream {
- implicit def fromPairDStream[K: ClassManifest, V: ClassManifest](dstream: DStream[(K, V)])
- :JavaPairDStream[K, V] =
+ implicit def fromPairDStream[K: ClassManifest, V: ClassManifest](dstream: DStream[(K, V)]) = {
new JavaPairDStream[K, V](dstream)
+ }
def fromJavaDStream[K, V](dstream: JavaDStream[(K, V)]): JavaPairDStream[K, V] = {
implicit val cmk: ClassManifest[K] =
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 6423b916b0..cf30b541e1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -19,7 +19,7 @@ package org.apache.spark.streaming.api.java
import java.lang.{Long => JLong, Integer => JInt}
import java.io.InputStream
-import java.util.{Map => JMap}
+import java.util.{Map => JMap, List => JList}
import scala.collection.JavaConversions._
@@ -33,7 +33,7 @@ import twitter4j.auth.Authorization
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
-import org.apache.spark.api.java.{JavaSparkContext, JavaRDD}
+import org.apache.spark.api.java.{JavaPairRDD, JavaRDDLike, JavaSparkContext, JavaRDD}
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.receivers.{ActorReceiver, ReceiverSupervisorStrategy}
@@ -594,6 +594,77 @@ class JavaStreamingContext(val ssc: StreamingContext) {
}
/**
+ * Create a unified DStream from multiple DStreams of the same type and same slide duration.
+ */
+ def union[T](first: JavaDStream[T], rest: JList[JavaDStream[T]]): JavaDStream[T] = {
+ val dstreams: Seq[DStream[T]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.dstream)
+ implicit val cm: ClassManifest[T] = first.classManifest
+ ssc.union(dstreams)(cm)
+ }
+
+ /**
+ * Create a unified DStream from multiple DStreams of the same type and same slide duration.
+ */
+ def union[K, V](
+ first: JavaPairDStream[K, V],
+ rest: JList[JavaPairDStream[K, V]]
+ ): JavaPairDStream[K, V] = {
+ val dstreams: Seq[DStream[(K, V)]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.dstream)
+ implicit val cm: ClassManifest[(K, V)] = first.classManifest
+ implicit val kcm: ClassManifest[K] = first.kManifest
+ implicit val vcm: ClassManifest[V] = first.vManifest
+ new JavaPairDStream[K, V](ssc.union(dstreams)(cm))(kcm, vcm)
+ }
+
+ /**
+ * Create a new DStream in which each RDD is generated by applying a function on RDDs of
+ * the DStreams. The order of the JavaRDDs in the transform function parameter will be the
+ * same as the order of corresponding DStreams in the list. Note that for adding a
+ * JavaPairDStream in the list of JavaDStreams, convert it to a JavaDStream using
+ * [[org.apache.spark.streaming.api.java.JavaPairDStream]].toJavaDStream().
+ * In the transform function, convert the JavaRDD corresponding to that JavaDStream to
+ * a JavaPairRDD using [[org.apache.spark.api.java.JavaPairRDD]].fromJavaRDD().
+ */
+ def transform[T](
+ dstreams: JList[JavaDStream[_]],
+ transformFunc: JFunction2[JList[JavaRDD[_]], Time, JavaRDD[T]]
+ ): JavaDStream[T] = {
+ implicit val cmt: ClassManifest[T] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ val scalaDStreams = dstreams.map(_.dstream).toSeq
+ val scalaTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
+ val jrdds = rdds.map(rdd => JavaRDD.fromRDD[AnyRef](rdd.asInstanceOf[RDD[AnyRef]])).toList
+ transformFunc.call(jrdds, time).rdd
+ }
+ ssc.transform(scalaDStreams, scalaTransformFunc)
+ }
+
+ /**
+ * Create a new DStream in which each RDD is generated by applying a function on RDDs of
+ * the DStreams. The order of the JavaRDDs in the transform function parameter will be the
+ * same as the order of corresponding DStreams in the list. Note that for adding a
+ * JavaPairDStream in the list of JavaDStreams, convert it to a JavaDStream using
+ * [[org.apache.spark.streaming.api.java.JavaPairDStream]].toJavaDStream().
+ * In the transform function, convert the JavaRDD corresponding to that JavaDStream to
+ * a JavaPairRDD using [[org.apache.spark.api.java.JavaPairRDD]].fromJavaRDD().
+ */
+ def transform[K, V](
+ dstreams: JList[JavaDStream[_]],
+ transformFunc: JFunction2[JList[JavaRDD[_]], Time, JavaPairRDD[K, V]]
+ ): JavaPairDStream[K, V] = {
+ implicit val cmk: ClassManifest[K] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
+ implicit val cmv: ClassManifest[V] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
+ val scalaDStreams = dstreams.map(_.dstream).toSeq
+ val scalaTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
+ val jrdds = rdds.map(rdd => JavaRDD.fromRDD[AnyRef](rdd.asInstanceOf[RDD[AnyRef]])).toList
+ transformFunc.call(jrdds, time).rdd
+ }
+ ssc.transform(scalaDStreams, scalaTransformFunc)
+ }
+
+ /**
* Sets the context to periodically checkpoint the DStream operations for master
* fault-tolerance. The graph will be checkpointed every batch interval.
* @param directory HDFS-compatible directory where the checkpoint data will be reliably stored
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala
deleted file mode 100644
index 4eddc755b9..0000000000
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.dstream
-
-import org.apache.spark.Partitioner
-import org.apache.spark.rdd.RDD
-import org.apache.spark.rdd.CoGroupedRDD
-import org.apache.spark.streaming.{Time, DStream, Duration}
-
-private[streaming]
-class CoGroupedDStream[K : ClassManifest](
- parents: Seq[DStream[(K, _)]],
- partitioner: Partitioner
- ) extends DStream[(K, Seq[Seq[_]])](parents.head.ssc) {
-
- if (parents.length == 0) {
- throw new IllegalArgumentException("Empty array of parents")
- }
-
- if (parents.map(_.ssc).distinct.size > 1) {
- throw new IllegalArgumentException("Array of parents have different StreamingContexts")
- }
-
- if (parents.map(_.slideDuration).distinct.size > 1) {
- throw new IllegalArgumentException("Array of parents have different slide times")
- }
-
- override def dependencies = parents.toList
-
- override def slideDuration: Duration = parents.head.slideDuration
-
- override def compute(validTime: Time): Option[RDD[(K, Seq[Seq[_]])]] = {
- val part = partitioner
- val rdds = parents.flatMap(_.getOrCompute(validTime))
- if (rdds.size > 0) {
- val q = new CoGroupedRDD[K](rdds, part)
- Some(q)
- } else {
- None
- }
- }
-
-}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
index 60485adef9..71bcb2b390 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
@@ -21,16 +21,22 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Duration, DStream, Time}
private[streaming]
-class TransformedDStream[T: ClassManifest, U: ClassManifest] (
- parent: DStream[T],
- transformFunc: (RDD[T], Time) => RDD[U]
- ) extends DStream[U](parent.ssc) {
+class TransformedDStream[U: ClassManifest] (
+ parents: Seq[DStream[_]],
+ transformFunc: (Seq[RDD[_]], Time) => RDD[U]
+ ) extends DStream[U](parents.head.ssc) {
- override def dependencies = List(parent)
+ require(parents.length > 0, "List of DStreams to transform is empty")
+ require(parents.map(_.ssc).distinct.size == 1, "Some of the DStreams have different contexts")
+ require(parents.map(_.slideDuration).distinct.size == 1,
+ "Some of the DStreams have different slide durations")
- override def slideDuration: Duration = parent.slideDuration
+ override def dependencies = parents.toList
+
+ override def slideDuration: Duration = parents.head.slideDuration
override def compute(validTime: Time): Option[RDD[U]] = {
- parent.getOrCompute(validTime).map(transformFunc(_, validTime))
+ val parentRDDs = parents.map(_.getOrCompute(validTime).orNull).toSeq
+ Some(transformFunc(parentRDDs, validTime))
}
}
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 9da8adda83..ad4a8b9535 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Maps;
import com.google.common.io.Files;
import kafka.serializer.StringDecoder;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.spark.streaming.api.java.JavaDStreamLike;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -256,7 +257,7 @@ public class JavaAPISuite implements Serializable {
}
});
JavaTestUtils.attachTestOutputStream(mapped);
- List<List<List<String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+ List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2);
Assert.assertEquals(expected, result);
}
@@ -325,8 +326,8 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(7,8,9));
JavaSparkContext jsc = new JavaSparkContext(ssc.ssc().sc());
- JavaRDD<Integer> rdd1 = ssc.sc().parallelize(Arrays.asList(1,2,3));
- JavaRDD<Integer> rdd2 = ssc.sc().parallelize(Arrays.asList(4,5,6));
+ JavaRDD<Integer> rdd1 = ssc.sc().parallelize(Arrays.asList(1, 2, 3));
+ JavaRDD<Integer> rdd2 = ssc.sc().parallelize(Arrays.asList(4, 5, 6));
JavaRDD<Integer> rdd3 = ssc.sc().parallelize(Arrays.asList(7,8,9));
LinkedList<JavaRDD<Integer>> rdds = Lists.newLinkedList();
@@ -353,17 +354,19 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(9,10,11));
JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream<Integer> transformed =
- stream.transform(new Function<JavaRDD<Integer>, JavaRDD<Integer>>() {
- @Override
- public JavaRDD<Integer> call(JavaRDD<Integer> in) throws Exception {
- return in.map(new Function<Integer, Integer>() {
- @Override
- public Integer call(Integer i) throws Exception {
- return i + 2;
- }
- });
- }});
+ JavaDStream<Integer> transformed = stream.transform(
+ new Function<JavaRDD<Integer>, JavaRDD<Integer>>() {
+ @Override
+ public JavaRDD<Integer> call(JavaRDD<Integer> in) throws Exception {
+ return in.map(new Function<Integer, Integer>() {
+ @Override
+ public Integer call(Integer i) throws Exception {
+ return i + 2;
+ }
+ });
+ }
+ });
+
JavaTestUtils.attachTestOutputStream(transformed);
List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
@@ -371,6 +374,316 @@ public class JavaAPISuite implements Serializable {
}
@Test
+ public void testVariousTransform() {
+ // tests whether all variations of transform can be called from Java
+
+ List<List<Integer>> inputData = Arrays.asList(Arrays.asList(1));
+ JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+
+ List<List<Tuple2<String, Integer>>> pairInputData =
+ Arrays.asList(Arrays.asList(new Tuple2<String, Integer>("x", 1)));
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(
+ JavaTestUtils.attachTestInputStream(ssc, pairInputData, 1));
+
+ JavaDStream<Integer> transformed1 = stream.transform(
+ new Function<JavaRDD<Integer>, JavaRDD<Integer>>() {
+ @Override
+ public JavaRDD<Integer> call(JavaRDD<Integer> in) throws Exception {
+ return null;
+ }
+ }
+ );
+
+ JavaDStream<Integer> transformed2 = stream.transform(
+ new Function2<JavaRDD<Integer>, Time, JavaRDD<Integer>>() {
+ @Override public JavaRDD<Integer> call(JavaRDD<Integer> in, Time time) throws Exception {
+ return null;
+ }
+ }
+ );
+
+ JavaPairDStream<String, Integer> transformed3 = stream.transform(
+ new Function<JavaRDD<Integer>, JavaPairRDD<String, Integer>>() {
+ @Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> in) throws Exception {
+ return null;
+ }
+ }
+ );
+
+ JavaPairDStream<String, Integer> transformed4 = stream.transform(
+ new Function2<JavaRDD<Integer>, Time, JavaPairRDD<String, Integer>>() {
+ @Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> in, Time time) throws Exception {
+ return null;
+ }
+ }
+ );
+
+ JavaDStream<Integer> pairTransformed1 = pairStream.transform(
+ new Function<JavaPairRDD<String, Integer>, JavaRDD<Integer>>() {
+ @Override public JavaRDD<Integer> call(JavaPairRDD<String, Integer> in) throws Exception {
+ return null;
+ }
+ }
+ );
+
+ JavaDStream<Integer> pairTransformed2 = pairStream.transform(
+ new Function2<JavaPairRDD<String, Integer>, Time, JavaRDD<Integer>>() {
+ @Override public JavaRDD<Integer> call(JavaPairRDD<String, Integer> in, Time time) throws Exception {
+ return null;
+ }
+ }
+ );
+
+ JavaPairDStream<String, String> pairTransformed3 = pairStream.transform(
+ new Function<JavaPairRDD<String, Integer>, JavaPairRDD<String, String>>() {
+ @Override public JavaPairRDD<String, String> call(JavaPairRDD<String, Integer> in) throws Exception {
+ return null;
+ }
+ }
+ );
+
+ JavaPairDStream<String, String> pairTransformed4 = pairStream.transform(
+ new Function2<JavaPairRDD<String, Integer>, Time, JavaPairRDD<String, String>>() {
+ @Override public JavaPairRDD<String, String> call(JavaPairRDD<String, Integer> in, Time time) throws Exception {
+ return null;
+ }
+ }
+ );
+
+ }
+
+ @Test
+ public void testTransformWith() {
+ List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<String, String>("california", "dodgers"),
+ new Tuple2<String, String>("new york", "yankees")),
+ Arrays.asList(
+ new Tuple2<String, String>("california", "sharks"),
+ new Tuple2<String, String>("new york", "rangers")));
+
+ List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<String, String>("california", "giants"),
+ new Tuple2<String, String>("new york", "mets")),
+ Arrays.asList(
+ new Tuple2<String, String>("california", "ducks"),
+ new Tuple2<String, String>("new york", "islanders")));
+
+
+ List<List<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<String, Tuple2<String, String>>("california",
+ new Tuple2<String, String>("dodgers", "giants")),
+ new Tuple2<String, Tuple2<String, String>>("new york",
+ new Tuple2<String, String>("yankees", "mets"))),
+ Arrays.asList(
+ new Tuple2<String, Tuple2<String, String>>("california",
+ new Tuple2<String, String>("sharks", "ducks")),
+ new Tuple2<String, Tuple2<String, String>>("new york",
+ new Tuple2<String, String>("rangers", "islanders"))));
+
+ JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream(
+ ssc, stringStringKVStream1, 1);
+ JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1);
+
+ JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream(
+ ssc, stringStringKVStream2, 1);
+ JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2);
+
+ JavaPairDStream<String, Tuple2<String, String>> joined = pairStream1.transformWith(
+ pairStream2,
+ new Function3<
+ JavaPairRDD<String, String>,
+ JavaPairRDD<String, String>,
+ Time,
+ JavaPairRDD<String, Tuple2<String, String>>
+ >() {
+ @Override
+ public JavaPairRDD<String, Tuple2<String, String>> call(
+ JavaPairRDD<String, String> rdd1,
+ JavaPairRDD<String, String> rdd2,
+ Time time
+ ) throws Exception {
+ return rdd1.join(rdd2);
+ }
+ }
+ );
+
+ JavaTestUtils.attachTestOutputStream(joined);
+ List<List<Tuple2<String, Tuple2<String, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+
+ @Test
+ public void testVariousTransformWith() {
+ // tests whether all variations of transformWith can be called from Java
+
+ List<List<Integer>> inputData1 = Arrays.asList(Arrays.asList(1));
+ List<List<String>> inputData2 = Arrays.asList(Arrays.asList("x"));
+ JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, inputData1, 1);
+ JavaDStream<String> stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 1);
+
+ List<List<Tuple2<String, Integer>>> pairInputData1 =
+ Arrays.asList(Arrays.asList(new Tuple2<String, Integer>("x", 1)));
+ List<List<Tuple2<Double, Character>>> pairInputData2 =
+ Arrays.asList(Arrays.asList(new Tuple2<Double, Character>(1.0, 'x')));
+ JavaPairDStream<String, Integer> pairStream1 = JavaPairDStream.fromJavaDStream(
+ JavaTestUtils.attachTestInputStream(ssc, pairInputData1, 1));
+ JavaPairDStream<Double, Character> pairStream2 = JavaPairDStream.fromJavaDStream(
+ JavaTestUtils.attachTestInputStream(ssc, pairInputData2, 1));
+
+ JavaDStream<Double> transformed1 = stream1.transformWith(
+ stream2,
+ new Function3<JavaRDD<Integer>, JavaRDD<String>, Time, JavaRDD<Double>>() {
+ @Override
+ public JavaRDD<Double> call(JavaRDD<Integer> rdd1, JavaRDD<String> rdd2, Time time) throws Exception {
+ return null;
+ }
+ }
+ );
+
+ JavaDStream<Double> transformed2 = stream1.transformWith(
+ pairStream1,
+ new Function3<JavaRDD<Integer>, JavaPairRDD<String, Integer>, Time, JavaRDD<Double>>() {
+ @Override
+ public JavaRDD<Double> call(JavaRDD<Integer> rdd1, JavaPairRDD<String, Integer> rdd2, Time time) throws Exception {
+ return null;
+ }
+ }
+ );
+
+ JavaPairDStream<Double, Double> transformed3 = stream1.transformWith(
+ stream2,
+ new Function3<JavaRDD<Integer>, JavaRDD<String>, Time, JavaPairRDD<Double, Double>>() {
+ @Override
+ public JavaPairRDD<Double, Double> call(JavaRDD<Integer> rdd1, JavaRDD<String> rdd2, Time time) throws Exception {
+ return null;
+ }
+ }
+ );
+
+ JavaPairDStream<Double, Double> transformed4 = stream1.transformWith(
+ pairStream1,
+ new Function3<JavaRDD<Integer>, JavaPairRDD<String, Integer>, Time, JavaPairRDD<Double, Double>>() {
+ @Override
+ public JavaPairRDD<Double, Double> call(JavaRDD<Integer> rdd1, JavaPairRDD<String, Integer> rdd2, Time time) throws Exception {
+ return null;
+ }
+ }
+ );
+
+ JavaDStream<Double> pairTransformed1 = pairStream1.transformWith(
+ stream2,
+ new Function3<JavaPairRDD<String, Integer>, JavaRDD<String>, Time, JavaRDD<Double>>() {
+ @Override
+ public JavaRDD<Double> call(JavaPairRDD<String, Integer> rdd1, JavaRDD<String> rdd2, Time time) throws Exception {
+ return null;
+ }
+ }
+ );
+
+ JavaDStream<Double> pairTransformed2_ = pairStream1.transformWith(
+ pairStream1,
+ new Function3<JavaPairRDD<String, Integer>, JavaPairRDD<String, Integer>, Time, JavaRDD<Double>>() {
+ @Override
+ public JavaRDD<Double> call(JavaPairRDD<String, Integer> rdd1, JavaPairRDD<String, Integer> rdd2, Time time) throws Exception {
+ return null;
+ }
+ }
+ );
+
+ JavaPairDStream<Double, Double> pairTransformed3 = pairStream1.transformWith(
+ stream2,
+ new Function3<JavaPairRDD<String, Integer>, JavaRDD<String>, Time, JavaPairRDD<Double, Double>>() {
+ @Override
+ public JavaPairRDD<Double, Double> call(JavaPairRDD<String, Integer> rdd1, JavaRDD<String> rdd2, Time time) throws Exception {
+ return null;
+ }
+ }
+ );
+
+ JavaPairDStream<Double, Double> pairTransformed4 = pairStream1.transformWith(
+ pairStream2,
+ new Function3<JavaPairRDD<String, Integer>, JavaPairRDD<Double, Character>, Time, JavaPairRDD<Double, Double>>() {
+ @Override
+ public JavaPairRDD<Double, Double> call(JavaPairRDD<String, Integer> rdd1, JavaPairRDD<Double, Character> rdd2, Time time) throws Exception {
+ return null;
+ }
+ }
+ );
+ }
+
+ @Test
+ public void testStreamingContextTransform(){
+ List<List<Integer>> stream1input = Arrays.asList(
+ Arrays.asList(1),
+ Arrays.asList(2)
+ );
+
+ List<List<Integer>> stream2input = Arrays.asList(
+ Arrays.asList(3),
+ Arrays.asList(4)
+ );
+
+ List<List<Tuple2<Integer, String>>> pairStream1input = Arrays.asList(
+ Arrays.asList(new Tuple2<Integer, String>(1, "x")),
+ Arrays.asList(new Tuple2<Integer, String>(2, "y"))
+ );
+
+ List<List<Tuple2<Integer, Tuple2<Integer, String>>>> expected = Arrays.asList(
+ Arrays.asList(new Tuple2<Integer, Tuple2<Integer, String>>(1, new Tuple2<Integer, String>(1, "x"))),
+ Arrays.asList(new Tuple2<Integer, Tuple2<Integer, String>>(2, new Tuple2<Integer, String>(2, "y")))
+ );
+
+ JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, stream1input, 1);
+ JavaDStream<Integer> stream2 = JavaTestUtils.attachTestInputStream(ssc, stream2input, 1);
+ JavaPairDStream<Integer, String> pairStream1 = JavaPairDStream.fromJavaDStream(
+ JavaTestUtils.attachTestInputStream(ssc, pairStream1input, 1));
+
+ List<JavaDStream<?>> listOfDStreams1 = Arrays.<JavaDStream<?>>asList(stream1, stream2);
+
+ // This is just to test whether this transform to JavaStream compiles
+ JavaDStream<Long> transformed1 = ssc.transform(
+ listOfDStreams1,
+ new Function2<List<JavaRDD<?>>, Time, JavaRDD<Long>>() {
+ public JavaRDD<Long> call(List<JavaRDD<?>> listOfRDDs, Time time) {
+ assert(listOfRDDs.size() == 2);
+ return null;
+ }
+ }
+ );
+
+ List<JavaDStream<?>> listOfDStreams2 =
+ Arrays.<JavaDStream<?>>asList(stream1, stream2, pairStream1.toJavaDStream());
+
+ JavaPairDStream<Integer, Tuple2<Integer, String>> transformed2 = ssc.transform(
+ listOfDStreams2,
+ new Function2<List<JavaRDD<?>>, Time, JavaPairRDD<Integer, Tuple2<Integer, String>>>() {
+ public JavaPairRDD<Integer, Tuple2<Integer, String>> call(List<JavaRDD<?>> listOfRDDs, Time time) {
+ assert(listOfRDDs.size() == 3);
+ JavaRDD<Integer> rdd1 = (JavaRDD<Integer>)listOfRDDs.get(0);
+ JavaRDD<Integer> rdd2 = (JavaRDD<Integer>)listOfRDDs.get(1);
+ JavaRDD<Tuple2<Integer, String>> rdd3 = (JavaRDD<Tuple2<Integer, String>>)listOfRDDs.get(2);
+ JavaPairRDD<Integer, String> prdd3 = JavaPairRDD.fromJavaRDD(rdd3);
+ PairFunction<Integer, Integer, Integer> mapToTuple = new PairFunction<Integer, Integer, Integer>() {
+ @Override
+ public Tuple2<Integer, Integer> call(Integer i) throws Exception {
+ return new Tuple2<Integer, Integer>(i, i);
+ }
+ };
+ return rdd1.union(rdd2).map(mapToTuple).join(prdd3);
+ }
+ }
+ );
+ JavaTestUtils.attachTestOutputStream(transformed2);
+ List<List<Tuple2<Integer, Tuple2<Integer, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
public void testFlatMap() {
List<List<String>> inputData = Arrays.asList(
Arrays.asList("go", "giants"),
@@ -1132,7 +1445,7 @@ public class JavaAPISuite implements Serializable {
JavaPairDStream<String, Tuple2<List<String>, List<String>>> grouped = pairStream1.cogroup(pairStream2);
JavaTestUtils.attachTestOutputStream(grouped);
- List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+ List<List<Tuple2<String, Tuple2<List<String>, List<String>>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
Assert.assertEquals(expected, result);
}
@@ -1175,7 +1488,38 @@ public class JavaAPISuite implements Serializable {
JavaPairDStream<String, Tuple2<String, String>> joined = pairStream1.join(pairStream2);
JavaTestUtils.attachTestOutputStream(joined);
- List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+ List<List<Tuple2<String, Tuple2<String, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testLeftOuterJoin() {
+ List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
+ Arrays.asList(new Tuple2<String, String>("california", "dodgers"),
+ new Tuple2<String, String>("new york", "yankees")),
+ Arrays.asList(new Tuple2<String, String>("california", "sharks") ));
+
+ List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList(
+ Arrays.asList(new Tuple2<String, String>("california", "giants") ),
+ Arrays.asList(new Tuple2<String, String>("new york", "islanders") )
+
+ );
+
+ List<List<Long>> expected = Arrays.asList(Arrays.asList(2L), Arrays.asList(1L));
+
+ JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream(
+ ssc, stringStringKVStream1, 1);
+ JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1);
+
+ JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream(
+ ssc, stringStringKVStream2, 1);
+ JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2);
+
+ JavaPairDStream<String, Tuple2<String, Optional<String>>> joined = pairStream1.leftOuterJoin(pairStream2);
+ JavaDStream<Long> counted = joined.count();
+ JavaTestUtils.attachTestOutputStream(counted);
+ List<List<Long>> result = JavaTestUtils.runStreams(ssc, 2, 2);
Assert.assertEquals(expected, result);
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index 55cfcb371a..259ef1608c 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -18,7 +18,10 @@
package org.apache.spark.streaming
import org.apache.spark.streaming.StreamingContext._
-import scala.runtime.RichInt
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.SparkContext._
+
import util.ManualClock
class BasicOperationsSuite extends TestSuiteBase {
@@ -181,6 +184,72 @@ class BasicOperationsSuite extends TestSuiteBase {
)
}
+ test("union") {
+ val input = Seq(1 to 4, 101 to 104, 201 to 204)
+ val output = Seq(1 to 8, 101 to 108, 201 to 208)
+ testOperation(
+ input,
+ (s: DStream[Int]) => s.union(s.map(_ + 4)) ,
+ output
+ )
+ }
+
+ test("StreamingContext.union") {
+ val input = Seq(1 to 4, 101 to 104, 201 to 204)
+ val output = Seq(1 to 12, 101 to 112, 201 to 212)
+ // union over 3 DStreams
+ testOperation(
+ input,
+ (s: DStream[Int]) => s.context.union(Seq(s, s.map(_ + 4), s.map(_ + 8))),
+ output
+ )
+ }
+
+ test("transform") {
+ val input = Seq(1 to 4, 5 to 8, 9 to 12)
+ testOperation(
+ input,
+ (r: DStream[Int]) => r.transform(rdd => rdd.map(_.toString)), // RDD.map in transform
+ input.map(_.map(_.toString))
+ )
+ }
+
+ test("transformWith") {
+ val inputData1 = Seq( Seq("a", "b"), Seq("a", ""), Seq(""), Seq() )
+ val inputData2 = Seq( Seq("a", "b"), Seq("b", ""), Seq(), Seq("") )
+ val outputData = Seq(
+ Seq( ("a", (1, "x")), ("b", (1, "x")) ),
+ Seq( ("", (1, "x")) ),
+ Seq( ),
+ Seq( )
+ )
+ val operation = (s1: DStream[String], s2: DStream[String]) => {
+ val t1 = s1.map(x => (x, 1))
+ val t2 = s2.map(x => (x, "x"))
+ t1.transformWith( // RDD.join in transform
+ t2,
+ (rdd1: RDD[(String, Int)], rdd2: RDD[(String, String)]) => rdd1.join(rdd2)
+ )
+ }
+ testOperation(inputData1, inputData2, operation, outputData, true)
+ }
+
+ test("StreamingContext.transform") {
+ val input = Seq(1 to 4, 101 to 104, 201 to 204)
+ val output = Seq(1 to 12, 101 to 112, 201 to 212)
+
+ // transform over 3 DStreams by doing union of the 3 RDDs
+ val operation = (s: DStream[Int]) => {
+ s.context.transform(
+ Seq(s, s.map(_ + 4), s.map(_ + 8)), // 3 DStreams
+ (rdds: Seq[RDD[_]], time: Time) =>
+ rdds.head.context.union(rdds.map(_.asInstanceOf[RDD[Int]])) // union of RDDs
+ )
+ }
+
+ testOperation(input, operation, output)
+ }
+
test("cogroup") {
val inputData1 = Seq( Seq("a", "a", "b"), Seq("a", ""), Seq(""), Seq() )
val inputData2 = Seq( Seq("a", "a", "b"), Seq("b", ""), Seq(), Seq() )
@@ -206,7 +275,37 @@ class BasicOperationsSuite extends TestSuiteBase {
Seq( )
)
val operation = (s1: DStream[String], s2: DStream[String]) => {
- s1.map(x => (x,1)).join(s2.map(x => (x,"x")))
+ s1.map(x => (x, 1)).join(s2.map(x => (x, "x")))
+ }
+ testOperation(inputData1, inputData2, operation, outputData, true)
+ }
+
+ test("leftOuterJoin") {
+ val inputData1 = Seq( Seq("a", "b"), Seq("a", ""), Seq(""), Seq() )
+ val inputData2 = Seq( Seq("a", "b"), Seq("b", ""), Seq(), Seq("") )
+ val outputData = Seq(
+ Seq( ("a", (1, Some("x"))), ("b", (1, Some("x"))) ),
+ Seq( ("", (1, Some("x"))), ("a", (1, None)) ),
+ Seq( ("", (1, None)) ),
+ Seq( )
+ )
+ val operation = (s1: DStream[String], s2: DStream[String]) => {
+ s1.map(x => (x, 1)).leftOuterJoin(s2.map(x => (x, "x")))
+ }
+ testOperation(inputData1, inputData2, operation, outputData, true)
+ }
+
+ test("rightOuterJoin") {
+ val inputData1 = Seq( Seq("a", "b"), Seq("a", ""), Seq(""), Seq() )
+ val inputData2 = Seq( Seq("a", "b"), Seq("b", ""), Seq(), Seq("") )
+ val outputData = Seq(
+ Seq( ("a", (Some(1), "x")), ("b", (Some(1), "x")) ),
+ Seq( ("", (Some(1), "x")), ("b", (None, "x")) ),
+ Seq( ),
+ Seq( ("", (None, "x")) )
+ )
+ val operation = (s1: DStream[String], s2: DStream[String]) => {
+ s1.map(x => (x, 1)).rightOuterJoin(s2.map(x => (x, "x")))
}
testOperation(inputData1, inputData2, operation, outputData, true)
}