aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/DStream.scala16
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala98
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala64
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala142
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala58
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java219
6 files changed, 421 insertions, 176 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
index ee351daa60..38e34795b4 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
@@ -479,7 +479,7 @@ abstract class DStream[T: ClassManifest] (
/**
* Apply a function to each RDD in this DStream. This is an output operator, so
- * this DStream will be registered as an output stream and therefore materialized.
+ * 'this' DStream will be registered as an output stream and therefore materialized.
*/
def foreach(foreachFunc: RDD[T] => Unit) {
this.foreach((r: RDD[T], t: Time) => foreachFunc(r))
@@ -487,7 +487,7 @@ abstract class DStream[T: ClassManifest] (
/**
* Apply a function to each RDD in this DStream. This is an output operator, so
- * this DStream will be registered as an output stream and therefore materialized.
+ * 'this' DStream will be registered as an output stream and therefore materialized.
*/
def foreach(foreachFunc: (RDD[T], Time) => Unit) {
val newStream = new ForEachDStream(this, context.sparkContext.clean(foreachFunc))
@@ -497,7 +497,7 @@ abstract class DStream[T: ClassManifest] (
/**
* Return a new DStream in which each RDD is generated by applying a function
- * on each RDD of this DStream.
+ * on each RDD of 'this' DStream.
*/
def transform[U: ClassManifest](transformFunc: RDD[T] => RDD[U]): DStream[U] = {
transform((r: RDD[T], t: Time) => context.sparkContext.clean(transformFunc(r)))
@@ -505,7 +505,7 @@ abstract class DStream[T: ClassManifest] (
/**
* Return a new DStream in which each RDD is generated by applying a function
- * on each RDD of this DStream.
+ * on each RDD of 'this' DStream.
*/
def transform[U: ClassManifest](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = {
//new TransformedDStream(this, context.sparkContext.clean(transformFunc))
@@ -518,8 +518,8 @@ abstract class DStream[T: ClassManifest] (
}
/**
- * Return a new DStream in which each RDD is generated by applying a function on RDDs
- * of DStreams stream1 and stream2.
+ * Return a new DStream in which each RDD is generated by applying a function
+ * on each RDD of 'this' DStream and 'other' DStream.
*/
def transformWith[U: ClassManifest, V: ClassManifest](
other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V]
@@ -529,8 +529,8 @@ abstract class DStream[T: ClassManifest] (
}
/**
- * Return a new DStream in which each RDD is generated by applying a function on RDDs
- * of DStreams stream1 and stream2.
+ * Return a new DStream in which each RDD is generated by applying a function
+ * on each RDD of 'this' DStream and 'other' DStream.
*/
def transformWith[U: ClassManifest, V: ClassManifest](
other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V]
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
index c319433e54..8c12fd11ef 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
@@ -19,7 +19,7 @@ package org.apache.spark.streaming
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.{ReducedWindowedDStream, StateDStream}
-import org.apache.spark.streaming.dstream.{CoGroupedDStream, ShuffledDStream}
+import org.apache.spark.streaming.dstream.{ShuffledDStream}
import org.apache.spark.streaming.dstream.{MapValuedDStream, FlatMapValuedDStream}
import org.apache.spark.{Partitioner, HashPartitioner}
@@ -359,7 +359,7 @@ extends Serializable {
}
/**
- * Create a new "state" DStream where the state for each key is updated by applying
+ * Return a new "state" DStream where the state for each key is updated by applying
* the given function on the previous state of the key and the new values of the key.
* [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
* @param updateFunc State update function. If `this` function returns None, then
@@ -398,11 +398,18 @@ extends Serializable {
new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner)
}
-
+ /**
+ * Return a new DStream by applying a map function to the value of each key-value pairs in
+ * 'this' DStream without changing the key.
+ */
def mapValues[U: ClassManifest](mapValuesFunc: V => U): DStream[(K, U)] = {
new MapValuedDStream[K, V, U](self, mapValuesFunc)
}
+ /**
+ * Return a new DStream by applying a flatmap function to the value of each key-value pairs in
+ * 'this' DStream without changing the key.
+ */
def flatMapValues[U: ClassManifest](
flatMapValuesFunc: V => TraversableOnce[U]
): DStream[(K, U)] = {
@@ -410,9 +417,8 @@ extends Serializable {
}
/**
- * Cogroup `this` DStream with `other` DStream. For each key k in corresponding RDDs of `this`
- * or `other` DStreams, the generated RDD will contains a tuple with the list of values for that
- * key in both RDDs. HashPartitioner is used to partition each generated RDD into default number
+ * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
+ * Hash partitioning is used to generate the RDDs with Spark's default number
* of partitions.
*/
def cogroup[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (Seq[V], Seq[W]))] = {
@@ -420,31 +426,29 @@ extends Serializable {
}
/**
- * Cogroup `this` DStream with `other` DStream using a partitioner. For each key k in corresponding RDDs of `this`
- * or `other` DStreams, the generated RDD will contains a tuple with the list of values for that
- * key in both RDDs. Partitioner is used to partition each generated RDD.
+ * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
+ * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+ */
+ def cogroup[W: ClassManifest](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (Seq[V], Seq[W]))] = {
+ cogroup(other, defaultPartitioner(numPartitions))
+ }
+
+ /**
+ * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
+ * The supplied [[org.apache.spark.Partitioner]] is used to partition the generated RDDs.
*/
def cogroup[W: ClassManifest](
other: DStream[(K, W)],
partitioner: Partitioner
): DStream[(K, (Seq[V], Seq[W]))] = {
-
- val cgd = new CoGroupedDStream[K](
- Seq(self.asInstanceOf[DStream[(K, _)]], other.asInstanceOf[DStream[(K, _)]]),
- partitioner
- )
- val pdfs = new PairDStreamFunctions[K, Seq[Seq[_]]](cgd)(
- classManifest[K],
- Manifests.seqSeqManifest
+ self.transformWith(
+ other,
+ (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.cogroup(rdd2, partitioner)
)
- pdfs.mapValues {
- case Seq(vs, ws) =>
- (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]])
- }
}
/**
- * Return new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream..
+ * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
* Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
*/
def join[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (V, W))] = {
@@ -452,7 +456,15 @@ extends Serializable {
}
/**
- * Return new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
+ * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
+ * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+ */
+ def join[W: ClassManifest](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (V, W))] = {
+ join[W](other, defaultPartitioner(numPartitions))
+ }
+
+ /**
+ * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
* The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
*/
def join[W: ClassManifest](
@@ -466,7 +478,7 @@ extends Serializable {
}
/**
- * Return new DStream by applying 'left outer join' between RDDs of `this` DStream and
+ * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
* `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
* number of partitions.
*/
@@ -475,7 +487,19 @@ extends Serializable {
}
/**
- * Return new DStream by applying 'left outer join' between RDDs of `this` DStream and
+ * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
+ * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
+ * partitions.
+ */
+ def leftOuterJoin[W: ClassManifest](
+ other: DStream[(K, W)],
+ numPartitions: Int
+ ): DStream[(K, (V, Option[W]))] = {
+ leftOuterJoin[W](other, defaultPartitioner(numPartitions))
+ }
+
+ /**
+ * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
* `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control
* the partitioning of each RDD.
*/
@@ -490,7 +514,7 @@ extends Serializable {
}
/**
- * Return new DStream by applying 'right outer join' between RDDs of `this` DStream and
+ * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
* `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
* number of partitions.
*/
@@ -499,7 +523,19 @@ extends Serializable {
}
/**
- * Return new DStream by applying 'right outer join' between RDDs of `this` DStream and
+ * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
+ * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
+ * partitions.
+ */
+ def rightOuterJoin[W: ClassManifest](
+ other: DStream[(K, W)],
+ numPartitions: Int
+ ): DStream[(K, (Option[V], W))] = {
+ rightOuterJoin[W](other, defaultPartitioner(numPartitions))
+ }
+
+ /**
+ * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
* `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control
* the partitioning of each RDD.
*/
@@ -514,8 +550,8 @@ extends Serializable {
}
/**
- * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated
- * based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix"
+ * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval
+ * is generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix"
*/
def saveAsHadoopFiles[F <: OutputFormat[K, V]](
prefix: String,
@@ -525,8 +561,8 @@ extends Serializable {
}
/**
- * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated
- * based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix"
+ * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval
+ * is generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix"
*/
def saveAsHadoopFiles(
prefix: String,
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index 1110d770c4..09189eadd8 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -121,10 +121,12 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* this DStream. Applying glom() to an RDD coalesces all elements within each partition into
* an array.
*/
- def glom(): JavaDStream[JList[T]] =
+ def glom(): JavaDStream[JList[T]] = {
new JavaDStream(dstream.glom().map(x => new java.util.ArrayList[T](x.toSeq)))
+ }
+
- /** Return the StreamingContext associated with this DStream */
+ /** Return the [[org.apache.spark.streaming.StreamingContext]] associated with this DStream */
def context(): StreamingContext = dstream.context()
/** Return a new DStream by applying a function to all elements of this DStream. */
@@ -239,7 +241,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
/**
* Apply a function to each RDD in this DStream. This is an output operator, so
- * this DStream will be registered as an output stream and therefore materialized.
+ * 'this' DStream will be registered as an output stream and therefore materialized.
*/
def foreach(foreachFunc: JFunction[R, Void]) {
dstream.foreach(rdd => foreachFunc.call(wrapRDD(rdd)))
@@ -247,7 +249,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
/**
* Apply a function to each RDD in this DStream. This is an output operator, so
- * this DStream will be registered as an output stream and therefore materialized.
+ * 'this' DStream will be registered as an output stream and therefore materialized.
*/
def foreach(foreachFunc: JFunction2[R, Time, Void]) {
dstream.foreach((rdd, time) => foreachFunc.call(wrapRDD(rdd), time))
@@ -255,7 +257,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
/**
* Return a new DStream in which each RDD is generated by applying a function
- * on each RDD of this DStream.
+ * on each RDD of 'this' DStream.
*/
def transform[U](transformFunc: JFunction[R, JavaRDD[U]]): JavaDStream[U] = {
implicit val cm: ClassManifest[U] =
@@ -267,7 +269,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
/**
* Return a new DStream in which each RDD is generated by applying a function
- * on each RDD of this DStream.
+ * on each RDD of 'this' DStream.
*/
def transform[U](transformFunc: JFunction2[R, Time, JavaRDD[U]]): JavaDStream[U] = {
implicit val cm: ClassManifest[U] =
@@ -279,7 +281,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
/**
* Return a new DStream in which each RDD is generated by applying a function
- * on each RDD of this DStream.
+ * on each RDD of 'this' DStream.
*/
def transform[K2, V2](transformFunc: JFunction[R, JavaPairRDD[K2, V2]]):
JavaPairDStream[K2, V2] = {
@@ -294,7 +296,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
/**
* Return a new DStream in which each RDD is generated by applying a function
- * on each RDD of this DStream.
+ * on each RDD of 'this' DStream.
*/
def transform[K2, V2](transformFunc: JFunction2[R, Time, JavaPairRDD[K2, V2]]):
JavaPairDStream[K2, V2] = {
@@ -309,7 +311,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
/**
* Return a new DStream in which each RDD is generated by applying a function
- * on each RDD of this and other DStreams.
+ * on each RDD of 'this' DStream and 'other' DStream.
*/
def transformWith[U, W](
other: JavaDStream[U],
@@ -326,7 +328,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
/**
* Return a new DStream in which each RDD is generated by applying a function
- * on each RDD of this and other DStreams.
+ * on each RDD of 'this' DStream and 'other' DStream.
*/
def transformWith[U, K2, V2](
other: JavaDStream[U],
@@ -345,42 +347,42 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
/**
* Return a new DStream in which each RDD is generated by applying a function
- * on each RDD of this and other DStreams.
+ * on each RDD of 'this' DStream and 'other' DStream.
*/
- def transformWith[K, V, W](
- other: JavaPairDStream[K, V],
- transformFunc: JFunction3[R, JavaPairRDD[K, V], Time, JavaRDD[W]]
+ def transformWith[K2, V2, W](
+ other: JavaPairDStream[K2, V2],
+ transformFunc: JFunction3[R, JavaPairRDD[K2, V2], Time, JavaRDD[W]]
): JavaDStream[W] = {
- implicit val cmk: ClassManifest[K] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
- implicit val cmv: ClassManifest[V] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
+ implicit val cmk2: ClassManifest[K2] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]]
+ implicit val cmv2: ClassManifest[V2] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]]
implicit val cmw: ClassManifest[W] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
- def scalaTransform (inThis: RDD[T], inThat: RDD[(K, V)], time: Time): RDD[W] =
+ def scalaTransform (inThis: RDD[T], inThat: RDD[(K2, V2)], time: Time): RDD[W] =
transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd
- dstream.transformWith[(K, V), W](other.dstream, scalaTransform(_, _, _))
+ dstream.transformWith[(K2, V2), W](other.dstream, scalaTransform(_, _, _))
}
/**
* Return a new DStream in which each RDD is generated by applying a function
- * on each RDD of this and other DStreams.
+ * on each RDD of 'this' DStream and 'other' DStream.
*/
- def transformWith[K, V, K2, V2](
- other: JavaPairDStream[K, V],
- transformFunc: JFunction3[R, JavaPairRDD[K, V], Time, JavaPairRDD[K2, V2]]
- ): JavaPairDStream[K2, V2] = {
- implicit val cmk: ClassManifest[K] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
- implicit val cmv: ClassManifest[V] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
+ def transformWith[K2, V2, K3, V3](
+ other: JavaPairDStream[K2, V2],
+ transformFunc: JFunction3[R, JavaPairRDD[K2, V2], Time, JavaPairRDD[K3, V3]]
+ ): JavaPairDStream[K3, V3] = {
implicit val cmk2: ClassManifest[K2] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]]
implicit val cmv2: ClassManifest[V2] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]]
- def scalaTransform (inThis: RDD[T], inThat: RDD[(K, V)], time: Time): RDD[(K2, V2)] =
+ implicit val cmk3: ClassManifest[K3] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K3]]
+ implicit val cmv3: ClassManifest[V3] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V3]]
+ def scalaTransform (inThis: RDD[T], inThat: RDD[(K2, V2)], time: Time): RDD[(K3, V3)] =
transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd
- dstream.transformWith[(K, V), (K2, V2)](other.dstream, scalaTransform(_, _, _))
+ dstream.transformWith[(K2, V2), (K3, V3)](other.dstream, scalaTransform(_, _, _))
}
/**
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index 821db46fff..309c0fa24b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConversions._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
-import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
+import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, Function3 => JFunction3}
import org.apache.spark.Partitioner
import org.apache.hadoop.mapred.{JobConf, OutputFormat}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
@@ -148,7 +148,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
/**
* Combine elements of each key in DStream's RDDs using custom function. This is similar to the
- * combineByKey for RDDs. Please refer to combineByKey in [[PairRDDFunctions]] for more
+ * combineByKey for RDDs. Please refer to combineByKey in [[org.apache.spark.PairRDDFunctions]] for more
* information.
*/
def combineByKey[C](createCombiner: JFunction[V, C],
@@ -413,7 +413,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
/**
- * Create a new "state" DStream where the state for each key is updated by applying
+ * Return a new "state" DStream where the state for each key is updated by applying
* the given function on the previous state of the key and the new values of each key.
* Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
* @param updateFunc State update function. If `this` function returns None, then
@@ -428,7 +428,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
/**
- * Create a new "state" DStream where the state for each key is updated by applying
+ * Return a new "state" DStream where the state for each key is updated by applying
* the given function on the previous state of the key and the new values of each key.
* Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
* @param updateFunc State update function. If `this` function returns None, then
@@ -436,15 +436,17 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* @param numPartitions Number of partitions of each RDD in the new DStream.
* @tparam S State type
*/
- def updateStateByKey[S: ClassManifest](
+ def updateStateByKey[S](
updateFunc: JFunction2[JList[V], Optional[S], Optional[S]],
numPartitions: Int)
: JavaPairDStream[K, S] = {
+ implicit val cm: ClassManifest[S] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[S]]
dstream.updateStateByKey(convertUpdateStateFunction(updateFunc), numPartitions)
}
/**
- * Create a new "state" DStream where the state for each key is updated by applying
+ * Return a new "state" DStream where the state for each key is updated by applying
* the given function on the previous state of the key and the new values of the key.
* [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
* @param updateFunc State update function. If `this` function returns None, then
@@ -452,19 +454,30 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
* @tparam S State type
*/
- def updateStateByKey[S: ClassManifest](
+ def updateStateByKey[S](
updateFunc: JFunction2[JList[V], Optional[S], Optional[S]],
partitioner: Partitioner
): JavaPairDStream[K, S] = {
+ implicit val cm: ClassManifest[S] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[S]]
dstream.updateStateByKey(convertUpdateStateFunction(updateFunc), partitioner)
}
+
+ /**
+ * Return a new DStream by applying a map function to the value of each key-value pairs in
+ * 'this' DStream without changing the key.
+ */
def mapValues[U](f: JFunction[V, U]): JavaPairDStream[K, U] = {
implicit val cm: ClassManifest[U] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
dstream.mapValues(f)
}
+ /**
+ * Return a new DStream by applying a flatmap function to the value of each key-value pairs in
+ * 'this' DStream without changing the key.
+ */
def flatMapValues[U](f: JFunction[V, java.lang.Iterable[U]]): JavaPairDStream[K, U] = {
import scala.collection.JavaConverters._
def fn = (x: V) => f.apply(x).asScala
@@ -474,9 +487,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
/**
- * Cogroup `this` DStream with `other` DStream. For each key k in corresponding RDDs of `this`
- * or `other` DStreams, the generated RDD will contains a tuple with the list of values for that
- * key in both RDDs. HashPartitioner is used to partition each generated RDD into default number
+ * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
+ * Hash partitioning is used to generate the RDDs with Spark's default number
* of partitions.
*/
def cogroup[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (JList[V], JList[W])] = {
@@ -486,20 +498,35 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
/**
- * Cogroup `this` DStream with `other` DStream. For each key k in corresponding RDDs of `this`
- * or `other` DStreams, the generated RDD will contains a tuple with the list of values for that
- * key in both RDDs. Partitioner is used to partition each generated RDD.
+ * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
+ * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
*/
- def cogroup[W](other: JavaPairDStream[K, W], partitioner: Partitioner)
- : JavaPairDStream[K, (JList[V], JList[W])] = {
+ def cogroup[W](
+ other: JavaPairDStream[K, W],
+ numPartitions: Int
+ ): JavaPairDStream[K, (JList[V], JList[W])] = {
+ implicit val cm: ClassManifest[W] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+ dstream.cogroup(other.dstream, numPartitions)
+ .mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2))))
+ }
+
+ /**
+ * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
+ * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+ */
+ def cogroup[W](
+ other: JavaPairDStream[K, W],
+ partitioner: Partitioner
+ ): JavaPairDStream[K, (JList[V], JList[W])] = {
implicit val cm: ClassManifest[W] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
dstream.cogroup(other.dstream, partitioner)
- .mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2))))
+ .mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2))))
}
/**
- * Return new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream..
+ * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
* Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
*/
def join[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, W)] = {
@@ -509,19 +536,32 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
/**
- * Return new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
+ * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
+ * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+ */
+ def join[W](other: JavaPairDStream[K, W], numPartitions: Int): JavaPairDStream[K, (V, W)] = {
+ implicit val cm: ClassManifest[W] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+ dstream.join(other.dstream, numPartitions)
+ }
+
+ /**
+ * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
* The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
*/
- def join[W](other: JavaPairDStream[K, W], partitioner: Partitioner)
- : JavaPairDStream[K, (V, W)] = {
+ def join[W](
+ other: JavaPairDStream[K, W],
+ partitioner: Partitioner
+ ): JavaPairDStream[K, (V, W)] = {
implicit val cm: ClassManifest[W] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
dstream.join(other.dstream, partitioner)
}
/**
- * Return new DStream by applying 'left outer join' between RDDs of `this` DStream and `other` DStream..
- * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
+ * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
+ * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
+ * number of partitions.
*/
def leftOuterJoin[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, Optional[W])] = {
implicit val cm: ClassManifest[W] =
@@ -531,11 +571,28 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
/**
- * Return new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
+ * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
+ * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
+ * partitions.
+ */
+ def leftOuterJoin[W](
+ other: JavaPairDStream[K, W],
+ numPartitions: Int
+ ): JavaPairDStream[K, (V, Optional[W])] = {
+ implicit val cm: ClassManifest[W] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+ val joinResult = dstream.leftOuterJoin(other.dstream, numPartitions)
+ joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))}
+ }
+
+ /**
+ * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
* The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
*/
- def leftOuterJoin[W](other: JavaPairDStream[K, W], partitioner: Partitioner)
- : JavaPairDStream[K, (V, Optional[W])] = {
+ def leftOuterJoin[W](
+ other: JavaPairDStream[K, W],
+ partitioner: Partitioner
+ ): JavaPairDStream[K, (V, Optional[W])] = {
implicit val cm: ClassManifest[W] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
val joinResult = dstream.leftOuterJoin(other.dstream, partitioner)
@@ -543,8 +600,9 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
/**
- * Return new DStream by applying 'left outer join' between RDDs of `this` DStream and `other` DStream..
- * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
+ * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
+ * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
+ * number of partitions.
*/
def rightOuterJoin[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (Optional[V], W)] = {
implicit val cm: ClassManifest[W] =
@@ -554,11 +612,29 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
/**
- * Return new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
- * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
+ * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
+ * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
+ * partitions.
+ */
+ def rightOuterJoin[W](
+ other: JavaPairDStream[K, W],
+ numPartitions: Int
+ ): JavaPairDStream[K, (Optional[V], W)] = {
+ implicit val cm: ClassManifest[W] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+ val joinResult = dstream.rightOuterJoin(other.dstream, numPartitions)
+ joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)}
+ }
+
+ /**
+ * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
+ * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control
+ * the partitioning of each RDD.
*/
- def rightOuterJoin[W](other: JavaPairDStream[K, W], partitioner: Partitioner)
- : JavaPairDStream[K, (Optional[V], W)] = {
+ def rightOuterJoin[W](
+ other: JavaPairDStream[K, W],
+ partitioner: Partitioner
+ ): JavaPairDStream[K, (Optional[V], W)] = {
implicit val cm: ClassManifest[W] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
val joinResult = dstream.rightOuterJoin(other.dstream, partitioner)
@@ -640,9 +716,9 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
object JavaPairDStream {
- implicit def fromPairDStream[K: ClassManifest, V: ClassManifest](dstream: DStream[(K, V)])
- :JavaPairDStream[K, V] =
+ implicit def fromPairDStream[K: ClassManifest, V: ClassManifest](dstream: DStream[(K, V)]) = {
new JavaPairDStream[K, V](dstream)
+ }
def fromJavaDStream[K, V](dstream: JavaDStream[(K, V)]): JavaPairDStream[K, V] = {
implicit val cmk: ClassManifest[K] =
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala
deleted file mode 100644
index 4eddc755b9..0000000000
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.dstream
-
-import org.apache.spark.Partitioner
-import org.apache.spark.rdd.RDD
-import org.apache.spark.rdd.CoGroupedRDD
-import org.apache.spark.streaming.{Time, DStream, Duration}
-
-private[streaming]
-class CoGroupedDStream[K : ClassManifest](
- parents: Seq[DStream[(K, _)]],
- partitioner: Partitioner
- ) extends DStream[(K, Seq[Seq[_]])](parents.head.ssc) {
-
- if (parents.length == 0) {
- throw new IllegalArgumentException("Empty array of parents")
- }
-
- if (parents.map(_.ssc).distinct.size > 1) {
- throw new IllegalArgumentException("Array of parents have different StreamingContexts")
- }
-
- if (parents.map(_.slideDuration).distinct.size > 1) {
- throw new IllegalArgumentException("Array of parents have different slide times")
- }
-
- override def dependencies = parents.toList
-
- override def slideDuration: Duration = parents.head.slideDuration
-
- override def compute(validTime: Time): Option[RDD[(K, Seq[Seq[_]])]] = {
- val part = partitioner
- val rdds = parents.flatMap(_.getOrCompute(validTime))
- if (rdds.size > 0) {
- val q = new CoGroupedRDD[K](rdds, part)
- Some(q)
- } else {
- None
- }
- }
-
-}
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 9f885f07f2..16622a3459 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -320,17 +320,20 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(9,10,11));
JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream<Integer> transformed =
- stream.transform(new Function<JavaRDD<Integer>, JavaRDD<Integer>>() {
- @Override
- public JavaRDD<Integer> call(JavaRDD<Integer> in) throws Exception {
- return in.map(new Function<Integer, Integer>() {
- @Override
- public Integer call(Integer i) throws Exception {
- return i + 2;
- }
- });
- }});
+ JavaDStream<Integer> transformed = stream.transform(
+ new Function<JavaRDD<Integer>, JavaRDD<Integer>>() {
+ @Override
+ public JavaRDD<Integer> call(JavaRDD<Integer> in) throws Exception {
+ return in.map(new Function<Integer, Integer>() {
+ @Override
+ public Integer call(Integer i) throws Exception {
+ return i + 2;
+ }
+ });
+ }
+ }
+ );
+
JavaTestUtils.attachTestOutputStream(transformed);
List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
@@ -338,6 +341,84 @@ public class JavaAPISuite implements Serializable {
}
@Test
+ public void testVariousTransform() {
+ // tests whether all variations of transform can be called from Java
+
+ List<List<Integer>> inputData = Arrays.asList(Arrays.asList(1));
+ JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+
+ List<List<Tuple2<String, Integer>>> pairInputData =
+ Arrays.asList(Arrays.asList(new Tuple2<String, Integer>("x", 1)));
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(
+ JavaTestUtils.attachTestInputStream(ssc, pairInputData, 1));
+
+ JavaDStream<Integer> transformed1 = stream.transform(
+ new Function<JavaRDD<Integer>, JavaRDD<Integer>>() {
+ @Override public JavaRDD<Integer> call(JavaRDD<Integer> in) throws Exception {
+ return null;
+ }
+ }
+ );
+
+ JavaDStream<Integer> transformed2 = stream.transform(
+ new Function2<JavaRDD<Integer>, Time, JavaRDD<Integer>>() {
+ @Override public JavaRDD<Integer> call(JavaRDD<Integer> in, Time time) throws Exception {
+ return null;
+ }
+ }
+ );
+
+ JavaPairDStream<String, Integer> transformed3 = stream.transform(
+ new Function<JavaRDD<Integer>, JavaPairRDD<String, Integer>>() {
+ @Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> in) throws Exception {
+ return null;
+ }
+ }
+ );
+
+ JavaPairDStream<String, Integer> transformed4 = stream.transform(
+ new Function2<JavaRDD<Integer>, Time, JavaPairRDD<String, Integer>>() {
+ @Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> in, Time time) throws Exception {
+ return null;
+ }
+ }
+ );
+
+ JavaDStream<Integer> pairTransformed1 = pairStream.transform(
+ new Function<JavaPairRDD<String, Integer>, JavaRDD<Integer>>() {
+ @Override public JavaRDD<Integer> call(JavaPairRDD<String, Integer> in) throws Exception {
+ return null;
+ }
+ }
+ );
+
+ JavaDStream<Integer> pairTransformed2 = pairStream.transform(
+ new Function2<JavaPairRDD<String, Integer>, Time, JavaRDD<Integer>>() {
+ @Override public JavaRDD<Integer> call(JavaPairRDD<String, Integer> in, Time time) throws Exception {
+ return null;
+ }
+ }
+ );
+
+ JavaPairDStream<String, String> pairTransformed3 = pairStream.transform(
+ new Function<JavaPairRDD<String, Integer>, JavaPairRDD<String, String>>() {
+ @Override public JavaPairRDD<String, String> call(JavaPairRDD<String, Integer> in) throws Exception {
+ return null;
+ }
+ }
+ );
+
+ JavaPairDStream<String, String> pairTransformed4 = pairStream.transform(
+ new Function2<JavaPairRDD<String, Integer>, Time, JavaPairRDD<String, String>>() {
+ @Override public JavaPairRDD<String, String> call(JavaPairRDD<String, Integer> in, Time time) throws Exception {
+ return null;
+ }
+ }
+ );
+
+ }
+
+ @Test
public void testTransformWith() {
List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
Arrays.asList(new Tuple2<String, String>("california", "dodgers"),
@@ -374,10 +455,18 @@ public class JavaAPISuite implements Serializable {
JavaPairDStream<String, Tuple2<String, String>> joined = pairStream1.transformWith(
pairStream2,
- new Function3<JavaPairRDD<String, String>, JavaPairRDD<String, String>, Time, JavaPairRDD<String, Tuple2<String, String>>>() {
- @Override
- public JavaPairRDD<String, Tuple2<String, String>> call(JavaPairRDD<String, String> stringStringJavaPairRDD, JavaPairRDD<String, String> stringStringJavaPairRDD2, Time time) throws Exception {
- return stringStringJavaPairRDD.join(stringStringJavaPairRDD2);
+ new Function3 <
+ JavaPairRDD<String, String>,
+ JavaPairRDD<String, String>,
+ Time,
+ JavaPairRDD<String, Tuple2<String, String>>
+ >() {
+ @Override public JavaPairRDD<String, Tuple2<String, String>> call(
+ JavaPairRDD<String, String> rdd1,
+ JavaPairRDD<String, String> rdd2,
+ Time time
+ ) throws Exception {
+ return rdd1.join(rdd2);
}
}
);
@@ -389,6 +478,106 @@ public class JavaAPISuite implements Serializable {
}
+ @Test
+ public void testVariousTransformWith() {
+ // tests whether all variations of transformWith can be called from Java
+
+ List<List<Integer>> inputData1 = Arrays.asList(Arrays.asList(1));
+ List<List<String>> inputData2 = Arrays.asList(Arrays.asList("x"));
+ JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, inputData1, 1);
+ JavaDStream<String> stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 1);
+
+ List<List<Tuple2<String, Integer>>> pairInputData1 =
+ Arrays.asList(Arrays.asList(new Tuple2<String, Integer>("x", 1)));
+ List<List<Tuple2<Double, Character>>> pairInputData2 =
+ Arrays.asList(Arrays.asList(new Tuple2<Double, Character>(1.0, 'x')));
+ JavaPairDStream<String, Integer> pairStream1 = JavaPairDStream.fromJavaDStream(
+ JavaTestUtils.attachTestInputStream(ssc, pairInputData1, 1));
+ JavaPairDStream<Double, Character> pairStream2 = JavaPairDStream.fromJavaDStream(
+ JavaTestUtils.attachTestInputStream(ssc, pairInputData2, 1));
+
+ JavaDStream<Double> transformed1 = stream1.transformWith(
+ stream2,
+ new Function3<JavaRDD<Integer>, JavaRDD<String>, Time, JavaRDD<Double>>() {
+ @Override
+ public JavaRDD<Double> call(JavaRDD<Integer> rdd1, JavaRDD<String> rdd2, Time time) throws Exception {
+ return null;
+ }
+ }
+ );
+
+ JavaDStream<Double> transformed2 = stream1.transformWith(
+ pairStream1,
+ new Function3<JavaRDD<Integer>, JavaPairRDD<String, Integer>, Time, JavaRDD<Double>>() {
+ @Override
+ public JavaRDD<Double> call(JavaRDD<Integer> rdd1, JavaPairRDD<String, Integer> rdd2, Time time) throws Exception {
+ return null;
+ }
+ }
+ );
+
+ JavaPairDStream<Double, Double> transformed3 = stream1.transformWith(
+ stream2,
+ new Function3<JavaRDD<Integer>, JavaRDD<String>, Time, JavaPairRDD<Double, Double>>() {
+ @Override
+ public JavaPairRDD<Double, Double> call(JavaRDD<Integer> rdd1, JavaRDD<String> rdd2, Time time) throws Exception {
+ return null;
+ }
+ }
+ );
+
+ JavaPairDStream<Double, Double> transformed4 = stream1.transformWith(
+ pairStream1,
+ new Function3<JavaRDD<Integer>, JavaPairRDD<String, Integer>, Time, JavaPairRDD<Double, Double>>() {
+ @Override
+ public JavaPairRDD<Double, Double> call(JavaRDD<Integer> rdd1, JavaPairRDD<String, Integer> rdd2, Time time) throws Exception {
+ return null;
+ }
+ }
+ );
+
+ JavaDStream<Double> pairTransformed1 = pairStream1.transformWith(
+ stream2,
+ new Function3<JavaPairRDD<String, Integer>, JavaRDD<String>, Time, JavaRDD<Double>>() {
+ @Override
+ public JavaRDD<Double> call(JavaPairRDD<String, Integer> rdd1, JavaRDD<String> rdd2, Time time) throws Exception {
+ return null;
+ }
+ }
+ );
+
+ JavaDStream<Double> pairTransformed2_ = pairStream1.transformWith(
+ pairStream1,
+ new Function3<JavaPairRDD<String, Integer>, JavaPairRDD<String, Integer>, Time, JavaRDD<Double>>() {
+ @Override
+ public JavaRDD<Double> call(JavaPairRDD<String, Integer> rdd1, JavaPairRDD<String, Integer> rdd2, Time time) throws Exception {
+ return null;
+ }
+ }
+ );
+
+ JavaPairDStream<Double, Double> pairTransformed3 = pairStream1.transformWith(
+ stream2,
+ new Function3<JavaPairRDD<String, Integer>, JavaRDD<String>, Time, JavaPairRDD<Double, Double>>() {
+ @Override
+ public JavaPairRDD<Double, Double> call(JavaPairRDD<String, Integer> rdd1, JavaRDD<String> rdd2, Time time) throws Exception {
+ return null;
+ }
+ }
+ );
+
+
+ JavaPairDStream<Double, Double> pairTransformed4 = pairStream1.transformWith(
+ pairStream2,
+ new Function3<JavaPairRDD<String, Integer>, JavaPairRDD<Double, Character>, Time, JavaPairRDD<Double, Double>>() {
+ @Override
+ public JavaPairRDD<Double, Double> call(JavaPairRDD<String, Integer> rdd1, JavaPairRDD<Double, Character> rdd2, Time time) throws Exception {
+ return null;
+ }
+ }
+ );
+ }
+
@Test
public void testFlatMap() {
List<List<String>> inputData = Arrays.asList(