diff options
author | Patrick Wendell <pwendell@gmail.com> | 2013-01-17 21:07:09 -0800 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2013-01-17 21:25:49 -0800 |
commit | e0165bf7141086e28f88cd68ab7bc6249061c924 (patch) | |
tree | eed0fbf2db1b1b93395632e57985a4fd072c76f6 /streaming/src | |
parent | 6fba7683c29b64a33a6daa28cc56bb5d20574314 (diff) | |
download | spark-e0165bf7141086e28f88cd68ab7bc6249061c924.tar.gz spark-e0165bf7141086e28f88cd68ab7bc6249061c924.tar.bz2 spark-e0165bf7141086e28f88cd68ab7bc6249061c924.zip |
Adding queueStream and some slight refactoring
Diffstat (limited to 'streaming/src')
-rw-r--r-- | streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala | 58 | ||||
-rw-r--r-- | streaming/src/test/scala/spark/streaming/JavaAPISuite.java | 186 |
2 files changed, 163 insertions, 81 deletions
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala index accac82e09..f82e6a37cc 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala @@ -10,6 +10,7 @@ import spark.api.java.function.{Function => JFunction, Function2 => JFunction2} import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} import java.io.InputStream import java.util.{Map => JMap} +import spark.api.java.{JavaSparkContext, JavaRDD} /** * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic @@ -39,6 +40,9 @@ class JavaStreamingContext(val ssc: StreamingContext) { */ def this(path: String) = this (new StreamingContext(path)) + /** The underlying SparkContext */ + val sc: JavaSparkContext = new JavaSparkContext(ssc.sc) + /** * Create an input stream that pulls messages form a Kafka Broker. * @param hostname Zookeper hostname. @@ -255,6 +259,60 @@ class JavaStreamingContext(val ssc: StreamingContext) { } /** + * Creates a input stream from an queue of RDDs. In each batch, + * it will process either one or all of the RDDs returned by the queue. + * + * NOTE: changes to the queue after the stream is created will not be recognized. + * @param queue Queue of RDDs + * @tparam T Type of objects in the RDD + */ + def queueStream[T](queue: java.util.Queue[JavaRDD[T]]): JavaDStream[T] = { + implicit val cm: ClassManifest[T] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + val sQueue = new scala.collection.mutable.Queue[spark.RDD[T]] + sQueue.enqueue(queue.map(_.rdd).toSeq: _*) + ssc.queueStream(sQueue) + } + + /** + * Creates a input stream from an queue of RDDs. In each batch, + * it will process either one or all of the RDDs returned by the queue. + * + * NOTE: changes to the queue after the stream is created will not be recognized. + * @param queue Queue of RDDs + * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval + * @tparam T Type of objects in the RDD + */ + def queueStream[T](queue: java.util.Queue[JavaRDD[T]], oneAtATime: Boolean): JavaDStream[T] = { + implicit val cm: ClassManifest[T] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + val sQueue = new scala.collection.mutable.Queue[spark.RDD[T]] + sQueue.enqueue(queue.map(_.rdd).toSeq: _*) + ssc.queueStream(sQueue, oneAtATime) + } + + /** + * Creates a input stream from an queue of RDDs. In each batch, + * it will process either one or all of the RDDs returned by the queue. + * + * NOTE: changes to the queue after the stream is created will not be recognized. + * @param queue Queue of RDDs + * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval + * @param defaultRDD Default RDD is returned by the DStream when the queue is empty + * @tparam T Type of objects in the RDD + */ + def queueStream[T]( + queue: java.util.Queue[JavaRDD[T]], + oneAtATime: Boolean, + defaultRDD: JavaRDD[T]): JavaDStream[T] = { + implicit val cm: ClassManifest[T] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + val sQueue = new scala.collection.mutable.Queue[spark.RDD[T]] + sQueue.enqueue(queue.map(_.rdd).toSeq: _*) + ssc.queueStream(sQueue, oneAtATime, defaultRDD.rdd) + } + + /** * Sets the context to periodically checkpoint the DStream operations for master * fault-tolerance. By default, the graph will be checkpointed every batch interval. * @param directory HDFS-compatible directory where the checkpoint data will be reliably stored diff --git a/streaming/src/test/scala/spark/streaming/JavaAPISuite.java b/streaming/src/test/scala/spark/streaming/JavaAPISuite.java index 374793b57e..8c94e13e65 100644 --- a/streaming/src/test/scala/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/scala/spark/streaming/JavaAPISuite.java @@ -12,6 +12,7 @@ import org.junit.Test; import scala.Tuple2; import spark.HashPartitioner; import spark.api.java.JavaRDD; +import spark.api.java.JavaSparkContext; import spark.api.java.function.*; import spark.storage.StorageLevel; import spark.streaming.api.java.JavaDStream; @@ -28,17 +29,17 @@ import java.util.*; // serialized, as an alternative to converting these anonymous classes to static inner classes; // see http://stackoverflow.com/questions/758570/. public class JavaAPISuite implements Serializable { - private transient JavaStreamingContext sc; + private transient JavaStreamingContext ssc; @Before public void setUp() { - sc = new JavaStreamingContext("local[2]", "test", new Duration(1000)); + ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000)); } @After public void tearDown() { - sc.stop(); - sc = null; + ssc.stop(); + ssc = null; // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.master.port"); } @@ -55,10 +56,10 @@ public class JavaAPISuite implements Serializable { Arrays.asList(3L), Arrays.asList(1L)); - JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1); + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaDStream count = stream.count(); JavaTestUtils.attachTestOutputStream(count); - List<List<Long>> result = JavaTestUtils.runStreams(sc, 3, 3); + List<List<Long>> result = JavaTestUtils.runStreams(ssc, 3, 3); assertOrderInvariantEquals(expected, result); } @@ -72,7 +73,7 @@ public class JavaAPISuite implements Serializable { Arrays.asList(5,5), Arrays.asList(9,4)); - JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1); + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaDStream letterCount = stream.map(new Function<String, Integer>() { @Override public Integer call(String s) throws Exception { @@ -80,7 +81,7 @@ public class JavaAPISuite implements Serializable { } }); JavaTestUtils.attachTestOutputStream(letterCount); - List<List<Integer>> result = JavaTestUtils.runStreams(sc, 2, 2); + List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2); assertOrderInvariantEquals(expected, result); } @@ -98,10 +99,10 @@ public class JavaAPISuite implements Serializable { Arrays.asList(7,8,9,4,5,6), Arrays.asList(7,8,9)); - JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1); + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaDStream windowed = stream.window(new Duration(2000)); JavaTestUtils.attachTestOutputStream(windowed); - List<List<Integer>> result = JavaTestUtils.runStreams(sc, 4, 4); + List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4); assertOrderInvariantEquals(expected, result); } @@ -122,10 +123,10 @@ public class JavaAPISuite implements Serializable { Arrays.asList(7,8,9,10,11,12,13,14,15,16,17,18), Arrays.asList(13,14,15,16,17,18)); - JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1); + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaDStream windowed = stream.window(new Duration(4000), new Duration(2000)); JavaTestUtils.attachTestOutputStream(windowed); - List<List<Integer>> result = JavaTestUtils.runStreams(sc, 8, 4); + List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 8, 4); assertOrderInvariantEquals(expected, result); } @@ -145,10 +146,10 @@ public class JavaAPISuite implements Serializable { Arrays.asList(7,8,9,10,11,12), Arrays.asList(13,14,15,16,17,18)); - JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1); + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaDStream windowed = stream.tumble(new Duration(2000)); JavaTestUtils.attachTestOutputStream(windowed); - List<List<Integer>> result = JavaTestUtils.runStreams(sc, 6, 3); + List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 6, 3); assertOrderInvariantEquals(expected, result); } @@ -163,7 +164,7 @@ public class JavaAPISuite implements Serializable { Arrays.asList("giants"), Arrays.asList("yankees")); - JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1); + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaDStream filtered = stream.filter(new Function<String, Boolean>() { @Override public Boolean call(String s) throws Exception { @@ -171,7 +172,7 @@ public class JavaAPISuite implements Serializable { } }); JavaTestUtils.attachTestOutputStream(filtered); - List<List<String>> result = JavaTestUtils.runStreams(sc, 2, 2); + List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2); assertOrderInvariantEquals(expected, result); } @@ -186,10 +187,10 @@ public class JavaAPISuite implements Serializable { Arrays.asList(Arrays.asList("giants", "dodgers")), Arrays.asList(Arrays.asList("yankees", "red socks"))); - JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1); + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaDStream glommed = stream.glom(); JavaTestUtils.attachTestOutputStream(glommed); - List<List<List<String>>> result = JavaTestUtils.runStreams(sc, 2, 2); + List<List<List<String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); Assert.assertEquals(expected, result); } @@ -204,7 +205,7 @@ public class JavaAPISuite implements Serializable { Arrays.asList("GIANTSDODGERS"), Arrays.asList("YANKEESRED SOCKS")); - JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1); + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaDStream mapped = stream.mapPartitions(new FlatMapFunction<Iterator<String>, String>() { @Override public Iterable<String> call(Iterator<String> in) { @@ -216,7 +217,7 @@ public class JavaAPISuite implements Serializable { } }); JavaTestUtils.attachTestOutputStream(mapped); - List<List<List<String>>> result = JavaTestUtils.runStreams(sc, 2, 2); + List<List<List<String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); Assert.assertEquals(expected, result); } @@ -247,10 +248,10 @@ public class JavaAPISuite implements Serializable { Arrays.asList(15), Arrays.asList(24)); - JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1); + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaDStream reduced = stream.reduce(new IntegerSum()); JavaTestUtils.attachTestOutputStream(reduced); - List<List<Integer>> result = JavaTestUtils.runStreams(sc, 3, 3); + List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3); Assert.assertEquals(expected, result); } @@ -268,16 +269,39 @@ public class JavaAPISuite implements Serializable { Arrays.asList(39), Arrays.asList(24)); - JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1); + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaDStream reducedWindowed = stream.reduceByWindow(new IntegerSum(), new IntegerDifference(), new Duration(2000), new Duration(1000)); JavaTestUtils.attachTestOutputStream(reducedWindowed); - List<List<Integer>> result = JavaTestUtils.runStreams(sc, 4, 4); + List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4); Assert.assertEquals(expected, result); } @Test + public void testQueueStream() { + List<List<Integer>> expected = Arrays.asList( + Arrays.asList(1,2,3), + Arrays.asList(4,5,6), + Arrays.asList(7,8,9)); + + JavaSparkContext jsc = new JavaSparkContext(ssc.ssc().sc()); + JavaRDD<Integer> rdd1 = ssc.sc().parallelize(Arrays.asList(1,2,3)); + JavaRDD<Integer> rdd2 = ssc.sc().parallelize(Arrays.asList(4,5,6)); + JavaRDD<Integer> rdd3 = ssc.sc().parallelize(Arrays.asList(7,8,9)); + + LinkedList<JavaRDD<Integer>> rdds = Lists.newLinkedList(); + rdds.add(rdd1); + rdds.add(rdd2); + rdds.add(rdd3); + + JavaDStream<Integer> stream = ssc.queueStream(rdds); + JavaTestUtils.attachTestOutputStream(stream); + List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3); + Assert.assertEquals(expected, result); + } + + @Test public void testTransform() { List<List<Integer>> inputData = Arrays.asList( Arrays.asList(1,2,3), @@ -289,7 +313,7 @@ public class JavaAPISuite implements Serializable { Arrays.asList(6,7,8), Arrays.asList(9,10,11)); - JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1); + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaDStream transformed = stream.transform(new Function<JavaRDD<Integer>, JavaRDD<Integer>>() { @Override public JavaRDD<Integer> call(JavaRDD<Integer> in) throws Exception { @@ -301,7 +325,7 @@ public class JavaAPISuite implements Serializable { }); }}); JavaTestUtils.attachTestOutputStream(transformed); - List<List<Integer>> result = JavaTestUtils.runStreams(sc, 3, 3); + List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3); assertOrderInvariantEquals(expected, result); } @@ -318,7 +342,7 @@ public class JavaAPISuite implements Serializable { Arrays.asList("b", "o", "o", "d","o","d","g","e","r","s"), Arrays.asList("a","t","h","l","e","t","i","c","s")); - JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1); + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaDStream flatMapped = stream.flatMap(new FlatMapFunction<String, String>() { @Override public Iterable<String> call(String x) { @@ -326,7 +350,7 @@ public class JavaAPISuite implements Serializable { } }); JavaTestUtils.attachTestOutputStream(flatMapped); - List<List<String>> result = JavaTestUtils.runStreams(sc, 3, 3); + List<List<String>> result = JavaTestUtils.runStreams(ssc, 3, 3); assertOrderInvariantEquals(expected, result); } @@ -365,7 +389,7 @@ public class JavaAPISuite implements Serializable { new Tuple2<Integer, String>(9, "c"), new Tuple2<Integer, String>(9, "s"))); - JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1); + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream flatMapped = stream.flatMap(new PairFlatMapFunction<String, Integer, String>() { @Override public Iterable<Tuple2<Integer, String>> call(String in) throws Exception { @@ -377,7 +401,7 @@ public class JavaAPISuite implements Serializable { } }); JavaTestUtils.attachTestOutputStream(flatMapped); - List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(sc, 3, 3); + List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 3, 3); Assert.assertEquals(expected, result); } @@ -399,12 +423,12 @@ public class JavaAPISuite implements Serializable { Arrays.asList(2,2,5,5), Arrays.asList(3,3,6,6)); - JavaDStream stream1 = JavaTestUtils.attachTestInputStream(sc, inputData1, 2); - JavaDStream stream2 = JavaTestUtils.attachTestInputStream(sc, inputData2, 2); + JavaDStream stream1 = JavaTestUtils.attachTestInputStream(ssc, inputData1, 2); + JavaDStream stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 2); JavaDStream unioned = stream1.union(stream2); JavaTestUtils.attachTestOutputStream(unioned); - List<List<Integer>> result = JavaTestUtils.runStreams(sc, 3, 3); + List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3); assertOrderInvariantEquals(expected, result); } @@ -436,7 +460,7 @@ public class JavaAPISuite implements Serializable { Arrays.asList(new Tuple2<String, Integer>("giants", 6)), Arrays.asList(new Tuple2<String, Integer>("yankees", 7))); - JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1); + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<String, Integer> pairStream = stream.map( new PairFunction<String, String, Integer>() { @Override @@ -453,7 +477,7 @@ public class JavaAPISuite implements Serializable { } }); JavaTestUtils.attachTestOutputStream(filtered); - List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(sc, 2, 2); + List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2); Assert.assertEquals(expected, result); } @@ -492,12 +516,12 @@ public class JavaAPISuite implements Serializable { new Tuple2<String, List<String>>("california", Arrays.asList("sharks", "ducks")), new Tuple2<String, List<String>>("new york", Arrays.asList("rangers", "islanders")))); - JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1); + JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream); JavaPairDStream<String, List<String>> grouped = pairStream.groupByKey(); JavaTestUtils.attachTestOutputStream(grouped); - List<List<Tuple2<String, List<String>>>> result = JavaTestUtils.runStreams(sc, 2, 2); + List<List<Tuple2<String, List<String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2); Assert.assertEquals(expected, result); } @@ -515,13 +539,13 @@ public class JavaAPISuite implements Serializable { new Tuple2<String, Integer>("new york", 4))); JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream( - sc, inputData, 1); + ssc, inputData, 1); JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); JavaPairDStream<String, Integer> reduced = pairStream.reduceByKey(new IntegerSum()); JavaTestUtils.attachTestOutputStream(reduced); - List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(sc, 2, 2); + List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2); Assert.assertEquals(expected, result); } @@ -539,7 +563,7 @@ public class JavaAPISuite implements Serializable { new Tuple2<String, Integer>("new york", 4))); JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream( - sc, inputData, 1); + ssc, inputData, 1); JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); JavaPairDStream<String, Integer> combined = pairStream.<Integer>combineByKey( @@ -551,7 +575,7 @@ public class JavaAPISuite implements Serializable { }, new IntegerSum(), new IntegerSum(), new HashPartitioner(2)); JavaTestUtils.attachTestOutputStream(combined); - List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(sc, 2, 2); + List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2); Assert.assertEquals(expected, result); } @@ -569,12 +593,12 @@ public class JavaAPISuite implements Serializable { new Tuple2<String, Long>("new york", 2L))); JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream( - sc, inputData, 1); + ssc, inputData, 1); JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream); JavaPairDStream<String, Long> counted = pairStream.countByKey(); JavaTestUtils.attachTestOutputStream(counted); - List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(sc, 2, 2); + List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 2, 2); Assert.assertEquals(expected, result); } @@ -592,13 +616,13 @@ public class JavaAPISuite implements Serializable { Arrays.asList(new Tuple2<String, List<String>>("california", Arrays.asList("sharks", "ducks")), new Tuple2<String, List<String>>("new york", Arrays.asList("rangers", "islanders")))); - JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1); + JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream); JavaPairDStream<String, List<String>> groupWindowed = pairStream.groupByKeyAndWindow(new Duration(2000), new Duration(1000)); JavaTestUtils.attachTestOutputStream(groupWindowed); - List<List<Tuple2<String, List<String>>>> result = JavaTestUtils.runStreams(sc, 3, 3); + List<List<Tuple2<String, List<String>>>> result = JavaTestUtils.runStreams(ssc, 3, 3); Assert.assertEquals(expected, result); } @@ -615,13 +639,13 @@ public class JavaAPISuite implements Serializable { Arrays.asList(new Tuple2<String, Integer>("california", 10), new Tuple2<String, Integer>("new york", 4))); - JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1); + JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); JavaPairDStream<String, Integer> reduceWindowed = pairStream.reduceByKeyAndWindow(new IntegerSum(), new Duration(2000), new Duration(1000)); JavaTestUtils.attachTestOutputStream(reduceWindowed); - List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(sc, 3, 3); + List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3); Assert.assertEquals(expected, result); } @@ -638,7 +662,7 @@ public class JavaAPISuite implements Serializable { Arrays.asList(new Tuple2<String, Integer>("california", 14), new Tuple2<String, Integer>("new york", 9))); - JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1); + JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey( @@ -656,7 +680,7 @@ public class JavaAPISuite implements Serializable { } }); JavaTestUtils.attachTestOutputStream(updated); - List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(sc, 3, 3); + List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3); Assert.assertEquals(expected, result); } @@ -673,13 +697,13 @@ public class JavaAPISuite implements Serializable { Arrays.asList(new Tuple2<String, Integer>("california", 10), new Tuple2<String, Integer>("new york", 4))); - JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1); + JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); JavaPairDStream<String, Integer> reduceWindowed = pairStream.reduceByKeyAndWindow(new IntegerSum(), new IntegerDifference(), new Duration(2000), new Duration(1000)); JavaTestUtils.attachTestOutputStream(reduceWindowed); - List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(sc, 3, 3); + List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3); Assert.assertEquals(expected, result); } @@ -700,13 +724,13 @@ public class JavaAPISuite implements Serializable { new Tuple2<String, Long>("new york", 2L))); JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream( - sc, inputData, 1); + ssc, inputData, 1); JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream); JavaPairDStream<String, Long> counted = pairStream.countByKeyAndWindow(new Duration(2000), new Duration(1000)); JavaTestUtils.attachTestOutputStream(counted); - List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(sc, 3, 3); + List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 3, 3); Assert.assertEquals(expected, result); } @@ -726,7 +750,7 @@ public class JavaAPISuite implements Serializable { new Tuple2<String, String>("new york", "ISLANDERS"))); JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream( - sc, inputData, 1); + ssc, inputData, 1); JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream); JavaPairDStream<String, String> mapped = pairStream.mapValues(new Function<String, String>() { @@ -737,7 +761,7 @@ public class JavaAPISuite implements Serializable { }); JavaTestUtils.attachTestOutputStream(mapped); - List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(sc, 2, 2); + List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); Assert.assertEquals(expected, result); } @@ -765,7 +789,7 @@ public class JavaAPISuite implements Serializable { new Tuple2<String, String>("new york", "islanders2"))); JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream( - sc, inputData, 1); + ssc, inputData, 1); JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream); @@ -781,7 +805,7 @@ public class JavaAPISuite implements Serializable { }); JavaTestUtils.attachTestOutputStream(flatMapped); - List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(sc, 2, 2); + List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); Assert.assertEquals(expected, result); } @@ -815,16 +839,16 @@ public class JavaAPISuite implements Serializable { JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream( - sc, stringStringKVStream1, 1); + ssc, stringStringKVStream1, 1); JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1); JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream( - sc, stringStringKVStream2, 1); + ssc, stringStringKVStream2, 1); JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2); JavaPairDStream<String, Tuple2<List<String>, List<String>>> grouped = pairStream1.cogroup(pairStream2); JavaTestUtils.attachTestOutputStream(grouped); - List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(sc, 2, 2); + List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); Assert.assertEquals(expected, result); } @@ -858,16 +882,16 @@ public class JavaAPISuite implements Serializable { JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream( - sc, stringStringKVStream1, 1); + ssc, stringStringKVStream1, 1); JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1); JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream( - sc, stringStringKVStream2, 1); + ssc, stringStringKVStream2, 1); JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2); JavaPairDStream<String, Tuple2<String, String>> joined = pairStream1.join(pairStream2); JavaTestUtils.attachTestOutputStream(joined); - List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(sc, 2, 2); + List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 2, 2); Assert.assertEquals(expected, result); } @@ -887,9 +911,9 @@ public class JavaAPISuite implements Serializable { File tempDir = Files.createTempDir(); - sc.checkpoint(tempDir.getAbsolutePath(), new Duration(1000)); + ssc.checkpoint(tempDir.getAbsolutePath(), new Duration(1000)); - JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(sc, inputData, 1); + JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1); JavaDStream letterCount = stream.map(new Function<String, Integer>() { @Override public Integer call(String s) throws Exception { @@ -897,15 +921,15 @@ public class JavaAPISuite implements Serializable { } }); JavaCheckpointTestUtils.attachTestOutputStream(letterCount); - List<List<Integer>> initialResult = JavaTestUtils.runStreams(sc, 1, 1); + List<List<Integer>> initialResult = JavaTestUtils.runStreams(ssc, 1, 1); assertOrderInvariantEquals(expectedInitial, initialResult); Thread.sleep(1000); - sc.stop(); - sc = new JavaStreamingContext(tempDir.getAbsolutePath()); - sc.start(); - List<List<Integer>> finalResult = JavaCheckpointTestUtils.runStreams(sc, 2, 2); + ssc.stop(); + ssc = new JavaStreamingContext(tempDir.getAbsolutePath()); + ssc.start(); + List<List<Integer>> finalResult = JavaCheckpointTestUtils.runStreams(ssc, 2, 2); assertOrderInvariantEquals(expectedFinal, finalResult); } @@ -922,7 +946,7 @@ public class JavaAPISuite implements Serializable { Arrays.asList(1,4), Arrays.asList(8,7)); - JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(sc, inputData, 1); + JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1); JavaDStream letterCount = stream.map(new Function<String, Integer>() { @Override public Integer call(String s) throws Exception { @@ -933,7 +957,7 @@ public class JavaAPISuite implements Serializable { letterCount.checkpoint(new Duration(1000)); - List<List<Integer>> result1 = JavaCheckpointTestUtils.runStreams(sc, 3, 3); + List<List<Integer>> result1 = JavaCheckpointTestUtils.runStreams(ssc, 3, 3); assertOrderInvariantEquals(expected, result1); } */ @@ -945,15 +969,15 @@ public class JavaAPISuite implements Serializable { public void testKafkaStream() { HashMap<String, Integer> topics = Maps.newHashMap(); HashMap<KafkaPartitionKey, Long> offsets = Maps.newHashMap(); - JavaDStream test1 = sc.kafkaStream("localhost", 12345, "group", topics); - JavaDStream test2 = sc.kafkaStream("localhost", 12345, "group", topics, offsets); - JavaDStream test3 = sc.kafkaStream("localhost", 12345, "group", topics, offsets, + JavaDStream test1 = ssc.kafkaStream("localhost", 12345, "group", topics); + JavaDStream test2 = ssc.kafkaStream("localhost", 12345, "group", topics, offsets); + JavaDStream test3 = ssc.kafkaStream("localhost", 12345, "group", topics, offsets, StorageLevel.MEMORY_AND_DISK()); } @Test public void testNetworkTextStream() { - JavaDStream test = sc.networkTextStream("localhost", 12345); + JavaDStream test = ssc.networkTextStream("localhost", 12345); } @Test @@ -973,7 +997,7 @@ public class JavaAPISuite implements Serializable { } } - JavaDStream test = sc.networkStream( + JavaDStream test = ssc.networkStream( "localhost", 12345, new Converter(), @@ -982,22 +1006,22 @@ public class JavaAPISuite implements Serializable { @Test public void testTextFileStream() { - JavaDStream test = sc.textFileStream("/tmp/foo"); + JavaDStream test = ssc.textFileStream("/tmp/foo"); } @Test public void testRawNetworkStream() { - JavaDStream test = sc.rawNetworkStream("localhost", 12345); + JavaDStream test = ssc.rawNetworkStream("localhost", 12345); } @Test public void testFlumeStream() { - JavaDStream test = sc.flumeStream("localhost", 12345); + JavaDStream test = ssc.flumeStream("localhost", 12345); } @Test public void testFileStream() { JavaPairDStream<String, String> foo = - sc.<String, String, SequenceFileInputFormat>fileStream("/tmp/foo"); + ssc.<String, String, SequenceFileInputFormat>fileStream("/tmp/foo"); } } |