author    Patrick Wendell <pwendell@gmail.com>    2013-01-07 11:02:03 -0800
committer Patrick Wendell <pwendell@gmail.com>    2013-01-14 09:42:36 -0800
commit    6e514a8d3511891a3f7221c594171477a0b5a38f (patch)
tree      35b51d5f9dc195bca60d2f07f88724ee314b21cd /streaming
parent    f144e0413a1e42d193a86fa04af769e2da9dc58b (diff)
PairDStream and DStreamLike
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala     | 102
-rw-r--r--  streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala | 109
-rw-r--r--  streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala | 134
-rw-r--r--  streaming/src/test/scala/JavaTestUtils.scala                            |   6
-rw-r--r--  streaming/src/test/scala/spark/streaming/JavaAPISuite.java              | 108
5 files changed, 359 insertions(+), 100 deletions(-)
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
index 56e54c719a..9e2823d81f 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
@@ -1,109 +1,17 @@
package spark.streaming.api.java
-import java.util.{List => JList}
+import spark.streaming.DStream
+import spark.api.java.function.{Function => JFunction}
-import scala.collection.JavaConversions._
-
-import spark.streaming._
-import spark.api.java.JavaRDD
-import spark.api.java.function.{Function2 => JFunction2, Function => JFunction, _}
-import java.util
-import spark.RDD
-
-class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassManifest[T]) {
- def print() = dstream.print()
-
- // TODO move to type-specific implementations
- def cache() : JavaDStream[T] = {
- dstream.cache()
- }
-
- def count() : JavaDStream[Int] = {
- dstream.count()
- }
-
- def countByWindow(windowTime: Time, slideTime: Time) : JavaDStream[Int] = {
- dstream.countByWindow(windowTime, slideTime)
- }
-
- def compute(validTime: Time): JavaRDD[T] = {
- dstream.compute(validTime) match {
- case Some(rdd) => new JavaRDD(rdd)
- case None => null
- }
- }
-
- def context(): StreamingContext = dstream.context()
-
- def window(windowTime: Time): JavaDStream[T] = {
- dstream.window(windowTime)
- }
-
- def window(windowTime: Time, slideTime: Time): JavaDStream[T] = {
- dstream.window(windowTime, slideTime)
- }
-
- def tumble(batchTime: Time): JavaDStream[T] = {
- dstream.tumble(batchTime)
- }
-
- def map[R](f: JFunction[T, R]): JavaDStream[R] = {
- new JavaDStream(dstream.map(f)(f.returnType()))(f.returnType())
- }
+class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassManifest[T])
+ extends JavaDStreamLike[T, JavaDStream[T]] {
def filter(f: JFunction[T, java.lang.Boolean]): JavaDStream[T] = {
dstream.filter((x => f(x).booleanValue()))
}
-
- def glom(): JavaDStream[JList[T]] = {
- new JavaDStream(dstream.glom().map(x => new java.util.ArrayList[T](x.toSeq)))
- }
-
- // TODO: Other map partitions
- def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaDStream[U] = {
- def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
- new JavaDStream(dstream.mapPartitions(fn)(f.elementType()))(f.elementType())
- }
-
- def reduce(f: JFunction2[T, T, T]): JavaDStream[T] = dstream.reduce(f)
-
- def reduceByWindow(
- reduceFunc: JFunction2[T, T, T],
- invReduceFunc: JFunction2[T, T, T],
- windowTime: Time,
- slideTime: Time): JavaDStream[T] = {
- dstream.reduceByWindow(reduceFunc, invReduceFunc, windowTime, slideTime)
- }
-
- def slice(fromTime: Time, toTime: Time): JList[JavaRDD[T]] = {
- new util.ArrayList(dstream.slice(fromTime, toTime).map(new JavaRDD(_)).toSeq)
- }
-
- def foreach(foreachFunc: JFunction[JavaRDD[T], Void]) = {
- dstream.foreach(rdd => foreachFunc.call(new JavaRDD(rdd)))
- }
-
- def foreach(foreachFunc: JFunction2[JavaRDD[T], Time, Void]) = {
- dstream.foreach((rdd, time) => foreachFunc.call(new JavaRDD(rdd), time))
- }
-
- def transform[U](transformFunc: JFunction[JavaRDD[T], JavaRDD[U]]): JavaDStream[U] = {
- implicit val cm: ClassManifest[U] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
- def scalaTransform (in: RDD[T]): RDD[U] = {
- transformFunc.call(new JavaRDD[T](in)).rdd
- }
- dstream.transform(scalaTransform(_))
- }
- // TODO: transform with time
-
- def union(that: JavaDStream[T]): JavaDStream[T] = {
- dstream.union(that.dstream)
- }
}
object JavaDStream {
implicit def fromDStream[T: ClassManifest](dstream: DStream[T]): JavaDStream[T] =
new JavaDStream[T](dstream)
-
-}
\ No newline at end of file
+}
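
With the shared operations hoisted into the new JavaDStreamLike trait (next file), JavaDStream keeps only filter, which must return the concrete wrapper type. Below is a minimal, self-contained sketch of the F-bounded pattern the trait relies on; StreamLike, PlainStream, PairStream, and sizeOf are illustrative stand-ins, not part of this commit:

```scala
// Illustrative sketch only: the `This` type parameter names the concrete
// subclass, so operations defined once in the trait are available on both
// the plain and the pair-typed wrappers, the role JavaDStreamLike plays
// for JavaDStream and JavaPairDStream.
trait StreamLike[T, This <: StreamLike[T, This]] {
  def elems: Seq[T]            // stand-in for the wrapped `dstream`
  def count: Int = elems.size  // a shared operation, written once
}

class PlainStream[T](val elems: Seq[T]) extends StreamLike[T, PlainStream[T]]

class PairStream[K, V](val elems: Seq[(K, V)])
  extends StreamLike[(K, V), PairStream[K, V]]

object StreamLikeDemo extends App {
  // A helper bounded by the trait accepts both wrappers; this commit widens
  // JavaTestUtils.attachTestOutputStream the same way.
  def sizeOf[T, This <: StreamLike[T, This]](s: StreamLike[T, This]): Int = s.count

  println(sizeOf(new PlainStream(Seq(1, 2, 3))))  // 3
  println(sizeOf(new PairStream(Seq("a" -> 1))))  // 1
}
```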
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
new file mode 100644
index 0000000000..daea56f50c
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
@@ -0,0 +1,109 @@
+package spark.streaming.api.java
+
+import java.util.{List => JList}
+
+import scala.collection.JavaConversions._
+
+import spark.streaming._
+import spark.api.java.JavaRDD
+import spark.api.java.function.{Function2 => JFunction2, Function => JFunction, _}
+import java.util
+import spark.RDD
+import JavaDStream._
+
+trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable {
+ implicit val classManifest: ClassManifest[T]
+
+ def dstream: DStream[T]
+
+ def print() = dstream.print()
+
+ // TODO move to type-specific implementations
+ def cache() : JavaDStream[T] = {
+ dstream.cache()
+ }
+
+ def count() : JavaDStream[Int] = {
+ dstream.count()
+ }
+
+ def countByWindow(windowTime: Time, slideTime: Time) : JavaDStream[Int] = {
+ dstream.countByWindow(windowTime, slideTime)
+ }
+
+ def compute(validTime: Time): JavaRDD[T] = {
+ dstream.compute(validTime) match {
+ case Some(rdd) => new JavaRDD(rdd)
+ case None => null
+ }
+ }
+
+ def context(): StreamingContext = dstream.context()
+
+ def window(windowTime: Time): JavaDStream[T] = {
+ dstream.window(windowTime)
+ }
+
+ def window(windowTime: Time, slideTime: Time): JavaDStream[T] = {
+ dstream.window(windowTime, slideTime)
+ }
+
+ def tumble(batchTime: Time): JavaDStream[T] = {
+ dstream.tumble(batchTime)
+ }
+
+ def map[R](f: JFunction[T, R]): JavaDStream[R] = {
+ new JavaDStream(dstream.map(f)(f.returnType()))(f.returnType())
+ }
+
+ def map[K, V](f: PairFunction[T, K, V]): JavaPairDStream[K, V] = {
+ def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]]
+ new JavaPairDStream(dstream.map(f)(cm))(f.keyType(), f.valueType())
+ }
+
+ def glom(): JavaDStream[JList[T]] = {
+ new JavaDStream(dstream.glom().map(x => new java.util.ArrayList[T](x.toSeq)))
+ }
+
+ // TODO: Other map partitions
+ def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaDStream[U] = {
+ def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
+ new JavaDStream(dstream.mapPartitions(fn)(f.elementType()))(f.elementType())
+ }
+
+ def reduce(f: JFunction2[T, T, T]): JavaDStream[T] = dstream.reduce(f)
+
+ def reduceByWindow(
+ reduceFunc: JFunction2[T, T, T],
+ invReduceFunc: JFunction2[T, T, T],
+ windowTime: Time,
+ slideTime: Time): JavaDStream[T] = {
+ dstream.reduceByWindow(reduceFunc, invReduceFunc, windowTime, slideTime)
+ }
+
+ def slice(fromTime: Time, toTime: Time): JList[JavaRDD[T]] = {
+ new util.ArrayList(dstream.slice(fromTime, toTime).map(new JavaRDD(_)).toSeq)
+ }
+
+ def foreach(foreachFunc: JFunction[JavaRDD[T], Void]) = {
+ dstream.foreach(rdd => foreachFunc.call(new JavaRDD(rdd)))
+ }
+
+ def foreach(foreachFunc: JFunction2[JavaRDD[T], Time, Void]) = {
+ dstream.foreach((rdd, time) => foreachFunc.call(new JavaRDD(rdd), time))
+ }
+
+ def transform[U](transformFunc: JFunction[JavaRDD[T], JavaRDD[U]]): JavaDStream[U] = {
+ implicit val cm: ClassManifest[U] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
+ def scalaTransform (in: RDD[T]): RDD[U] = {
+ transformFunc.call(new JavaRDD[T](in)).rdd
+ }
+ dstream.transform(scalaTransform(_))
+ }
+ // TODO: transform with time
+
+ def union(that: JavaDStream[T]): JavaDStream[T] = {
+ dstream.union(that.dstream)
+ }
+}
\ No newline at end of file
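
The trait also gains a second map overload taking a PairFunction, which lets Java callers build a JavaPairDStream directly from any stream. Note the recurring manifest idiom: Java callers cannot summon a ClassManifest, so one for AnyRef is cast to the required element type, which erasure makes safe at runtime. A sketch of that idiom against the 2.9-era ClassManifest API (fakeManifest and toArrayOf are illustrative names, not in this commit):

```scala
// Illustrative sketch of the cast used throughout this patch. Erasure makes
// it safe: at runtime the manifest only needs to exist, not to carry the
// precise element class.
object ManifestTrick extends App {
  def fakeManifest[T]: ClassManifest[T] =
    implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]

  // An API that demands a manifest is satisfied by the cast one.
  def toArrayOf[T](xs: Seq[T])(implicit cm: ClassManifest[T]): Array[T] =
    xs.toArray

  println(toArrayOf(Seq("a", "b"))(fakeManifest[String]).length)  // 2
}
```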
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
new file mode 100644
index 0000000000..01dda24fde
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
@@ -0,0 +1,134 @@
+package spark.streaming.api.java
+
+import java.util.{List => JList}
+
+import scala.collection.JavaConversions._
+
+import spark.streaming._
+import spark.streaming.StreamingContext._
+import spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
+import spark.Partitioner
+
+class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
+ implicit val kManifest: ClassManifest[K],
+ implicit val vManifest: ClassManifest[V])
+ extends JavaDStreamLike[(K, V), JavaPairDStream[K, V]] {
+
+ def filter(f: JFunction[(K, V), java.lang.Boolean]): JavaPairDStream[K, V] = {
+ dstream.filter((x => f(x).booleanValue()))
+ }
+
+ def groupByKey(): JavaPairDStream[K, JList[V]] = {
+ dstream.groupByKey().mapValues(seqAsJavaList _)
+ }
+
+ def groupByKey(numPartitions: Int): JavaPairDStream[K, JList[V]] = {
+ dstream.groupByKey(numPartitions).mapValues(seqAsJavaList _)
+ }
+
+ def groupByKey(partitioner: Partitioner): JavaPairDStream[K, JList[V]] = {
+ dstream.groupByKey(partitioner).mapValues(seqAsJavaList _)
+ }
+
+ def reduceByKey(func: JFunction2[V, V, V]): JavaPairDStream[K, V] = {
+ dstream.reduceByKey(func)
+ }
+
+ def reduceByKey(func: JFunction2[V, V, V], numPartitions: Int): JavaPairDStream[K, V] = {
+ dstream.reduceByKey(func, numPartitions)
+ }
+
+ // TODO: TEST BELOW
+ def combineByKey[C](createCombiner: Function[V, C],
+ mergeValue: JFunction2[C, V, C],
+ mergeCombiners: JFunction2[C, C, C],
+ partitioner: Partitioner): JavaPairDStream[K, C] = {
+ implicit val cm: ClassManifest[C] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[C]]
+ dstream.combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner)
+ }
+
+ def countByKey(numPartitions: Int): JavaPairDStream[K, Long] = {
+ dstream.countByKey(numPartitions);
+ }
+
+ def countByKey(): JavaPairDStream[K, Long] = {
+ dstream.countByKey();
+ }
+
+ def groupByKeyAndWindow(windowTime: Time, slideTime: Time): JavaPairDStream[K, JList[V]] = {
+ dstream.groupByKeyAndWindow(windowTime, slideTime).mapValues(seqAsJavaList _)
+ }
+
+ def groupByKeyAndWindow(windowTime: Time, slideTime: Time, numPartitions: Int):
+ JavaPairDStream[K, JList[V]] = {
+ dstream.groupByKeyAndWindow(windowTime, slideTime, numPartitions).mapValues(seqAsJavaList _)
+ }
+
+ def groupByKeyAndWindow(windowTime: Time, slideTime: Time, partitioner: Partitioner):
+ JavaPairDStream[K, JList[V]] = {
+ dstream.groupByKeyAndWindow(windowTime, slideTime, partitioner).mapValues(seqAsJavaList _)
+ }
+
+ def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowTime: Time):
+ JavaPairDStream[K, V] = {
+ dstream.reduceByKeyAndWindow(reduceFunc, windowTime)
+ }
+
+ def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowTime: Time, slideTime: Time):
+ JavaPairDStream[K, V] = {
+ dstream.reduceByKeyAndWindow(reduceFunc, windowTime, slideTime)
+ }
+
+ def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowTime: Time, slideTime: Time,
+ numPartitions: Int): JavaPairDStream[K, V] = {
+ dstream.reduceByKeyAndWindow(reduceFunc, windowTime, slideTime, numPartitions)
+ }
+
+ def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowTime: Time, slideTime: Time,
+ partitioner: Partitioner): JavaPairDStream[K, V] = {
+ dstream.reduceByKeyAndWindow(reduceFunc, windowTime, slideTime, partitioner)
+ }
+
+ def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], invReduceFunc: Function2[V, V, V],
+ windowTime: Time, slideTime: Time): JavaPairDStream[K, V] = {
+ dstream.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowTime, slideTime)
+ }
+
+ def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], invReduceFunc: Function2[V, V, V],
+ windowTime: Time, slideTime: Time, numPartitions: Int): JavaPairDStream[K, V] = {
+ dstream.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowTime, slideTime, numPartitions)
+ }
+
+ def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], invReduceFunc: Function2[V, V, V],
+ windowTime: Time, slideTime: Time, partitioner: Partitioner)
+ : JavaPairDStream[K, V] = {
+ dstream.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowTime, slideTime, partitioner)
+ }
+
+ def countByKeyAndWindow(windowTime: Time, slideTime: Time): JavaPairDStream[K, Long] = {
+ dstream.countByKeyAndWindow(windowTime, slideTime)
+ }
+
+ def countByKeyAndWindow(windowTime: Time, slideTime: Time, numPartitions: Int)
+ : JavaPairDStream[K, Long] = {
+ dstream.countByKeyAndWindow(windowTime, slideTime, numPartitions)
+ }
+
+ override val classManifest: ClassManifest[(K, V)] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]]
+}
+
+object JavaPairDStream {
+ implicit def fromPairDStream[K: ClassManifest, V: ClassManifest](dstream: DStream[(K, V)]):
+ JavaPairDStream[K, V] =
+ new JavaPairDStream[K, V](dstream)
+
+ def fromJavaDStream[K, V](dstream: JavaDStream[(K, V)]): JavaPairDStream[K, V] = {
+ implicit val cmk: ClassManifest[K] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
+ implicit val cmv: ClassManifest[V] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
+ new JavaPairDStream[K, V](dstream.dstream)
+ }
+}
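
The companion object offers two routes into the pair API: the implicit fromPairDStream for Scala code that already holds a DStream[(K, V)], and fromJavaDStream for Java code that only has a JavaDStream of Tuple2 values. A runnable sketch of the implicit-conversion half, with Wrapper standing in for JavaPairDStream (illustrative names, not this commit's API):

```scala
import scala.language.implicitConversions

// Wrapper plays the role of JavaPairDStream; the implicit in its companion
// mirrors JavaPairDStream.fromPairDStream.
class Wrapper[K, V](val pairs: Seq[(K, V)])

object Wrapper {
  implicit def fromPairs[K, V](pairs: Seq[(K, V)]): Wrapper[K, V] =
    new Wrapper(pairs)
}

object WrapperDemo extends App {
  def takesWrapper(w: Wrapper[String, Int]): Int = w.pairs.size

  // The Seq converts implicitly, as a DStream[(K, V)] would to JavaPairDStream.
  println(takesWrapper(Seq("a" -> 1, "b" -> 2)))  // 2
}
```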
diff --git a/streaming/src/test/scala/JavaTestUtils.scala b/streaming/src/test/scala/JavaTestUtils.scala
index 776b0e6bb6..9f3a80df8b 100644
--- a/streaming/src/test/scala/JavaTestUtils.scala
+++ b/streaming/src/test/scala/JavaTestUtils.scala
@@ -2,7 +2,7 @@ package spark.streaming
import collection.mutable.{SynchronizedBuffer, ArrayBuffer}
import java.util.{List => JList}
-import spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
+import api.java.{JavaPairDStream, JavaDStreamLike, JavaDStream, JavaStreamingContext}
import spark.streaming._
import java.util.ArrayList
import collection.JavaConversions._
@@ -20,7 +20,8 @@ object JavaTestUtils extends TestSuiteBase {
new JavaDStream[T](dstream)
}
- def attachTestOutputStream[T](dstream: JavaDStream[T]) = {
+ def attachTestOutputStream[T, This <: spark.streaming.api.java.JavaDStreamLike[T,This]]
+ (dstream: JavaDStreamLike[T, This]) = {
implicit val cm: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
val ostream = new TestOutputStream(dstream.dstream,
@@ -37,6 +38,5 @@ object JavaTestUtils extends TestSuiteBase {
res.map(entry => out.append(new ArrayList[V](entry)))
out
}
-
}
diff --git a/streaming/src/test/scala/spark/streaming/JavaAPISuite.java b/streaming/src/test/scala/spark/streaming/JavaAPISuite.java
index c4629c8d97..c1373e6275 100644
--- a/streaming/src/test/scala/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/scala/spark/streaming/JavaAPISuite.java
@@ -5,12 +5,15 @@ import org.junit.Assert;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import scala.Tuple2;
import spark.api.java.JavaRDD;
import spark.api.java.function.FlatMapFunction;
import spark.api.java.function.Function;
import spark.api.java.function.Function2;
+import spark.api.java.function.PairFunction;
import spark.streaming.JavaTestUtils;
import spark.streaming.api.java.JavaDStream;
+import spark.streaming.api.java.JavaPairDStream;
import spark.streaming.api.java.JavaStreamingContext;
import java.io.Serializable;
@@ -340,4 +343,109 @@ public class JavaAPISuite implements Serializable {
Assert.assertEquals(expected, actual);
}
+
+ // PairDStream Functions
+ @Test
+ public void testPairFilter() {
+ List<List<String>> inputData = Arrays.asList(
+ Arrays.asList("giants", "dodgers"),
+ Arrays.asList("yankees", "red socks"));
+
+ List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
+ Arrays.asList(new Tuple2<String, Integer>("giants", 6)),
+ Arrays.asList(new Tuple2<String, Integer>("yankees", 7)));
+
+ JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = stream.map(
+ new PairFunction<String, String, Integer>() {
+ @Override
+ public Tuple2 call(String in) throws Exception {
+ return new Tuple2<String, Integer>(in, in.length());
+ }
+ });
+
+ JavaPairDStream<String, Integer> filtered = pairStream.filter(
+ new Function<Tuple2<String, Integer>, Boolean>() {
+ @Override
+ public Boolean call(Tuple2<String, Integer> in) throws Exception {
+ return in._1().contains("a");
+ }
+ });
+ JavaTestUtils.attachTestOutputStream(filtered);
+ List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(sc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testPairGroupByKey() {
+ List<List<Tuple2<String, String>>> inputData = Arrays.asList(
+ Arrays.asList(new Tuple2<String, String>("california", "dodgers"),
+ new Tuple2<String, String>("california", "giants"),
+ new Tuple2<String, String>("new york", "yankees"),
+ new Tuple2<String, String>("new york", "mets")),
+ Arrays.asList(new Tuple2<String, String>("california", "sharks"),
+ new Tuple2<String, String>("california", "ducks"),
+ new Tuple2<String, String>("new york", "rangers"),
+ new Tuple2<String, String>("new york", "islanders")));
+
+
+ List<List<Tuple2<String, List<String>>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<String, List<String>>("california", Arrays.asList("dodgers", "giants")),
+ new Tuple2<String, List<String>>("new york", Arrays.asList("yankees", "mets"))),
+ Arrays.asList(
+ new Tuple2<String, List<String>>("california", Arrays.asList("sharks", "ducks")),
+ new Tuple2<String, List<String>>("new york", Arrays.asList("rangers", "islanders"))));
+
+ JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
+ JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+ JavaPairDStream<String, List<String>> grouped = pairStream.groupByKey();
+ JavaTestUtils.attachTestOutputStream(grouped);
+ List<List<Tuple2<String, List<String>>>> result = JavaTestUtils.runStreams(sc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testPairReduceByKey() {
+ List<List<Tuple2<String, Integer>>> inputData = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<String, Integer>("california", 1),
+ new Tuple2<String, Integer>("california", 3),
+ new Tuple2<String, Integer>("new york", 4),
+ new Tuple2<String, Integer>("new york", 1)),
+ Arrays.asList(
+ new Tuple2<String, Integer>("california", 5),
+ new Tuple2<String, Integer>("california", 5),
+ new Tuple2<String, Integer>("new york", 3),
+ new Tuple2<String, Integer>("new york", 1)));
+
+
+ List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<String, Integer>("california", 4),
+ new Tuple2<String, Integer>("new york", 5)),
+ Arrays.asList(
+ new Tuple2<String, Integer>("california", 10),
+ new Tuple2<String, Integer>("new york", 4)));
+
+ JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(
+ sc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+ JavaPairDStream<String, Integer> reduced = pairStream.reduceByKey(
+ new Function2<Integer, Integer, Integer>() {
+ @Override
+ public Integer call(Integer i1, Integer i2) throws Exception {
+ return i1 + i2;
+ }
+ });
+
+ JavaTestUtils.attachTestOutputStream(reduced);
+ List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(sc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
}
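
The three tests above pin down the per-batch behavior of the new filter, groupByKey, and reduceByKey wrappers. A self-contained sketch of the latter two over a plain Seq standing in for one batch's contents (illustrative only; the data mirrors the first batch of testPairReduceByKey):

```scala
// Per-key grouping and reduction over one batch's worth of pairs,
// mirroring what groupByKey and reduceByKey(_ + _) produce.
object PairOpsDemo extends App {
  val batch = Seq("california" -> 1, "california" -> 3,
                  "new york"   -> 4, "new york"   -> 1)

  // groupByKey: every value seen for a key within the batch
  val grouped = batch.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2) }
  println(grouped)  // e.g. Map(california -> List(1, 3), new york -> List(4, 1))

  // reduceByKey(_ + _): per-key sums, matching the expected test output
  val reduced = grouped.map { case (k, vs) => k -> vs.sum }
  println(reduced)  // e.g. Map(california -> 4, new york -> 5)
}
```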