From 560c312c6060914c9c38cb98d3587685f10f7311 Mon Sep 17 00:00:00 2001
From: Patrick Wendell
Date: Wed, 9 Jan 2013 19:27:32 -0800
Subject: Docs, some tests, and work on StreamingContext

---
 .../spark/streaming/api/java/JavaDStream.scala     |  33 ++
 .../spark/streaming/api/java/JavaDStreamLike.scala |  59 ++++
 .../spark/streaming/api/java/JavaPairDStream.scala |  46 ++-
 .../streaming/api/java/JavaStreamingContext.scala  | 152 ++++++++-
 .../test/scala/spark/streaming/JavaAPISuite.java   | 359 +++++++++++++++++++--
 5 files changed, 617 insertions(+), 32 deletions(-)

diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
index 9bf595e0bc..1e5c279e2c 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
@@ -4,15 +4,25 @@ import spark.streaming.{Time, DStream}
 import spark.api.java.function.{Function => JFunction}
 import spark.api.java.JavaRDD
 import java.util.{List => JList}
+import spark.storage.StorageLevel
 
 class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassManifest[T])
     extends JavaDStreamLike[T, JavaDStream[T]] {
 
+  /** Returns a new DStream containing only the elements that satisfy a predicate. */
   def filter(f: JFunction[T, java.lang.Boolean]): JavaDStream[T] =
     dstream.filter((x => f(x).booleanValue()))
 
+  /** Persists the RDDs of this DStream with the default storage level (MEMORY_ONLY_SER). */
   def cache(): JavaDStream[T] = dstream.cache()
 
+  /** Persists the RDDs of this DStream with the default storage level (MEMORY_ONLY_SER). */
+  def persist(): JavaDStream[T] = dstream.cache()
+
+  /** Persists the RDDs of this DStream with the given storage level. */
+  def persist(storageLevel: StorageLevel): JavaDStream[T] = dstream.persist(storageLevel)
+
+  /** Generates an RDD for the given time. */
   def compute(validTime: Time): JavaRDD[T] = {
     dstream.compute(validTime) match {
       case Some(rdd) => new JavaRDD(rdd)
@@ -20,15 +30,38 @@ class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassM
     }
   }
 
+  /**
+   * Return a new DStream which is computed based on windowed batches of this DStream.
+   * The new DStream generates RDDs with the same interval as this DStream.
+   * @param windowTime width of the window; must be a multiple of this DStream's interval.
+   */
   def window(windowTime: Time): JavaDStream[T] =
     dstream.window(windowTime)
 
+  /**
+   * Return a new DStream which is computed based on windowed batches of this DStream.
+   * @param windowTime duration (i.e., width) of the window;
+   *                   must be a multiple of this DStream's interval
+   * @param slideTime  sliding interval of the window (i.e., the interval after which
+   *                   the new DStream will generate RDDs); must be a multiple of this
+   *                   DStream's interval
+   */
   def window(windowTime: Time, slideTime: Time): JavaDStream[T] =
     dstream.window(windowTime, slideTime)
 
+  /**
+   * Returns a new DStream computed based on a tumbling window on this DStream.
+   * This is equivalent to window(batchTime, batchTime).
+   * @param batchTime tumbling window duration; must be a multiple of this DStream's interval
+   */
   def tumble(batchTime: Time): JavaDStream[T] =
     dstream.tumble(batchTime)
 
+  /**
+   * Returns a new DStream by unifying data of another DStream with this DStream.
+   * @param that Another DStream having the same interval (i.e., slideTime) as this DStream.
+   */
  def union(that: JavaDStream[T]): JavaDStream[T] =
    dstream.union(that.dstream)
}
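The window()/tumble() semantics documented above are easiest to see in a short usage sketch. The snippet below is illustrative and not part of this patch: `lines` stands for any JavaDStream of strings (e.g. from a socket source), the durations are placeholders, and Time is assumed to take milliseconds, matching the new Time(1000) calls in JavaAPISuite further down.

    import spark.streaming.Time;
    import spark.streaming.api.java.JavaDStream;

    public class WindowSketch {
      static void demo(JavaDStream<String> lines) {
        // Every batch interval, an RDD covering the last 30 seconds of data.
        JavaDStream<String> windowed = lines.window(new Time(30000));

        // Every 10 seconds, an RDD covering the last 30 seconds.
        JavaDStream<String> sliding = lines.window(new Time(30000), new Time(10000));

        // Non-overlapping 60-second windows: equivalent to window(60s, 60s).
        JavaDStream<String> tumbled = lines.tumble(new Time(60000));

        tumbled.print();  // output operator, so the windowed stream materializes
      }
    }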
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
index 05d89918b2..23a0aaaefd 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
@@ -16,41 +16,81 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
 
   def dstream: DStream[T]
 
+  /**
+   * Prints the first ten elements of each RDD generated in this DStream. This is an output
+   * operator, so this DStream will be registered as an output stream and therefore materialized.
+   */
   def print() = dstream.print()
 
+  /**
+   * Returns a new DStream in which each RDD has a single element generated by counting each RDD
+   * of this DStream.
+   */
   def count(): JavaDStream[Int] = dstream.count()
 
+  /**
+   * Returns a new DStream in which each RDD has a single element generated by counting the number
+   * of elements in a window over this DStream. windowTime and slideTime are as defined in the
+   * window() operation. This is equivalent to window(windowTime, slideTime).count().
+   */
   def countByWindow(windowTime: Time, slideTime: Time) : JavaDStream[Int] = {
     dstream.countByWindow(windowTime, slideTime)
   }
 
+  /**
+   * Return a new DStream in which each RDD is generated by applying glom() to each RDD of
+   * this DStream. Applying glom() to an RDD coalesces all elements within each partition into
+   * an array.
+   */
   def glom(): JavaDStream[JList[T]] =
     new JavaDStream(dstream.glom().map(x => new java.util.ArrayList[T](x.toSeq)))
 
+  /** Returns the StreamingContext associated with this DStream. */
   def context(): StreamingContext = dstream.context()
 
+  /** Returns a new DStream by applying a function to all elements of this DStream. */
   def map[R](f: JFunction[T, R]): JavaDStream[R] = {
     new JavaDStream(dstream.map(f)(f.returnType()))(f.returnType())
   }
 
+  /** Returns a new pair DStream by applying a function to all elements of this DStream. */
   def map[K, V](f: PairFunction[T, K, V]): JavaPairDStream[K, V] = {
     def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]]
     new JavaPairDStream(dstream.map(f)(cm))(f.keyType(), f.valueType())
   }
 
+  /**
+   * Return a new DStream in which each RDD is generated by applying mapPartitions() to each RDD
+   * of this DStream. Applying mapPartitions() to an RDD applies a function to each partition
+   * of the RDD.
+   */
   def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaDStream[U] = {
     def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
     new JavaDStream(dstream.mapPartitions(fn)(f.elementType()))(f.elementType())
   }
 
+  /**
+   * Return a new DStream in which each RDD is generated by applying mapPartitions() to each RDD
+   * of this DStream. Applying mapPartitions() to an RDD applies a function to each partition
+   * of the RDD.
+   */
   def mapPartitions[K, V](f: PairFlatMapFunction[java.util.Iterator[T], K, V])
   : JavaPairDStream[K, V] = {
     def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
     new JavaPairDStream(dstream.mapPartitions(fn))(f.keyType(), f.valueType())
   }
 
+  /**
+   * Returns a new DStream in which each RDD has a single element generated by reducing each RDD
+   * of this DStream.
+   */
   def reduce(f: JFunction2[T, T, T]): JavaDStream[T] = dstream.reduce(f)
 
+  /**
+   * Returns a new DStream in which each RDD has a single element generated by reducing all
+   * elements in a window over this DStream. windowTime and slideTime are as defined in the
+   * window() operation. This is equivalent to window(windowTime, slideTime).reduce(reduceFunc).
+   */
   def reduceByWindow(
     reduceFunc: JFunction2[T, T, T],
     invReduceFunc: JFunction2[T, T, T],
@@ -59,18 +99,33 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
     dstream.reduceByWindow(reduceFunc, invReduceFunc, windowTime, slideTime)
   }
 
+  /**
+   * Returns all the RDDs from 'fromTime' to 'toTime' (both inclusive).
+   */
   def slice(fromTime: Time, toTime: Time): JList[JavaRDD[T]] = {
     new util.ArrayList(dstream.slice(fromTime, toTime).map(new JavaRDD(_)).toSeq)
   }
 
+  /**
+   * Applies a function to each RDD in this DStream. This is an output operator, so
+   * this DStream will be registered as an output stream and therefore materialized.
+   */
   def foreach(foreachFunc: JFunction[JavaRDD[T], Void]) {
     dstream.foreach(rdd => foreachFunc.call(new JavaRDD(rdd)))
   }
 
+  /**
+   * Applies a function to each RDD in this DStream. This is an output operator, so
+   * this DStream will be registered as an output stream and therefore materialized.
+   */
   def foreach(foreachFunc: JFunction2[JavaRDD[T], Time, Void]) {
     dstream.foreach((rdd, time) => foreachFunc.call(new JavaRDD(rdd), time))
   }
 
+  /**
+   * Returns a new DStream in which each RDD is generated by applying a function
+   * on each RDD of this DStream.
+   */
   def transform[U](transformFunc: JFunction[JavaRDD[T], JavaRDD[U]]): JavaDStream[U] = {
     implicit val cm: ClassManifest[U] =
       implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
@@ -79,6 +134,10 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
     dstream.transform(scalaTransform(_))
   }
 
+  /**
+   * Returns a new DStream in which each RDD is generated by applying a function
+   * on each RDD of this DStream.
+   */
   def transform[U](transformFunc: JFunction2[JavaRDD[T], Time, JavaRDD[U]]): JavaDStream[U] = {
     implicit val cm: ClassManifest[U] =
       implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
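transform() and foreach() are the two escape hatches of this trait: one lifts an arbitrary RDD-to-RDD function to run on every batch, the other registers an output that consumes each generated RDD. A hedged sketch, assuming a JavaDStream of strings named `lines` from any of the sources in this patch (rdd.distinct() stands in for any JavaRDD operation):

    import spark.api.java.JavaRDD;
    import spark.api.java.function.Function;
    import spark.streaming.api.java.JavaDStream;

    public class TransformSketch {
      static void demo(JavaDStream<String> lines) {
        // transform(): apply an arbitrary RDD-to-RDD function per batch.
        JavaDStream<String> uniqueLines = lines.transform(
            new Function<JavaRDD<String>, JavaRDD<String>>() {
              @Override
              public JavaRDD<String> call(JavaRDD<String> rdd) {
                return rdd.distinct();
              }
            });

        // foreach(): an output operator; runs once per generated RDD.
        uniqueLines.foreach(new Function<JavaRDD<String>, Void>() {
          @Override
          public Void call(JavaRDD<String> rdd) {
            System.out.println("distinct lines in batch: " + rdd.count());
            return null;
          }
        });
      }
    }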
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
index f6dfbb2345..f36b870046 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
@@ -12,18 +12,31 @@ import org.apache.hadoop.mapred.{JobConf, OutputFormat}
 import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
 import org.apache.hadoop.conf.Configuration
 import spark.api.java.{JavaPairRDD, JavaRDD}
+import spark.storage.StorageLevel
 
 class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
     implicit val kManifiest: ClassManifest[K],
     implicit val vManifest: ClassManifest[V])
     extends JavaDStreamLike[(K, V), JavaPairDStream[K, V]] {
 
-  // Common to all DStream's
+  // =======================================================================
+  // Methods common to all DStream's
+  // =======================================================================
+
+  /** Returns a new DStream containing only the elements that satisfy a predicate. */
   def filter(f: JFunction[(K, V), java.lang.Boolean]): JavaPairDStream[K, V] =
     dstream.filter((x => f(x).booleanValue()))
 
+  /** Persists the RDDs of this DStream with the default storage level (MEMORY_ONLY_SER). */
   def cache(): JavaPairDStream[K, V] = dstream.cache()
 
+  /** Persists the RDDs of this DStream with the default storage level (MEMORY_ONLY_SER). */
+  def persist(): JavaPairDStream[K, V] = dstream.cache()
+
+  /** Persists the RDDs of this DStream with the given storage level. */
+  def persist(storageLevel: StorageLevel): JavaPairDStream[K, V] = dstream.persist(storageLevel)
+
+  /** Generates an RDD for the given time. */
   def compute(validTime: Time): JavaPairRDD[K, V] = {
     dstream.compute(validTime) match {
       case Some(rdd) => new JavaPairRDD(rdd)
@@ -31,19 +44,45 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
     }
   }
 
+  /**
+   * Return a new DStream which is computed based on windowed batches of this DStream.
+   * The new DStream generates RDDs with the same interval as this DStream.
+   * @param windowTime width of the window; must be a multiple of this DStream's interval.
+   */
   def window(windowTime: Time): JavaPairDStream[K, V] =
     dstream.window(windowTime)
 
+  /**
+   * Return a new DStream which is computed based on windowed batches of this DStream.
+   * @param windowTime duration (i.e., width) of the window;
+   *                   must be a multiple of this DStream's interval
+   * @param slideTime  sliding interval of the window (i.e., the interval after which
+   *                   the new DStream will generate RDDs); must be a multiple of this
+   *                   DStream's interval
+   */
   def window(windowTime: Time, slideTime: Time): JavaPairDStream[K, V] =
     dstream.window(windowTime, slideTime)
 
+  /**
+   * Returns a new DStream computed based on a tumbling window on this DStream.
+   * This is equivalent to window(batchTime, batchTime).
+   * @param batchTime tumbling window duration; must be a multiple of this DStream's interval
+   */
   def tumble(batchTime: Time): JavaPairDStream[K, V] =
     dstream.tumble(batchTime)
 
+  /**
+   * Returns a new DStream by unifying data of another DStream with this DStream.
+   * @param that Another DStream having the same interval (i.e., slideTime) as this DStream.
+   */
   def union(that: JavaPairDStream[K, V]): JavaPairDStream[K, V] =
     dstream.union(that.dstream)
 
-  // Only for PairDStreams...
+  // =======================================================================
+  // Methods only for PairDStream's
+  // =======================================================================
+
   def groupByKey(): JavaPairDStream[K, JList[V]] =
     dstream.groupByKey().mapValues(seqAsJavaList _)
@@ -59,8 +98,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
   def reduceByKey(func: JFunction2[V, V, V], numPartitions: Int): JavaPairDStream[K, V] =
     dstream.reduceByKey(func, numPartitions)
 
-  // TODO: TEST BELOW
-  def combineByKey[C](createCombiner: Function[V, C],
+  def combineByKey[C](createCombiner: JFunction[V, C],
     mergeValue: JFunction2[C, V, C],
     mergeCombiners: JFunction2[C, C, C],
     partitioner: Partitioner): JavaPairDStream[K, C] = {
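combineByKey() is the least self-explanatory of the pair operations, since it threads three functions through each key's values. Below is a hedged sketch of a per-key (sum, count) aggregate, from which an average could be derived; the class and method names are hypothetical, and the call shape mirrors testCombineByKey in JavaAPISuite further down:

    import scala.Tuple2;
    import spark.HashPartitioner;
    import spark.api.java.function.Function;
    import spark.api.java.function.Function2;
    import spark.streaming.api.java.JavaPairDStream;

    public class SumAndCountSketch {
      static JavaPairDStream<String, Tuple2<Integer, Integer>> sumAndCount(
          JavaPairDStream<String, Integer> pairs) {
        return pairs.combineByKey(
            // createCombiner: the first value for a key becomes (value, 1)
            new Function<Integer, Tuple2<Integer, Integer>>() {
              @Override
              public Tuple2<Integer, Integer> call(Integer v) {
                return new Tuple2<Integer, Integer>(v, 1);
              }
            },
            // mergeValue: fold another value into a partial (sum, count)
            new Function2<Tuple2<Integer, Integer>, Integer, Tuple2<Integer, Integer>>() {
              @Override
              public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> c, Integer v) {
                return new Tuple2<Integer, Integer>(c._1() + v, c._2() + 1);
              }
            },
            // mergeCombiners: combine partial aggregates across partitions
            new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>,
                Tuple2<Integer, Integer>>() {
              @Override
              public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> a,
                  Tuple2<Integer, Integer> b) {
                return new Tuple2<Integer, Integer>(a._1() + b._1(), a._2() + b._2());
              }
            },
            new HashPartitioner(2));
      }
    }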
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
index 19cd032fc1..f96b4fbd7d 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
@@ -4,27 +4,173 @@ import scala.collection.JavaConversions._
 import java.util.{List => JList}
 
 import spark.streaming._
-import dstream.SparkFlumeEvent
+import dstream._
 import spark.storage.StorageLevel
+import spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
+import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
+import java.io.InputStream
 
 class JavaStreamingContext(val ssc: StreamingContext) {
   def this(master: String, frameworkName: String, batchDuration: Time) =
     this(new StreamingContext(master, frameworkName, batchDuration))
 
-  def textFileStream(directory: String): JavaDStream[String] = {
-    ssc.textFileStream(directory)
+  // TODOs:
+  // - Test StreamingContext functions
+  // - Test to/from Hadoop functions
+  // - Add checkpoint()/remember()
+  // - Support creating your own streams
+  // - Add Kafka Stream
+
+  /**
+   * Creates an input stream from a network source hostname:port. Data is received using
+   * a TCP socket and the received bytes are interpreted as UTF8-encoded, \n-delimited
+   * lines.
+   * @param hostname     Hostname to connect to for receiving data
+   * @param port         Port to connect to for receiving data
+   * @param storageLevel Storage level to use for storing the received objects
+   *                     (default: StorageLevel.MEMORY_AND_DISK_SER_2)
+   */
+  def networkTextStream(hostname: String, port: Int, storageLevel: StorageLevel)
+  : JavaDStream[String] = {
+    ssc.networkTextStream(hostname, port, storageLevel)
   }
 
+  /**
+   * Creates an input stream from a network source hostname:port. Data is received using
+   * a TCP socket and the received bytes are interpreted as UTF8-encoded, \n-delimited
+   * lines.
+   * @param hostname Hostname to connect to for receiving data
+   * @param port     Port to connect to for receiving data
+   */
   def networkTextStream(hostname: String, port: Int): JavaDStream[String] = {
     ssc.networkTextStream(hostname, port)
   }
 
+  /**
+   * Creates an input stream from a network source hostname:port, where the received
+   * bytes are interpreted as objects using the given converter.
+   * @param hostname     Hostname to connect to for receiving data
+   * @param port         Port to connect to for receiving data
+   * @param converter    Function to convert the byte stream to objects
+   * @param storageLevel Storage level to use for storing the received objects
+   * @tparam T           Type of the objects received (after converting bytes to objects)
+   */
+  def networkStream[T](
+      hostname: String,
+      port: Int,
+      converter: JFunction[InputStream, java.lang.Iterable[T]],
+      storageLevel: StorageLevel)
+  : JavaDStream[T] = {
+    import scala.collection.JavaConverters._
+    def fn = (x: InputStream) => converter.apply(x).toIterator
+    implicit val cmt: ClassManifest[T] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+    ssc.networkStream(hostname, port, fn, storageLevel)
+  }
+
+  /**
+   * Creates an input stream that monitors a Hadoop-compatible filesystem
+   * for new files and reads them as text files (using LongWritable as the key,
+   * Text as the value, and TextInputFormat as the input format). File names
+   * starting with . are ignored.
+   * @param directory HDFS directory to monitor for new files
+   */
+  def textFileStream(directory: String): JavaDStream[String] = {
+    ssc.textFileStream(directory)
+  }
+
+  /**
+   * Creates an input stream from a network source hostname:port, where data is received
+   * as serialized blocks (serialized using Spark's serializer) that can be directly
+   * pushed into the block manager without deserializing them. This is the most efficient
+   * way to receive data.
+   * @param hostname     Hostname to connect to for receiving data
+   * @param port         Port to connect to for receiving data
+   * @param storageLevel Storage level to use for storing the received objects
+   * @tparam T           Type of the objects in the received blocks
+   */
+  def rawNetworkStream[T](
+      hostname: String,
+      port: Int,
+      storageLevel: StorageLevel): JavaDStream[T] = {
+    implicit val cmt: ClassManifest[T] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+    JavaDStream.fromDStream(ssc.rawNetworkStream(hostname, port, storageLevel))
+  }
+
+  /**
+   * Creates an input stream from a network source hostname:port, where data is received
+   * as serialized blocks (serialized using Spark's serializer) that can be directly
+   * pushed into the block manager without deserializing them. This is the most efficient
+   * way to receive data.
+   * @param hostname Hostname to connect to for receiving data
+   * @param port     Port to connect to for receiving data
+   * @tparam T       Type of the objects in the received blocks
+   */
+  def rawNetworkStream[T](hostname: String, port: Int): JavaDStream[T] = {
+    implicit val cmt: ClassManifest[T] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+    JavaDStream.fromDStream(ssc.rawNetworkStream(hostname, port))
+  }
+
+  /**
+   * Creates an input stream that monitors a Hadoop-compatible filesystem
+   * for new files and reads them using the given key-value types and input format.
+   * File names starting with . are ignored.
+   * @param directory HDFS directory to monitor for new files
+   * @tparam K Key type for reading HDFS file
+   * @tparam V Value type for reading HDFS file
+   * @tparam F Input format for reading HDFS file
+   */
+  def fileStream[K, V, F <: NewInputFormat[K, V]](directory: String): JavaPairDStream[K, V] = {
+    implicit val cmk: ClassManifest[K] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
+    implicit val cmv: ClassManifest[V] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
+    implicit val cmf: ClassManifest[F] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[F]]
+    ssc.fileStream[K, V, F](directory)
+  }
+
+  /**
+   * Creates an input stream from a Flume source.
+   * @param hostname     Hostname of the slave machine to which the flume data will be sent
+   * @param port         Port of the slave machine to which the flume data will be sent
+   * @param storageLevel Storage level to use for storing the received objects
+   */
   def flumeStream(hostname: String, port: Int, storageLevel: StorageLevel):
     JavaDStream[SparkFlumeEvent] = {
     ssc.flumeStream(hostname, port, storageLevel)
   }
 
+  /**
+   * Creates an input stream from a Flume source.
+   * @param hostname Hostname of the slave machine to which the flume data will be sent
+   * @param port     Port of the slave machine to which the flume data will be sent
+   */
+  def flumeStream(hostname: String, port: Int): JavaDStream[SparkFlumeEvent] = {
+    ssc.flumeStream(hostname, port)
+  }
+
+  // NOT SUPPORTED: registerInputStream
+
+  /**
+   * Registers an output stream that will be computed every interval.
+   */
+  def registerOutputStream(outputStream: JavaDStreamLike[_, _]) {
+    ssc.registerOutputStream(outputStream.dstream)
+  }
+
+  /**
+   * Starts the execution of the streams.
+   */
   def start() = ssc.start()
 
+  /**
+   * Stops the execution of the streams.
+   */
   def stop() = ssc.stop()
 }
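Putting the context API together, a minimal driver looks roughly like the sketch below. Only calls that appear in this patch are used (the constructor, networkTextStream(), print(), start()); the master URL, host, and port are placeholders:

    import spark.streaming.Time;
    import spark.streaming.api.java.JavaDStream;
    import spark.streaming.api.java.JavaStreamingContext;

    public class ContextSketch {
      public static void main(String[] args) {
        // Batch duration of one second, in milliseconds as in JavaAPISuite.
        JavaStreamingContext ssc =
            new JavaStreamingContext("local[2]", "ContextSketch", new Time(1000));

        // Text lines from a TCP socket; an overload also takes a StorageLevel.
        JavaDStream<String> lines = ssc.networkTextStream("localhost", 9999);

        lines.print();  // print() registers `lines` as an output stream
        ssc.start();    // no data flows until start() is called
      }
    }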
diff --git a/streaming/src/test/scala/spark/streaming/JavaAPISuite.java b/streaming/src/test/scala/spark/streaming/JavaAPISuite.java
index c1373e6275..fa3a5801dd 100644
--- a/streaming/src/test/scala/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/scala/spark/streaming/JavaAPISuite.java
@@ -6,6 +6,7 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import scala.Tuple2;
+import spark.HashPartitioner;
 import spark.api.java.JavaRDD;
 import spark.api.java.function.FlatMapFunction;
 import spark.api.java.function.Function;
@@ -377,18 +378,31 @@ public class JavaAPISuite implements Serializable {
     Assert.assertEquals(expected, result);
   }
 
+  List<List<Tuple2<String, String>>> stringStringKVStream = Arrays.asList(
+      Arrays.asList(new Tuple2<String, String>("california", "dodgers"),
+          new Tuple2<String, String>("california", "giants"),
+          new Tuple2<String, String>("new york", "yankees"),
+          new Tuple2<String, String>("new york", "mets")),
+      Arrays.asList(new Tuple2<String, String>("california", "sharks"),
+          new Tuple2<String, String>("california", "ducks"),
+          new Tuple2<String, String>("new york", "rangers"),
+          new Tuple2<String, String>("new york", "islanders")));
+
+  List<List<Tuple2<String, Integer>>> stringIntKVStream = Arrays.asList(
+      Arrays.asList(
+          new Tuple2<String, Integer>("california", 1),
+          new Tuple2<String, Integer>("california", 3),
+          new Tuple2<String, Integer>("new york", 4),
+          new Tuple2<String, Integer>("new york", 1)),
+      Arrays.asList(
+          new Tuple2<String, Integer>("california", 5),
+          new Tuple2<String, Integer>("california", 5),
+          new Tuple2<String, Integer>("new york", 3),
+          new Tuple2<String, Integer>("new york", 1)));
+
   @Test
   public void testPairGroupByKey() {
-    List<List<Tuple2<String, String>>> inputData = Arrays.asList(
-        Arrays.asList(new Tuple2<String, String>("california", "dodgers"),
-            new Tuple2<String, String>("california", "giants"),
-            new Tuple2<String, String>("new york", "yankees"),
-            new Tuple2<String, String>("new york", "mets")),
-        Arrays.asList(new Tuple2<String, String>("california", "sharks"),
-            new Tuple2<String, String>("california", "ducks"),
-            new Tuple2<String, String>("new york", "rangers"),
-            new Tuple2<String, String>("new york", "islanders")));
-
+    List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
 
     List<List<Tuple2<String, List<String>>>> expected = Arrays.asList(
       Arrays.asList(
@@ -410,18 +424,31 @@ public class JavaAPISuite implements Serializable {
 
   @Test
   public void testPairReduceByKey() {
-    List<List<Tuple2<String, Integer>>> inputData = Arrays.asList(
-        Arrays.asList(
-            new Tuple2<String, Integer>("california", 1),
-            new Tuple2<String, Integer>("california", 3),
-            new Tuple2<String, Integer>("new york", 4),
-            new Tuple2<String, Integer>("new york", 1)),
-        Arrays.asList(
-            new Tuple2<String, Integer>("california", 5),
-            new Tuple2<String, Integer>("california", 5),
-            new Tuple2<String, Integer>("new york", 3),
-            new Tuple2<String, Integer>("new york", 1)));
+    List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+    List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
+        Arrays.asList(
+            new Tuple2<String, Integer>("california", 4),
+            new Tuple2<String, Integer>("new york", 5)),
+        Arrays.asList(
+            new Tuple2<String, Integer>("california", 10),
+            new Tuple2<String, Integer>("new york", 4)));
 
+    JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(
+        sc, inputData, 1);
+    JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+    JavaPairDStream<String, Integer> reduced = pairStream.reduceByKey(new IntegerSum());
+
+    JavaTestUtils.attachTestOutputStream(reduced);
+    List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(sc, 2, 2);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  @Test
+  public void testCombineByKey() {
+    List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
 
     List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
         Arrays.asList(
@@ -435,17 +462,299 @@ public class JavaAPISuite implements Serializable {
         sc, inputData, 1);
     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
 
-    JavaPairDStream<String, Integer> reduced = pairStream.reduceByKey(
-        new Function2<Integer, Integer, Integer>() {
+    JavaPairDStream<String, Integer> combined = pairStream.combineByKey(
+        new Function<Integer, Integer>() {
           @Override
-          public Integer call(Integer i1, Integer i2) throws Exception {
-            return i1 + i2;
+          public Integer call(Integer i) throws Exception {
+            return i;
           }
-        });
+        }, new IntegerSum(), new IntegerSum(), new HashPartitioner(2));
 
-    JavaTestUtils.attachTestOutputStream(reduced);
+    JavaTestUtils.attachTestOutputStream(combined);
     List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(sc, 2, 2);
 
     Assert.assertEquals(expected, result);
   }
+
+  @Test
+  public void testCountByKey() {
+    List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
+
+    List<List<Tuple2<String, Long>>> expected = Arrays.asList(
+        Arrays.asList(
+            new Tuple2<String, Long>("california", 2L),
+            new Tuple2<String, Long>("new york", 2L)),
+        Arrays.asList(
+            new Tuple2<String, Long>("california", 2L),
+            new Tuple2<String, Long>("new york", 2L)));
+
+    JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
+        sc, inputData, 1);
+    JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+    // TODO: the line below currently fails with a compile error; investigate
+    JavaPairDStream<String, Long> counted = pairStream.countByKey();
+    JavaTestUtils.attachTestOutputStream(counted);
+    List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(sc, 2, 2);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  @Test
+  public void testGroupByKeyAndWindow() {
+    List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
+
+    List<List<Tuple2<String, List<String>>>> expected = Arrays.asList(
+        Arrays.asList(
+            new Tuple2<String, List<String>>("california", Arrays.asList("dodgers", "giants")),
+            new Tuple2<String, List<String>>("new york", Arrays.asList("yankees", "mets"))),
+        Arrays.asList(
+            new Tuple2<String, List<String>>("california",
+                Arrays.asList("sharks", "ducks", "dodgers", "giants")),
+            new Tuple2<String, List<String>>("new york",
+                Arrays.asList("rangers", "islanders", "yankees", "mets"))),
+        Arrays.asList(
+            new Tuple2<String, List<String>>("california", Arrays.asList("sharks", "ducks")),
+            new Tuple2<String, List<String>>("new york", Arrays.asList("rangers", "islanders"))));
+
+    JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
+    JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+    JavaPairDStream<String, List<String>> groupWindowed =
+        pairStream.groupByKeyAndWindow(new Time(2000), new Time(1000));
+    JavaTestUtils.attachTestOutputStream(groupWindowed);
+    List<List<Tuple2<String, List<String>>>> result = JavaTestUtils.runStreams(sc, 3, 3);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  @Test
+  public void testReduceByKeyAndWindow() {
+    List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+    List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
+        Arrays.asList(new Tuple2<String, Integer>("california", 4),
+            new Tuple2<String, Integer>("new york", 5)),
+        Arrays.asList(new Tuple2<String, Integer>("california", 14),
+            new Tuple2<String, Integer>("new york", 9)),
+        Arrays.asList(new Tuple2<String, Integer>("california", 10),
+            new Tuple2<String, Integer>("new york", 4)));
+
+    JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
+    JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+    JavaPairDStream<String, Integer> reduceWindowed =
+        pairStream.reduceByKeyAndWindow(new IntegerSum(), new Time(2000), new Time(1000));
+    JavaTestUtils.attachTestOutputStream(reduceWindowed);
+    List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(sc, 3, 3);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  @Test
+  public void testReduceByKeyAndWindowWithInverse() {
+    List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+    List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
+        Arrays.asList(new Tuple2<String, Integer>("california", 4),
+            new Tuple2<String, Integer>("new york", 5)),
+        Arrays.asList(new Tuple2<String, Integer>("california", 14),
+            new Tuple2<String, Integer>("new york", 9)),
+        Arrays.asList(new Tuple2<String, Integer>("california", 10),
+            new Tuple2<String, Integer>("new york", 4)));
+
+    JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
+    JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+    JavaPairDStream<String, Integer> reduceWindowed =
+        pairStream.reduceByKeyAndWindow(new IntegerSum(), new IntegerDifference(),
+            new Time(2000), new Time(1000));
+    JavaTestUtils.attachTestOutputStream(reduceWindowed);
+    List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(sc, 3, 3);
+
+    Assert.assertEquals(expected, result);
result); + } + + @Test + public void testCountByKeyAndWindow() { + List>> inputData = stringStringKVStream; + + List>> expected = Arrays.asList( + Arrays.asList( + new Tuple2("california", 2L), + new Tuple2("new york", 2L)), + Arrays.asList( + new Tuple2("california", 4L), + new Tuple2("new york", 4L)), + Arrays.asList( + new Tuple2("california", 2L), + new Tuple2("new york", 2L))); + + JavaDStream> stream = JavaTestUtils.attachTestInputStream( + sc, inputData, 1); + JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); + + // TODO: Below fails with compile error with ... wtf? + JavaPairDStream counted = pairStream.countByKeyAndWindow(new Time(2000), new Time(1000)); + JavaTestUtils.attachTestOutputStream(counted); + List>> result = JavaTestUtils.runStreams(sc, 3, 3); + + Assert.assertEquals(expected, result); + } + + @Test + public void testMapValues() { + List>> inputData = stringStringKVStream; + + List>> expected = Arrays.asList( + Arrays.asList(new Tuple2("california", "DODGERS"), + new Tuple2("california", "GIANTS"), + new Tuple2("new york", "YANKEES"), + new Tuple2("new york", "METS")), + Arrays.asList(new Tuple2("california", "SHARKS"), + new Tuple2("california", "DUCKS"), + new Tuple2("new york", "RANGERS"), + new Tuple2("new york", "ISLANDERS"))); + + JavaDStream> stream = JavaTestUtils.attachTestInputStream( + sc, inputData, 1); + JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); + + JavaPairDStream mapped = pairStream.mapValues(new Function() { + @Override + public String call(String s) throws Exception { + return s.toUpperCase(); + } + }); + + JavaTestUtils.attachTestOutputStream(mapped); + List>> result = JavaTestUtils.runStreams(sc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testFlatMapValues() { + List>> inputData = stringStringKVStream; + + List>> expected = Arrays.asList( + Arrays.asList(new Tuple2("california", "dodgers1"), + new Tuple2("california", "dodgers2"), + new Tuple2("california", "giants1"), + new Tuple2("california", "giants2"), + new Tuple2("new york", "yankees1"), + new Tuple2("new york", "yankees2"), + new Tuple2("new york", "mets1"), + new Tuple2("new york", "mets2")), + Arrays.asList(new Tuple2("california", "sharks1"), + new Tuple2("california", "sharks2"), + new Tuple2("california", "ducks1"), + new Tuple2("california", "ducks2"), + new Tuple2("new york", "rangers1"), + new Tuple2("new york", "rangers2"), + new Tuple2("new york", "islanders1"), + new Tuple2("new york", "islanders2"))); + + JavaDStream> stream = JavaTestUtils.attachTestInputStream( + sc, inputData, 1); + JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); + + + JavaPairDStream flatMapped = pairStream.flatMapValues( + new Function>() { + @Override + public Iterable call(String in) { + List out = new ArrayList(); + out.add(in + "1"); + out.add(in + "2"); + return out; + } + }); + + JavaTestUtils.attachTestOutputStream(flatMapped); + List>> result = JavaTestUtils.runStreams(sc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testCoGroup() { + List>> stringStringKVStream1 = Arrays.asList( + Arrays.asList(new Tuple2("california", "dodgers"), + new Tuple2("new york", "yankees")), + Arrays.asList(new Tuple2("california", "sharks"), + new Tuple2("new york", "rangers"))); + + List>> stringStringKVStream2 = Arrays.asList( + Arrays.asList(new Tuple2("california", "giants"), + new Tuple2("new york", "mets")), + Arrays.asList(new Tuple2("california", "ducks"), + new Tuple2("new 
york", "islanders"))); + + + List, List>>>> expected = Arrays.asList( + Arrays.asList( + new Tuple2, List>>("california", + new Tuple2, List>(Arrays.asList("dodgers"), Arrays.asList("giants"))), + new Tuple2, List>>("new york", + new Tuple2, List>(Arrays.asList("yankees"), Arrays.asList("mets")))), + Arrays.asList( + new Tuple2, List>>("california", + new Tuple2, List>(Arrays.asList("sharks"), Arrays.asList("ducks"))), + new Tuple2, List>>("new york", + new Tuple2, List>(Arrays.asList("rangers"), Arrays.asList("islanders"))))); + + + JavaDStream> stream1 = JavaTestUtils.attachTestInputStream( + sc, stringStringKVStream1, 1); + JavaPairDStream pairStream1 = JavaPairDStream.fromJavaDStream(stream1); + + JavaDStream> stream2 = JavaTestUtils.attachTestInputStream( + sc, stringStringKVStream2, 1); + JavaPairDStream pairStream2 = JavaPairDStream.fromJavaDStream(stream2); + + JavaPairDStream, List>> grouped = pairStream1.cogroup(pairStream2); + JavaTestUtils.attachTestOutputStream(grouped); + List>> result = JavaTestUtils.runStreams(sc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testJoin() { + List>> stringStringKVStream1 = Arrays.asList( + Arrays.asList(new Tuple2("california", "dodgers"), + new Tuple2("new york", "yankees")), + Arrays.asList(new Tuple2("california", "sharks"), + new Tuple2("new york", "rangers"))); + + List>> stringStringKVStream2 = Arrays.asList( + Arrays.asList(new Tuple2("california", "giants"), + new Tuple2("new york", "mets")), + Arrays.asList(new Tuple2("california", "ducks"), + new Tuple2("new york", "islanders"))); + + + List>>> expected = Arrays.asList( + Arrays.asList( + new Tuple2>("california", + new Tuple2("dodgers", "giants")), + new Tuple2>("new york", + new Tuple2("yankees", "mets"))), + Arrays.asList( + new Tuple2>("california", + new Tuple2("sharks", "ducks")), + new Tuple2>("new york", + new Tuple2("rangers", "islanders")))); + + + JavaDStream> stream1 = JavaTestUtils.attachTestInputStream( + sc, stringStringKVStream1, 1); + JavaPairDStream pairStream1 = JavaPairDStream.fromJavaDStream(stream1); + + JavaDStream> stream2 = JavaTestUtils.attachTestInputStream( + sc, stringStringKVStream2, 1); + JavaPairDStream pairStream2 = JavaPairDStream.fromJavaDStream(stream2); + + JavaPairDStream> joined = pairStream1.join(pairStream2); + JavaTestUtils.attachTestOutputStream(joined); + List>> result = JavaTestUtils.runStreams(sc, 2, 2); + + Assert.assertEquals(expected, result); + } } -- cgit v1.2.3