author    Patrick Wendell <pwendell@gmail.com>  2013-01-09 19:27:32 -0800
committer Patrick Wendell <pwendell@gmail.com>  2013-01-14 09:42:36 -0800
commit    560c312c6060914c9c38cb98d3587685f10f7311 (patch)
tree      fa037986aba760ff9576702d97627211450280ed /streaming
parent    7e1049d8f1b155a4bd742e84927c4cc83bb71cb6 (diff)
Docs, some tests, and work on StreamingContext
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala          |  33
-rw-r--r--  streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala      |  59
-rw-r--r--  streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala      |  46
-rw-r--r--  streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala | 152
-rw-r--r--  streaming/src/test/scala/spark/streaming/JavaAPISuite.java                   | 359
5 files changed, 617 insertions(+), 32 deletions(-)
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
index 9bf595e0bc..1e5c279e2c 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
@@ -4,15 +4,25 @@ import spark.streaming.{Time, DStream}
import spark.api.java.function.{Function => JFunction}
import spark.api.java.JavaRDD
import java.util.{List => JList}
+import spark.storage.StorageLevel
class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassManifest[T])
extends JavaDStreamLike[T, JavaDStream[T]] {
+ /** Returns a new DStream containing only the elements that satisfy a predicate. */
def filter(f: JFunction[T, java.lang.Boolean]): JavaDStream[T] =
dstream.filter((x => f(x).booleanValue()))
+ /** Persists RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
def cache(): JavaDStream[T] = dstream.cache()
+ /** Persists RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
+ def persist(): JavaDStream[T] = dstream.cache()
+
+ /** Persists the RDDs of this DStream with the given storage level */
+ def persist(storageLevel: StorageLevel): JavaDStream[T] = dstream.persist(storageLevel)
+
+ /** Method that generates an RDD for the given time */
def compute(validTime: Time): JavaRDD[T] = {
dstream.compute(validTime) match {
case Some(rdd) => new JavaRDD(rdd)
@@ -20,15 +30,38 @@ class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassM
}
}
+ /**
+ * Return a new DStream which is computed based on windowed batches of this DStream.
+ * The new DStream generates RDDs with the same interval as this DStream.
+ * @param windowTime width of the window; must be a multiple of this DStream's interval.
+ */
def window(windowTime: Time): JavaDStream[T] =
dstream.window(windowTime)
+ /**
+ * Return a new DStream which is computed based on windowed batches of this DStream.
+ * @param windowTime duration (i.e., width) of the window;
+ * must be a multiple of this DStream's interval
+ * @param slideTime sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's interval
+ */
def window(windowTime: Time, slideTime: Time): JavaDStream[T] =
dstream.window(windowTime, slideTime)
+ /**
+ * Returns a new DStream computed from tumbling windows on this DStream.
+ * This is equivalent to window(batchTime, batchTime).
+ * @param batchTime tumbling window duration; must be a multiple of this DStream's interval
+ */
def tumble(batchTime: Time): JavaDStream[T] =
dstream.tumble(batchTime)
+ /**
+ * Returns a new DStream by unifying data of another DStream with this DStream.
+ * @param that Another DStream having the same interval (i.e., slideTime) as this DStream.
+ */
def union(that: JavaDStream[T]): JavaDStream[T] =
dstream.union(that.dstream)
}
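
For reference, a minimal driver sketch exercising the windowing methods documented above. The master URL, host, port, and batch duration are illustrative values, not part of this patch:

import spark.streaming.Time;
import spark.streaming.api.java.JavaDStream;
import spark.streaming.api.java.JavaStreamingContext;

public class WindowExample {
  public static void main(String[] args) {
    // 1-second batch interval; the window and slide durations below are multiples of it.
    JavaStreamingContext ssc =
        new JavaStreamingContext("local[2]", "WindowExample", new Time(1000));
    JavaDStream<String> lines = ssc.networkTextStream("localhost", 9999);

    // 2-second window sliding every 1 second.
    JavaDStream<String> windowed = lines.window(new Time(2000), new Time(1000));
    // Non-overlapping 2-second windows; equivalent to window(new Time(2000), new Time(2000)).
    JavaDStream<String> tumbled = lines.tumble(new Time(2000));

    windowed.print();
    tumbled.print();
    ssc.start();
  }
}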
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
index 05d89918b2..23a0aaaefd 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
@@ -16,41 +16,81 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
def dstream: DStream[T]
+ /**
+ * Prints the first ten elements of each RDD generated in this DStream. This is an output
+ * operator, so this DStream will be registered as an output stream and therefore materialized.
+ */
def print() = dstream.print()
+ /**
+ * Returns a new DStream in which each RDD has a single element generated by counting each RDD
+ * of this DStream.
+ */
def count(): JavaDStream[Int] = dstream.count()
+ /**
+ * Returns a new DStream in which each RDD has a single element generated by counting the number
+ * of elements in a window over this DStream. windowTime and slideTime are as defined in the
+ * window() operation. This is equivalent to window(windowTime, slideTime).count()
+ */
def countByWindow(windowTime: Time, slideTime: Time) : JavaDStream[Int] = {
dstream.countByWindow(windowTime, slideTime)
}
+ /**
+ * Return a new DStream in which each RDD is generated by applying glom() to each RDD of
+ * this DStream. Applying glom() to an RDD coalesces all elements within each partition into
+ * an array.
+ */
def glom(): JavaDStream[JList[T]] =
new JavaDStream(dstream.glom().map(x => new java.util.ArrayList[T](x.toSeq)))
+ /** Returns the StreamingContext associated with this DStream */
def context(): StreamingContext = dstream.context()
+ /** Returns a new DStream by applying a function to all elements of this DStream. */
def map[R](f: JFunction[T, R]): JavaDStream[R] = {
new JavaDStream(dstream.map(f)(f.returnType()))(f.returnType())
}
+ /** Returns a new DStream by applying a function to all elements of this DStream. */
def map[K, V](f: PairFunction[T, K, V]): JavaPairDStream[K, V] = {
def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]]
new JavaPairDStream(dstream.map(f)(cm))(f.keyType(), f.valueType())
}
+ /**
+ * Return a new DStream in which each RDD is generated by applying mapPartitions() to each RDD
+ * of this DStream. Applying mapPartitions() to an RDD applies a function to each partition
+ * of the RDD.
+ */
def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaDStream[U] = {
def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
new JavaDStream(dstream.mapPartitions(fn)(f.elementType()))(f.elementType())
}
+ /**
+ * Return a new DStream in which each RDD is generated by applying mapPartitions() to each RDD
+ * of this DStream. Applying mapPartitions() to an RDD applies a function to each partition
+ * of the RDD.
+ */
def mapPartitions[K, V](f: PairFlatMapFunction[java.util.Iterator[T], K, V])
: JavaPairDStream[K, V] = {
def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
new JavaPairDStream(dstream.mapPartitions(fn))(f.keyType(), f.valueType())
}
+ /**
+ * Returns a new DStream in which each RDD has a single element generated by reducing each RDD
+ * of this DStream.
+ */
def reduce(f: JFunction2[T, T, T]): JavaDStream[T] = dstream.reduce(f)
+ /**
+ * Returns a new DStream in which each RDD has a single element generated by reducing all
+ * elements in a window over this DStream. windowTime and slideTime are as defined in the
+ * window() operation. This is equivalent to window(windowTime, slideTime).reduce(reduceFunc)
+ */
def reduceByWindow(
reduceFunc: JFunction2[T, T, T],
invReduceFunc: JFunction2[T, T, T],
@@ -59,18 +99,33 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
dstream.reduceByWindow(reduceFunc, invReduceFunc, windowTime, slideTime)
}
+ /**
+ * Returns all the RDDs from 'fromTime' to 'toTime' (both inclusive)
+ */
def slice(fromTime: Time, toTime: Time): JList[JavaRDD[T]] = {
new util.ArrayList(dstream.slice(fromTime, toTime).map(new JavaRDD(_)).toSeq)
}
+ /**
+ * Applies a function to each RDD in this DStream. This is an output operator, so
+ * this DStream will be registered as an output stream and therefore materialized.
+ */
def foreach(foreachFunc: JFunction[JavaRDD[T], Void]) {
dstream.foreach(rdd => foreachFunc.call(new JavaRDD(rdd)))
}
+ /**
+ * Applies a function to each RDD in this DStream. This is an output operator, so
+ * this DStream will be registered as an output stream and therefore materialized.
+ */
def foreach(foreachFunc: JFunction2[JavaRDD[T], Time, Void]) {
dstream.foreach((rdd, time) => foreachFunc.call(new JavaRDD(rdd), time))
}
+ /**
+ * Returns a new DStream in which each RDD is generated by applying a function
+ * on each RDD of this DStream.
+ */
def transform[U](transformFunc: JFunction[JavaRDD[T], JavaRDD[U]]): JavaDStream[U] = {
implicit val cm: ClassManifest[U] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
@@ -79,6 +134,10 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
dstream.transform(scalaTransform(_))
}
+ /**
+ * Returns a new DStream in which each RDD is generated by applying a function
+ * on each RDD of this DStream.
+ */
def transform[U](transformFunc: JFunction2[JavaRDD[T], Time, JavaRDD[U]]): JavaDStream[U] = {
implicit val cm: ClassManifest[U] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
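
As a usage sketch for map() and transform() from Java, assuming a JavaDStream<String> named lines obtained elsewhere, and assuming JavaRDD exposes distinct() as the Scala RDD API does:

import spark.api.java.JavaRDD;
import spark.api.java.function.Function;
import spark.streaming.api.java.JavaDStream;

// Per-element transformation with map().
JavaDStream<Integer> lengths = lines.map(new Function<String, Integer>() {
  @Override
  public Integer call(String s) throws Exception {
    return s.length();
  }
});

// transform() exposes each generated RDD, allowing arbitrary RDD operations
// (here a per-batch distinct()) that have no direct DStream equivalent.
JavaDStream<Integer> distinctLengths = lengths.transform(
    new Function<JavaRDD<Integer>, JavaRDD<Integer>>() {
      @Override
      public JavaRDD<Integer> call(JavaRDD<Integer> rdd) throws Exception {
        return rdd.distinct();
      }
    });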
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
index f6dfbb2345..f36b870046 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
@@ -12,18 +12,31 @@ import org.apache.hadoop.mapred.{JobConf, OutputFormat}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
import org.apache.hadoop.conf.Configuration
import spark.api.java.{JavaPairRDD, JavaRDD}
+import spark.storage.StorageLevel
class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
implicit val kManifiest: ClassManifest[K],
implicit val vManifest: ClassManifest[V])
extends JavaDStreamLike[(K, V), JavaPairDStream[K, V]] {
- // Common to all DStream's
+ // =======================================================================
+ // Methods common to all DStreams
+ // =======================================================================
+
+ /** Returns a new DStream containing only the elements that satisfy a predicate. */
def filter(f: JFunction[(K, V), java.lang.Boolean]): JavaPairDStream[K, V] =
dstream.filter((x => f(x).booleanValue()))
+ /** Persists RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
def cache(): JavaPairDStream[K, V] = dstream.cache()
+ /** Persists RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
+ def persist(): JavaPairDStream[K, V] = dstream.cache()
+
+ /** Persists the RDDs of this DStream with the given storage level */
+ def persist(storageLevel: StorageLevel): JavaPairDStream[K, V] = dstream.persist(storageLevel)
+
+ /** Method that generates an RDD for the given time */
def compute(validTime: Time): JavaPairRDD[K, V] = {
dstream.compute(validTime) match {
case Some(rdd) => new JavaPairRDD(rdd)
@@ -31,19 +44,45 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
}
+ /**
+ * Return a new DStream which is computed based on windowed batches of this DStream.
+ * The new DStream generates RDDs with the same interval as this DStream.
+ * @param windowTime width of the window; must be a multiple of this DStream's interval.
+ */
def window(windowTime: Time): JavaPairDStream[K, V] =
dstream.window(windowTime)
+ /**
+ * Return a new DStream which is computed based on windowed batches of this DStream.
+ * @param windowTime duration (i.e., width) of the window;
+ * must be a multiple of this DStream's interval
+ * @param slideTime sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's interval
+ */
def window(windowTime: Time, slideTime: Time): JavaPairDStream[K, V] =
dstream.window(windowTime, slideTime)
+ /**
+ * Returns a new DStream computed from tumbling windows on this DStream.
+ * This is equivalent to window(batchTime, batchTime).
+ * @param batchTime tumbling window duration; must be a multiple of this DStream's interval
+ */
def tumble(batchTime: Time): JavaPairDStream[K, V] =
dstream.tumble(batchTime)
+ /**
+ * Returns a new DStream by unifying data of another DStream with this DStream.
+ * @param that Another DStream having the same interval (i.e., slideTime) as this DStream.
+ */
def union(that: JavaPairDStream[K, V]): JavaPairDStream[K, V] =
dstream.union(that.dstream)
- // Only for PairDStreams...
+ // =======================================================================
+ // Methods only for PairDStreams
+ // =======================================================================
+
def groupByKey(): JavaPairDStream[K, JList[V]] =
dstream.groupByKey().mapValues(seqAsJavaList _)
@@ -59,8 +98,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
def reduceByKey(func: JFunction2[V, V, V], numPartitions: Int): JavaPairDStream[K, V] =
dstream.reduceByKey(func, numPartitions)
- // TODO: TEST BELOW
- def combineByKey[C](createCombiner: Function[V, C],
+ def combineByKey[C](createCombiner: JFunction[V, C],
mergeValue: JFunction2[C, V, C],
mergeCombiners: JFunction2[C, C, C],
partitioner: Partitioner): JavaPairDStream[K, C] = {
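
The corrected combineByKey() signature above is exercised in the new test suite; in sketch form, summing Integer values per key (pairs stands in for a JavaPairDStream<String, Integer> built elsewhere):

import spark.HashPartitioner;
import spark.api.java.function.Function;
import spark.api.java.function.Function2;
import spark.streaming.api.java.JavaPairDStream;

JavaPairDStream<String, Integer> sums = pairs.<Integer>combineByKey(
    new Function<Integer, Integer>() {            // createCombiner: first value seen for a key
      @Override
      public Integer call(Integer i) throws Exception {
        return i;
      }
    },
    new Function2<Integer, Integer, Integer>() {  // mergeValue: fold the next value in
      @Override
      public Integer call(Integer a, Integer b) throws Exception {
        return a + b;
      }
    },
    new Function2<Integer, Integer, Integer>() {  // mergeCombiners: merge partial sums
      @Override
      public Integer call(Integer a, Integer b) throws Exception {
        return a + b;
      }
    },
    new HashPartitioner(2));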
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
index 19cd032fc1..f96b4fbd7d 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
@@ -4,27 +4,173 @@ import scala.collection.JavaConversions._
import java.util.{List => JList}
import spark.streaming._
-import dstream.SparkFlumeEvent
+import dstream._
import spark.storage.StorageLevel
+import spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
+import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
+import java.io.InputStream
class JavaStreamingContext(val ssc: StreamingContext) {
def this(master: String, frameworkName: String, batchDuration: Time) =
this(new StreamingContext(master, frameworkName, batchDuration))
- def textFileStream(directory: String): JavaDStream[String] = {
- ssc.textFileStream(directory)
+ // TODOs:
+ // - Test StreamingContext functions
+ // - Test to/from Hadoop functions
+ // - Add checkpoint()/remember()
+ // - Support creating your own streams
+ // - Add Kafka Stream
+
+ /**
+ * Creates an input stream from a network source hostname:port. Data is received
+ * using a TCP socket and the received bytes are interpreted as UTF-8 encoded,
+ * \n delimited lines.
+ * @param hostname Hostname to connect to for receiving data
+ * @param port Port to connect to for receiving data
+ * @param storageLevel Storage level to use for storing the received objects
+ * (default: StorageLevel.MEMORY_AND_DISK_SER_2)
+ */
+ def networkTextStream(hostname: String, port: Int, storageLevel: StorageLevel)
+ : JavaDStream[String] = {
+ ssc.networkTextStream(hostname, port, storageLevel)
}
+ /**
+ * Creates an input stream from a network source hostname:port. Data is received
+ * using a TCP socket and the received bytes are interpreted as UTF-8 encoded,
+ * \n delimited lines.
+ * @param hostname Hostname to connect to for receiving data
+ * @param port Port to connect to for receiving data
+ */
def networkTextStream(hostname: String, port: Int): JavaDStream[String] = {
ssc.networkTextStream(hostname, port)
}
+ /**
+ * Creates an input stream from a network source hostname:port. Data is received
+ * using a TCP socket and the received bytes are interpreted as objects using the
+ * given converter.
+ * @param hostname Hostname to connect to for receiving data
+ * @param port Port to connect to for receiving data
+ * @param converter Function to convert the byte stream to objects
+ * @param storageLevel Storage level to use for storing the received objects
+ * @tparam T Type of the objects received (after converting bytes to objects)
+ */
+ def networkStream[T](
+ hostname: String,
+ port: Int,
+ converter: JFunction[InputStream, java.lang.Iterable[T]],
+ storageLevel: StorageLevel)
+ : JavaDStream[T] = {
+ import scala.collection.JavaConverters._
+ def fn = (x: InputStream) => converter.apply(x).toIterator
+ implicit val cmt: ClassManifest[T] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ ssc.networkStream(hostname, port, fn, storageLevel)
+ }
+
+ /**
+ * Creates an input stream that monitors a Hadoop-compatible filesystem
+ * for new files and reads them as text files (using key as LongWritable, value
+ * as Text and input format as TextInputFormat). File names starting with . are ignored.
+ * @param directory HDFS directory to monitor for new files
+ */
+ def textFileStream(directory: String): JavaDStream[String] = {
+ ssc.textFileStream(directory)
+ }
+
+ /**
+ * Creates an input stream from a network source hostname:port, where data is received
+ * as serialized blocks (serialized using Spark's serializer) that can be directly
+ * pushed into the block manager without deserializing them. This is the most efficient
+ * way to receive data.
+ * @param hostname Hostname to connect to for receiving data
+ * @param port Port to connect to for receiving data
+ * @param storageLevel Storage level to use for storing the received objects
+ * @tparam T Type of the objects in the received blocks
+ */
+ def rawNetworkStream[T](
+ hostname: String,
+ port: Int,
+ storageLevel: StorageLevel): JavaDStream[T] = {
+ implicit val cmt: ClassManifest[T] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ JavaDStream.fromDStream(ssc.rawNetworkStream(hostname, port, storageLevel))
+ }
+
+ /**
+ * Creates an input stream from a network source hostname:port, where data is received
+ * as serialized blocks (serialized using Spark's serializer) that can be directly
+ * pushed into the block manager without deserializing them. This is the most efficient
+ * way to receive data.
+ * @param hostname Hostname to connect to for receiving data
+ * @param port Port to connect to for receiving data
+ * @tparam T Type of the objects in the received blocks
+ */
+ def rawNetworkStream[T](hostname: String, port: Int): JavaDStream[T] = {
+ implicit val cmt: ClassManifest[T] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ JavaDStream.fromDStream(ssc.rawNetworkStream(hostname, port))
+ }
+
+ /**
+ * Creates an input stream that monitors a Hadoop-compatible filesystem
+ * for new files and reads them using the given key-value types and input format.
+ * File names starting with . are ignored.
+ * @param directory HDFS directory to monitor for new files
+ * @tparam K Key type for reading HDFS file
+ * @tparam V Value type for reading HDFS file
+ * @tparam F Input format for reading HDFS file
+ */
+ def fileStream[K, V, F <: NewInputFormat[K, V]](directory: String): JavaPairDStream[K, V] = {
+ implicit val cmk: ClassManifest[K] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
+ implicit val cmv: ClassManifest[V] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
+ implicit val cmf: ClassManifest[F] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[F]]
+ ssc.fileStream[K, V, F](directory)
+ }
+
+ /**
+ * Creates an input stream from a Flume source.
+ * @param hostname Hostname of the slave machine to which the Flume data will be sent
+ * @param port Port of the slave machine to which the Flume data will be sent
+ * @param storageLevel Storage level to use for storing the received objects
+ */
def flumeStream(hostname: String, port: Int, storageLevel: StorageLevel):
JavaDStream[SparkFlumeEvent] = {
ssc.flumeStream(hostname, port, storageLevel)
}
+
+ /**
+ * Creates an input stream from a Flume source.
+ * @param hostname Hostname of the slave machine to which the Flume data will be sent
+ * @param port Port of the slave machine to which the Flume data will be sent
+ */
+ def flumeStream(hostname: String, port: Int):
+ JavaDStream[SparkFlumeEvent] = {
+ ssc.flumeStream(hostname, port)
+ }
+
+ // NOT SUPPORTED: registerInputStream
+
+ /**
+ * Registers an output stream that will be computed every interval
+ */
+ def registerOutputStream(outputStream: JavaDStreamLike[_, _]) {
+ ssc.registerOutputStream(outputStream.dstream)
+ }
+
+ /**
+ * Starts the execution of the streams.
+ */
def start() = ssc.start()
+
+ /**
+ * Stops the execution of the streams.
+ */
def stop() = ssc.stop()
}
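
Putting the new input-stream constructors together, a minimal setup sketch; the host, port, directory, master URL, and storage level below are illustrative values:

import spark.storage.StorageLevel;
import spark.streaming.Time;
import spark.streaming.api.java.JavaDStream;
import spark.streaming.api.java.JavaStreamingContext;

public class StreamingSetup {
  public static void main(String[] args) {
    // 1-second batch interval.
    JavaStreamingContext ssc =
        new JavaStreamingContext("local[2]", "StreamingSetup", new Time(1000));

    // Text lines from a TCP source, with an explicit storage level for received data.
    JavaDStream<String> lines =
        ssc.networkTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER_2);

    // Text files appearing in a monitored Hadoop-compatible directory.
    JavaDStream<String> files = ssc.textFileStream("hdfs://namenode:8020/logs");

    // print() is an output operator: it registers each stream so it is materialized.
    lines.print();
    files.print();
    ssc.start();
  }
}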
diff --git a/streaming/src/test/scala/spark/streaming/JavaAPISuite.java b/streaming/src/test/scala/spark/streaming/JavaAPISuite.java
index c1373e6275..fa3a5801dd 100644
--- a/streaming/src/test/scala/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/scala/spark/streaming/JavaAPISuite.java
@@ -6,6 +6,7 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import scala.Tuple2;
+import spark.HashPartitioner;
import spark.api.java.JavaRDD;
import spark.api.java.function.FlatMapFunction;
import spark.api.java.function.Function;
@@ -377,18 +378,31 @@ public class JavaAPISuite implements Serializable {
Assert.assertEquals(expected, result);
}
+ List<List<Tuple2<String, String>>> stringStringKVStream = Arrays.asList(
+ Arrays.asList(new Tuple2<String, String>("california", "dodgers"),
+ new Tuple2<String, String>("california", "giants"),
+ new Tuple2<String, String>("new york", "yankees"),
+ new Tuple2<String, String>("new york", "mets")),
+ Arrays.asList(new Tuple2<String, String>("california", "sharks"),
+ new Tuple2<String, String>("california", "ducks"),
+ new Tuple2<String, String>("new york", "rangers"),
+ new Tuple2<String, String>("new york", "islanders")));
+
+ List<List<Tuple2<String, Integer>>> stringIntKVStream = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<String, Integer>("california", 1),
+ new Tuple2<String, Integer>("california", 3),
+ new Tuple2<String, Integer>("new york", 4),
+ new Tuple2<String, Integer>("new york", 1)),
+ Arrays.asList(
+ new Tuple2<String, Integer>("california", 5),
+ new Tuple2<String, Integer>("california", 5),
+ new Tuple2<String, Integer>("new york", 3),
+ new Tuple2<String, Integer>("new york", 1)));
+
@Test
public void testPairGroupByKey() {
- List<List<Tuple2<String, String>>> inputData = Arrays.asList(
- Arrays.asList(new Tuple2<String, String>("california", "dodgers"),
- new Tuple2<String, String>("california", "giants"),
- new Tuple2<String, String>("new york", "yankees"),
- new Tuple2<String, String>("new york", "mets")),
- Arrays.asList(new Tuple2<String, String>("california", "sharks"),
- new Tuple2<String, String>("california", "ducks"),
- new Tuple2<String, String>("new york", "rangers"),
- new Tuple2<String, String>("new york", "islanders")));
-
+ List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
List<List<Tuple2<String, List<String>>>> expected = Arrays.asList(
Arrays.asList(
@@ -410,18 +424,31 @@ public class JavaAPISuite implements Serializable {
@Test
public void testPairReduceByKey() {
- List<List<Tuple2<String, Integer>>> inputData = Arrays.asList(
+ List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+ List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
Arrays.asList(
- new Tuple2<String, Integer>("california", 1),
- new Tuple2<String, Integer>("california", 3),
- new Tuple2<String, Integer>("new york", 4),
- new Tuple2<String, Integer>("new york", 1)),
+ new Tuple2<String, Integer>("california", 4),
+ new Tuple2<String, Integer>("new york", 5)),
Arrays.asList(
- new Tuple2<String, Integer>("california", 5),
- new Tuple2<String, Integer>("california", 5),
- new Tuple2<String, Integer>("new york", 3),
- new Tuple2<String, Integer>("new york", 1)));
+ new Tuple2<String, Integer>("california", 10),
+ new Tuple2<String, Integer>("new york", 4)));
+ JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(
+ sc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+ JavaPairDStream<String, Integer> reduced = pairStream.reduceByKey(new IntegerSum());
+
+ JavaTestUtils.attachTestOutputStream(reduced);
+ List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(sc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testCombineByKey() {
+ List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
Arrays.asList(
@@ -435,17 +462,299 @@ public class JavaAPISuite implements Serializable {
sc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
- JavaPairDStream<String, Integer> reduced = pairStream.reduceByKey(
- new Function2<Integer, Integer, Integer>() {
+ JavaPairDStream<String, Integer> combined = pairStream.<Integer>combineByKey(
+ new Function<Integer, Integer>() {
@Override
- public Integer call(Integer i1, Integer i2) throws Exception {
- return i1 + i2;
+ public Integer call(Integer i) throws Exception {
+ return i;
}
- });
+ }, new IntegerSum(), new IntegerSum(), new HashPartitioner(2));
- JavaTestUtils.attachTestOutputStream(reduced);
+ JavaTestUtils.attachTestOutputStream(combined);
List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(sc, 2, 2);
Assert.assertEquals(expected, result);
}
+
+ @Test
+ public void testCountByKey() {
+ List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
+
+ List<List<Tuple2<String, Long>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<String, Long>("california", 2L),
+ new Tuple2<String, Long>("new york", 2L)),
+ Arrays.asList(
+ new Tuple2<String, Long>("california", 2L),
+ new Tuple2<String, Long>("new york", 2L)));
+
+ JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
+ sc, inputData, 1);
+ JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+ // TODO: Declaring the result as <String, Long> fails with a compile error; investigate why.
+ JavaPairDStream<String, Object> counted = pairStream.countByKey();
+ JavaTestUtils.attachTestOutputStream(counted);
+ List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(sc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testGroupByKeyAndWindow() {
+ List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
+
+ List<List<Tuple2<String, List<String>>>> expected = Arrays.asList(
+ Arrays.asList(new Tuple2<String, List<String>>("california", Arrays.asList("dodgers", "giants")),
+ new Tuple2<String, List<String>>("new york", Arrays.asList("yankees", "mets"))),
+ Arrays.asList(new Tuple2<String, List<String>>("california",
+ Arrays.asList("sharks", "ducks", "dodgers", "giants")),
+ new Tuple2<String, List<String>>("new york", Arrays.asList("rangers", "islanders", "yankees", "mets"))),
+ Arrays.asList(new Tuple2<String, List<String>>("california", Arrays.asList("sharks", "ducks")),
+ new Tuple2<String, List<String>>("new york", Arrays.asList("rangers", "islanders"))));
+
+ JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
+ JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+ JavaPairDStream<String, List<String>> groupWindowed =
+ pairStream.groupByKeyAndWindow(new Time(2000), new Time(1000));
+ JavaTestUtils.attachTestOutputStream(groupWindowed);
+ List<List<Tuple2<String, List<String>>>> result = JavaTestUtils.runStreams(sc, 3, 3);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testReduceByKeyAndWindow() {
+ List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+ List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
+ Arrays.asList(new Tuple2<String, Integer>("california", 4),
+ new Tuple2<String, Integer>("new york", 5)),
+ Arrays.asList(new Tuple2<String, Integer>("california", 14),
+ new Tuple2<String, Integer>("new york", 9)),
+ Arrays.asList(new Tuple2<String, Integer>("california", 10),
+ new Tuple2<String, Integer>("new york", 4)));
+
+ JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+ JavaPairDStream<String, Integer> reduceWindowed =
+ pairStream.reduceByKeyAndWindow(new IntegerSum(), new Time(2000), new Time(1000));
+ JavaTestUtils.attachTestOutputStream(reduceWindowed);
+ List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(sc, 3, 3);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testReduceByKeyAndWindowWithInverse() {
+ List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+ List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
+ Arrays.asList(new Tuple2<String, Integer>("california", 4),
+ new Tuple2<String, Integer>("new york", 5)),
+ Arrays.asList(new Tuple2<String, Integer>("california", 14),
+ new Tuple2<String, Integer>("new york", 9)),
+ Arrays.asList(new Tuple2<String, Integer>("california", 10),
+ new Tuple2<String, Integer>("new york", 4)));
+
+ JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+ JavaPairDStream<String, Integer> reduceWindowed =
+ pairStream.reduceByKeyAndWindow(new IntegerSum(), new IntegerDifference(), new Time(2000), new Time(1000));
+ JavaTestUtils.attachTestOutputStream(reduceWindowed);
+ List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(sc, 3, 3);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testCountByKeyAndWindow() {
+ List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
+
+ List<List<Tuple2<String, Long>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<String, Long>("california", 2L),
+ new Tuple2<String, Long>("new york", 2L)),
+ Arrays.asList(
+ new Tuple2<String, Long>("california", 4L),
+ new Tuple2<String, Long>("new york", 4L)),
+ Arrays.asList(
+ new Tuple2<String, Long>("california", 2L),
+ new Tuple2<String, Long>("new york", 2L)));
+
+ JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
+ sc, inputData, 1);
+ JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+ // TODO: Declaring the result as <String, Long> fails with a compile error; investigate why.
+ JavaPairDStream<String, Object> counted = pairStream.countByKeyAndWindow(new Time(2000), new Time(1000));
+ JavaTestUtils.attachTestOutputStream(counted);
+ List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(sc, 3, 3);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testMapValues() {
+ List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
+
+ List<List<Tuple2<String, String>>> expected = Arrays.asList(
+ Arrays.asList(new Tuple2<String, String>("california", "DODGERS"),
+ new Tuple2<String, String>("california", "GIANTS"),
+ new Tuple2<String, String>("new york", "YANKEES"),
+ new Tuple2<String, String>("new york", "METS")),
+ Arrays.asList(new Tuple2<String, String>("california", "SHARKS"),
+ new Tuple2<String, String>("california", "DUCKS"),
+ new Tuple2<String, String>("new york", "RANGERS"),
+ new Tuple2<String, String>("new york", "ISLANDERS")));
+
+ JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
+ sc, inputData, 1);
+ JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+ JavaPairDStream<String, String> mapped = pairStream.mapValues(new Function<String, String>() {
+ @Override
+ public String call(String s) throws Exception {
+ return s.toUpperCase();
+ }
+ });
+
+ JavaTestUtils.attachTestOutputStream(mapped);
+ List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(sc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testFlatMapValues() {
+ List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
+
+ List<List<Tuple2<String, String>>> expected = Arrays.asList(
+ Arrays.asList(new Tuple2<String, String>("california", "dodgers1"),
+ new Tuple2<String, String>("california", "dodgers2"),
+ new Tuple2<String, String>("california", "giants1"),
+ new Tuple2<String, String>("california", "giants2"),
+ new Tuple2<String, String>("new york", "yankees1"),
+ new Tuple2<String, String>("new york", "yankees2"),
+ new Tuple2<String, String>("new york", "mets1"),
+ new Tuple2<String, String>("new york", "mets2")),
+ Arrays.asList(new Tuple2<String, String>("california", "sharks1"),
+ new Tuple2<String, String>("california", "sharks2"),
+ new Tuple2<String, String>("california", "ducks1"),
+ new Tuple2<String, String>("california", "ducks2"),
+ new Tuple2<String, String>("new york", "rangers1"),
+ new Tuple2<String, String>("new york", "rangers2"),
+ new Tuple2<String, String>("new york", "islanders1"),
+ new Tuple2<String, String>("new york", "islanders2")));
+
+ JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
+ sc, inputData, 1);
+ JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+
+ JavaPairDStream<String, String> flatMapped = pairStream.flatMapValues(
+ new Function<String, Iterable<String>>() {
+ @Override
+ public Iterable<String> call(String in) {
+ List<String> out = new ArrayList<String>();
+ out.add(in + "1");
+ out.add(in + "2");
+ return out;
+ }
+ });
+
+ JavaTestUtils.attachTestOutputStream(flatMapped);
+ List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(sc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testCoGroup() {
+ List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
+ Arrays.asList(new Tuple2<String, String>("california", "dodgers"),
+ new Tuple2<String, String>("new york", "yankees")),
+ Arrays.asList(new Tuple2<String, String>("california", "sharks"),
+ new Tuple2<String, String>("new york", "rangers")));
+
+ List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList(
+ Arrays.asList(new Tuple2<String, String>("california", "giants"),
+ new Tuple2<String, String>("new york", "mets")),
+ Arrays.asList(new Tuple2<String, String>("california", "ducks"),
+ new Tuple2<String, String>("new york", "islanders")));
+
+
+ List<List<Tuple2<String, Tuple2<List<String>, List<String>>>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<String, Tuple2<List<String>, List<String>>>("california",
+ new Tuple2<List<String>, List<String>>(Arrays.asList("dodgers"), Arrays.asList("giants"))),
+ new Tuple2<String, Tuple2<List<String>, List<String>>>("new york",
+ new Tuple2<List<String>, List<String>>(Arrays.asList("yankees"), Arrays.asList("mets")))),
+ Arrays.asList(
+ new Tuple2<String, Tuple2<List<String>, List<String>>>("california",
+ new Tuple2<List<String>, List<String>>(Arrays.asList("sharks"), Arrays.asList("ducks"))),
+ new Tuple2<String, Tuple2<List<String>, List<String>>>("new york",
+ new Tuple2<List<String>, List<String>>(Arrays.asList("rangers"), Arrays.asList("islanders")))));
+
+
+ JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream(
+ sc, stringStringKVStream1, 1);
+ JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1);
+
+ JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream(
+ sc, stringStringKVStream2, 1);
+ JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2);
+
+ JavaPairDStream<String, Tuple2<List<String>, List<String>>> grouped = pairStream1.cogroup(pairStream2);
+ JavaTestUtils.attachTestOutputStream(grouped);
+ List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(sc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testJoin() {
+ List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
+ Arrays.asList(new Tuple2<String, String>("california", "dodgers"),
+ new Tuple2<String, String>("new york", "yankees")),
+ Arrays.asList(new Tuple2<String, String>("california", "sharks"),
+ new Tuple2<String, String>("new york", "rangers")));
+
+ List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList(
+ Arrays.asList(new Tuple2<String, String>("california", "giants"),
+ new Tuple2<String, String>("new york", "mets")),
+ Arrays.asList(new Tuple2<String, String>("california", "ducks"),
+ new Tuple2<String, String>("new york", "islanders")));
+
+
+ List<List<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<String, Tuple2<String, String>>("california",
+ new Tuple2<String, String>("dodgers", "giants")),
+ new Tuple2<String, Tuple2<String, String>>("new york",
+ new Tuple2<String, String>("yankees", "mets"))),
+ Arrays.asList(
+ new Tuple2<String, Tuple2<String, String>>("california",
+ new Tuple2<String, String>("sharks", "ducks")),
+ new Tuple2<String, Tuple2<String, String>>("new york",
+ new Tuple2<String, String>("rangers", "islanders"))));
+
+
+ JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream(
+ sc, stringStringKVStream1, 1);
+ JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1);
+
+ JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream(
+ sc, stringStringKVStream2, 1);
+ JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2);
+
+ JavaPairDStream<String, Tuple2<String, String>> joined = pairStream1.join(pairStream2);
+ JavaTestUtils.attachTestOutputStream(joined);
+ List<List<Tuple2<String, Tuple2<String, String>>>> result = JavaTestUtils.runStreams(sc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
}