From c46dd2de78ae0c13060d0a9d2dea110c655659f0 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Thu, 17 Jan 2013 21:43:17 -0800 Subject: Moving tests to appropriate directory --- streaming/src/test/java/JavaAPISuite.java | 1027 ++++++++++++++++++++ streaming/src/test/java/JavaTestUtils.scala | 65 ++ streaming/src/test/scala/JavaTestUtils.scala | 65 -- .../test/scala/spark/streaming/JavaAPISuite.java | 1027 -------------------- 4 files changed, 1092 insertions(+), 1092 deletions(-) create mode 100644 streaming/src/test/java/JavaAPISuite.java create mode 100644 streaming/src/test/java/JavaTestUtils.scala delete mode 100644 streaming/src/test/scala/JavaTestUtils.scala delete mode 100644 streaming/src/test/scala/spark/streaming/JavaAPISuite.java (limited to 'streaming/src') diff --git a/streaming/src/test/java/JavaAPISuite.java b/streaming/src/test/java/JavaAPISuite.java new file mode 100644 index 0000000000..8c94e13e65 --- /dev/null +++ b/streaming/src/test/java/JavaAPISuite.java @@ -0,0 +1,1027 @@ +package spark.streaming; + +import com.google.common.base.Optional; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.io.Files; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import scala.Tuple2; +import spark.HashPartitioner; +import spark.api.java.JavaRDD; +import spark.api.java.JavaSparkContext; +import spark.api.java.function.*; +import spark.storage.StorageLevel; +import spark.streaming.api.java.JavaDStream; +import spark.streaming.api.java.JavaPairDStream; +import spark.streaming.api.java.JavaStreamingContext; +import spark.streaming.JavaTestUtils; +import spark.streaming.JavaCheckpointTestUtils; +import spark.streaming.dstream.KafkaPartitionKey; + +import java.io.*; +import java.util.*; + +// The test suite itself is Serializable so that anonymous Function implementations can be +// serialized, as an alternative to converting these anonymous classes to static inner classes; +// see http://stackoverflow.com/questions/758570/. +public class JavaAPISuite implements Serializable { + private transient JavaStreamingContext ssc; + + @Before + public void setUp() { + ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000)); + } + + @After + public void tearDown() { + ssc.stop(); + ssc = null; + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.master.port"); + } + + @Test + public void testCount() { + List> inputData = Arrays.asList( + Arrays.asList(1,2,3,4), + Arrays.asList(3,4,5), + Arrays.asList(3)); + + List> expected = Arrays.asList( + Arrays.asList(4L), + Arrays.asList(3L), + Arrays.asList(1L)); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream count = stream.count(); + JavaTestUtils.attachTestOutputStream(count); + List> result = JavaTestUtils.runStreams(ssc, 3, 3); + assertOrderInvariantEquals(expected, result); + } + + @Test + public void testMap() { + List> inputData = Arrays.asList( + Arrays.asList("hello", "world"), + Arrays.asList("goodnight", "moon")); + + List> expected = Arrays.asList( + Arrays.asList(5,5), + Arrays.asList(9,4)); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream letterCount = stream.map(new Function() { + @Override + public Integer call(String s) throws Exception { + return s.length(); + } + }); + JavaTestUtils.attachTestOutputStream(letterCount); + List> result = JavaTestUtils.runStreams(ssc, 2, 2); + + assertOrderInvariantEquals(expected, result); + } + + @Test + public void testWindow() { + List> inputData = Arrays.asList( + Arrays.asList(1,2,3), + Arrays.asList(4,5,6), + Arrays.asList(7,8,9)); + + List> expected = Arrays.asList( + Arrays.asList(1,2,3), + Arrays.asList(4,5,6,1,2,3), + Arrays.asList(7,8,9,4,5,6), + Arrays.asList(7,8,9)); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream windowed = stream.window(new Duration(2000)); + JavaTestUtils.attachTestOutputStream(windowed); + List> result = JavaTestUtils.runStreams(ssc, 4, 4); + + assertOrderInvariantEquals(expected, result); + } + + @Test + public void testWindowWithSlideDuration() { + List> inputData = Arrays.asList( + Arrays.asList(1,2,3), + Arrays.asList(4,5,6), + Arrays.asList(7,8,9), + Arrays.asList(10,11,12), + Arrays.asList(13,14,15), + Arrays.asList(16,17,18)); + + List> expected = Arrays.asList( + Arrays.asList(1,2,3,4,5,6), + Arrays.asList(1,2,3,4,5,6,7,8,9,10,11,12), + Arrays.asList(7,8,9,10,11,12,13,14,15,16,17,18), + Arrays.asList(13,14,15,16,17,18)); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream windowed = stream.window(new Duration(4000), new Duration(2000)); + JavaTestUtils.attachTestOutputStream(windowed); + List> result = JavaTestUtils.runStreams(ssc, 8, 4); + + assertOrderInvariantEquals(expected, result); + } + + @Test + public void testTumble() { + List> inputData = Arrays.asList( + Arrays.asList(1,2,3), + Arrays.asList(4,5,6), + Arrays.asList(7,8,9), + Arrays.asList(10,11,12), + Arrays.asList(13,14,15), + Arrays.asList(16,17,18)); + + List> expected = Arrays.asList( + Arrays.asList(1,2,3,4,5,6), + Arrays.asList(7,8,9,10,11,12), + Arrays.asList(13,14,15,16,17,18)); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream windowed = stream.tumble(new Duration(2000)); + JavaTestUtils.attachTestOutputStream(windowed); + List> result = JavaTestUtils.runStreams(ssc, 6, 3); + + assertOrderInvariantEquals(expected, result); + } + + @Test + public void testFilter() { + List> inputData = Arrays.asList( + Arrays.asList("giants", "dodgers"), + Arrays.asList("yankees", "red socks")); + + List> expected = Arrays.asList( + Arrays.asList("giants"), + Arrays.asList("yankees")); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream filtered = stream.filter(new Function() { + @Override + public Boolean call(String s) throws Exception { + return s.contains("a"); + } + }); + JavaTestUtils.attachTestOutputStream(filtered); + List> result = JavaTestUtils.runStreams(ssc, 2, 2); + + assertOrderInvariantEquals(expected, result); + } + + @Test + public void testGlom() { + List> inputData = Arrays.asList( + Arrays.asList("giants", "dodgers"), + Arrays.asList("yankees", "red socks")); + + List>> expected = Arrays.asList( + Arrays.asList(Arrays.asList("giants", "dodgers")), + Arrays.asList(Arrays.asList("yankees", "red socks"))); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream glommed = stream.glom(); + JavaTestUtils.attachTestOutputStream(glommed); + List>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testMapPartitions() { + List> inputData = Arrays.asList( + Arrays.asList("giants", "dodgers"), + Arrays.asList("yankees", "red socks")); + + List> expected = Arrays.asList( + Arrays.asList("GIANTSDODGERS"), + Arrays.asList("YANKEESRED SOCKS")); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream mapped = stream.mapPartitions(new FlatMapFunction, String>() { + @Override + public Iterable call(Iterator in) { + String out = ""; + while (in.hasNext()) { + out = out + in.next().toUpperCase(); + } + return Lists.newArrayList(out); + } + }); + JavaTestUtils.attachTestOutputStream(mapped); + List>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + private class IntegerSum extends Function2 { + @Override + public Integer call(Integer i1, Integer i2) throws Exception { + return i1 + i2; + } + } + + private class IntegerDifference extends Function2 { + @Override + public Integer call(Integer i1, Integer i2) throws Exception { + return i1 - i2; + } + } + + @Test + public void testReduce() { + List> inputData = Arrays.asList( + Arrays.asList(1,2,3), + Arrays.asList(4,5,6), + Arrays.asList(7,8,9)); + + List> expected = Arrays.asList( + Arrays.asList(6), + Arrays.asList(15), + Arrays.asList(24)); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream reduced = stream.reduce(new IntegerSum()); + JavaTestUtils.attachTestOutputStream(reduced); + List> result = JavaTestUtils.runStreams(ssc, 3, 3); + + Assert.assertEquals(expected, result); + } + + @Test + public void testReduceByWindow() { + List> inputData = Arrays.asList( + Arrays.asList(1,2,3), + Arrays.asList(4,5,6), + Arrays.asList(7,8,9)); + + List> expected = Arrays.asList( + Arrays.asList(6), + Arrays.asList(21), + Arrays.asList(39), + Arrays.asList(24)); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream reducedWindowed = stream.reduceByWindow(new IntegerSum(), + new IntegerDifference(), new Duration(2000), new Duration(1000)); + JavaTestUtils.attachTestOutputStream(reducedWindowed); + List> result = JavaTestUtils.runStreams(ssc, 4, 4); + + Assert.assertEquals(expected, result); + } + + @Test + public void testQueueStream() { + List> expected = Arrays.asList( + Arrays.asList(1,2,3), + Arrays.asList(4,5,6), + Arrays.asList(7,8,9)); + + JavaSparkContext jsc = new JavaSparkContext(ssc.ssc().sc()); + JavaRDD rdd1 = ssc.sc().parallelize(Arrays.asList(1,2,3)); + JavaRDD rdd2 = ssc.sc().parallelize(Arrays.asList(4,5,6)); + JavaRDD rdd3 = ssc.sc().parallelize(Arrays.asList(7,8,9)); + + LinkedList> rdds = Lists.newLinkedList(); + rdds.add(rdd1); + rdds.add(rdd2); + rdds.add(rdd3); + + JavaDStream stream = ssc.queueStream(rdds); + JavaTestUtils.attachTestOutputStream(stream); + List> result = JavaTestUtils.runStreams(ssc, 3, 3); + Assert.assertEquals(expected, result); + } + + @Test + public void testTransform() { + List> inputData = Arrays.asList( + Arrays.asList(1,2,3), + Arrays.asList(4,5,6), + Arrays.asList(7,8,9)); + + List> expected = Arrays.asList( + Arrays.asList(3,4,5), + Arrays.asList(6,7,8), + Arrays.asList(9,10,11)); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream transformed = stream.transform(new Function, JavaRDD>() { + @Override + public JavaRDD call(JavaRDD in) throws Exception { + return in.map(new Function() { + @Override + public Integer call(Integer i) throws Exception { + return i + 2; + } + }); + }}); + JavaTestUtils.attachTestOutputStream(transformed); + List> result = JavaTestUtils.runStreams(ssc, 3, 3); + + assertOrderInvariantEquals(expected, result); + } + + @Test + public void testFlatMap() { + List> inputData = Arrays.asList( + Arrays.asList("go", "giants"), + Arrays.asList("boo", "dodgers"), + Arrays.asList("athletics")); + + List> expected = Arrays.asList( + Arrays.asList("g","o","g","i","a","n","t","s"), + Arrays.asList("b", "o", "o", "d","o","d","g","e","r","s"), + Arrays.asList("a","t","h","l","e","t","i","c","s")); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream flatMapped = stream.flatMap(new FlatMapFunction() { + @Override + public Iterable call(String x) { + return Lists.newArrayList(x.split("(?!^)")); + } + }); + JavaTestUtils.attachTestOutputStream(flatMapped); + List> result = JavaTestUtils.runStreams(ssc, 3, 3); + + assertOrderInvariantEquals(expected, result); + } + + @Test + public void testPairFlatMap() { + List> inputData = Arrays.asList( + Arrays.asList("giants"), + Arrays.asList("dodgers"), + Arrays.asList("athletics")); + + List>> expected = Arrays.asList( + Arrays.asList( + new Tuple2(6, "g"), + new Tuple2(6, "i"), + new Tuple2(6, "a"), + new Tuple2(6, "n"), + new Tuple2(6, "t"), + new Tuple2(6, "s")), + Arrays.asList( + new Tuple2(7, "d"), + new Tuple2(7, "o"), + new Tuple2(7, "d"), + new Tuple2(7, "g"), + new Tuple2(7, "e"), + new Tuple2(7, "r"), + new Tuple2(7, "s")), + Arrays.asList( + new Tuple2(9, "a"), + new Tuple2(9, "t"), + new Tuple2(9, "h"), + new Tuple2(9, "l"), + new Tuple2(9, "e"), + new Tuple2(9, "t"), + new Tuple2(9, "i"), + new Tuple2(9, "c"), + new Tuple2(9, "s"))); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaPairDStream flatMapped = stream.flatMap(new PairFlatMapFunction() { + @Override + public Iterable> call(String in) throws Exception { + List> out = Lists.newArrayList(); + for (String letter: in.split("(?!^)")) { + out.add(new Tuple2(in.length(), letter)); + } + return out; + } + }); + JavaTestUtils.attachTestOutputStream(flatMapped); + List>> result = JavaTestUtils.runStreams(ssc, 3, 3); + + Assert.assertEquals(expected, result); + } + + @Test + public void testUnion() { + List> inputData1 = Arrays.asList( + Arrays.asList(1,1), + Arrays.asList(2,2), + Arrays.asList(3,3)); + + List> inputData2 = Arrays.asList( + Arrays.asList(4,4), + Arrays.asList(5,5), + Arrays.asList(6,6)); + + List> expected = Arrays.asList( + Arrays.asList(1,1,4,4), + Arrays.asList(2,2,5,5), + Arrays.asList(3,3,6,6)); + + JavaDStream stream1 = JavaTestUtils.attachTestInputStream(ssc, inputData1, 2); + JavaDStream stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 2); + + JavaDStream unioned = stream1.union(stream2); + JavaTestUtils.attachTestOutputStream(unioned); + List> result = JavaTestUtils.runStreams(ssc, 3, 3); + + assertOrderInvariantEquals(expected, result); + } + + /* + * Performs an order-invariant comparison of lists representing two RDD streams. This allows + * us to account for ordering variation within individual RDD's which occurs during windowing. + */ + public static void assertOrderInvariantEquals( + List> expected, List> actual) { + for (List list: expected) { + Collections.sort(list); + } + for (List list: actual) { + Collections.sort(list); + } + Assert.assertEquals(expected, actual); + } + + + // PairDStream Functions + @Test + public void testPairFilter() { + List> inputData = Arrays.asList( + Arrays.asList("giants", "dodgers"), + Arrays.asList("yankees", "red socks")); + + List>> expected = Arrays.asList( + Arrays.asList(new Tuple2("giants", 6)), + Arrays.asList(new Tuple2("yankees", 7))); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaPairDStream pairStream = stream.map( + new PairFunction() { + @Override + public Tuple2 call(String in) throws Exception { + return new Tuple2(in, in.length()); + } + }); + + JavaPairDStream filtered = pairStream.filter( + new Function, Boolean>() { + @Override + public Boolean call(Tuple2 in) throws Exception { + return in._1().contains("a"); + } + }); + JavaTestUtils.attachTestOutputStream(filtered); + List>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + List>> stringStringKVStream = Arrays.asList( + Arrays.asList(new Tuple2("california", "dodgers"), + new Tuple2("california", "giants"), + new Tuple2("new york", "yankees"), + new Tuple2("new york", "mets")), + Arrays.asList(new Tuple2("california", "sharks"), + new Tuple2("california", "ducks"), + new Tuple2("new york", "rangers"), + new Tuple2("new york", "islanders"))); + + List>> stringIntKVStream = Arrays.asList( + Arrays.asList( + new Tuple2("california", 1), + new Tuple2("california", 3), + new Tuple2("new york", 4), + new Tuple2("new york", 1)), + Arrays.asList( + new Tuple2("california", 5), + new Tuple2("california", 5), + new Tuple2("new york", 3), + new Tuple2("new york", 1))); + + @Test + public void testPairGroupByKey() { + List>> inputData = stringStringKVStream; + + List>>> expected = Arrays.asList( + Arrays.asList( + new Tuple2>("california", Arrays.asList("dodgers", "giants")), + new Tuple2>("new york", Arrays.asList("yankees", "mets"))), + Arrays.asList( + new Tuple2>("california", Arrays.asList("sharks", "ducks")), + new Tuple2>("new york", Arrays.asList("rangers", "islanders")))); + + JavaDStream> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); + + JavaPairDStream> grouped = pairStream.groupByKey(); + JavaTestUtils.attachTestOutputStream(grouped); + List>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testPairReduceByKey() { + List>> inputData = stringIntKVStream; + + List>> expected = Arrays.asList( + Arrays.asList( + new Tuple2("california", 4), + new Tuple2("new york", 5)), + Arrays.asList( + new Tuple2("california", 10), + new Tuple2("new york", 4))); + + JavaDStream> stream = JavaTestUtils.attachTestInputStream( + ssc, inputData, 1); + JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); + + JavaPairDStream reduced = pairStream.reduceByKey(new IntegerSum()); + + JavaTestUtils.attachTestOutputStream(reduced); + List>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testCombineByKey() { + List>> inputData = stringIntKVStream; + + List>> expected = Arrays.asList( + Arrays.asList( + new Tuple2("california", 4), + new Tuple2("new york", 5)), + Arrays.asList( + new Tuple2("california", 10), + new Tuple2("new york", 4))); + + JavaDStream> stream = JavaTestUtils.attachTestInputStream( + ssc, inputData, 1); + JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); + + JavaPairDStream combined = pairStream.combineByKey( + new Function() { + @Override + public Integer call(Integer i) throws Exception { + return i; + } + }, new IntegerSum(), new IntegerSum(), new HashPartitioner(2)); + + JavaTestUtils.attachTestOutputStream(combined); + List>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testCountByKey() { + List>> inputData = stringStringKVStream; + + List>> expected = Arrays.asList( + Arrays.asList( + new Tuple2("california", 2L), + new Tuple2("new york", 2L)), + Arrays.asList( + new Tuple2("california", 2L), + new Tuple2("new york", 2L))); + + JavaDStream> stream = JavaTestUtils.attachTestInputStream( + ssc, inputData, 1); + JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); + + JavaPairDStream counted = pairStream.countByKey(); + JavaTestUtils.attachTestOutputStream(counted); + List>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testGroupByKeyAndWindow() { + List>> inputData = stringStringKVStream; + + List>>> expected = Arrays.asList( + Arrays.asList(new Tuple2>("california", Arrays.asList("dodgers", "giants")), + new Tuple2>("new york", Arrays.asList("yankees", "mets"))), + Arrays.asList(new Tuple2>("california", + Arrays.asList("sharks", "ducks", "dodgers", "giants")), + new Tuple2>("new york", Arrays.asList("rangers", "islanders", "yankees", "mets"))), + Arrays.asList(new Tuple2>("california", Arrays.asList("sharks", "ducks")), + new Tuple2>("new york", Arrays.asList("rangers", "islanders")))); + + JavaDStream> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); + + JavaPairDStream> groupWindowed = + pairStream.groupByKeyAndWindow(new Duration(2000), new Duration(1000)); + JavaTestUtils.attachTestOutputStream(groupWindowed); + List>>> result = JavaTestUtils.runStreams(ssc, 3, 3); + + Assert.assertEquals(expected, result); + } + + @Test + public void testReduceByKeyAndWindow() { + List>> inputData = stringIntKVStream; + + List>> expected = Arrays.asList( + Arrays.asList(new Tuple2("california", 4), + new Tuple2("new york", 5)), + Arrays.asList(new Tuple2("california", 14), + new Tuple2("new york", 9)), + Arrays.asList(new Tuple2("california", 10), + new Tuple2("new york", 4))); + + JavaDStream> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); + + JavaPairDStream reduceWindowed = + pairStream.reduceByKeyAndWindow(new IntegerSum(), new Duration(2000), new Duration(1000)); + JavaTestUtils.attachTestOutputStream(reduceWindowed); + List>> result = JavaTestUtils.runStreams(ssc, 3, 3); + + Assert.assertEquals(expected, result); + } + + @Test + public void testUpdateStateByKey() { + List>> inputData = stringIntKVStream; + + List>> expected = Arrays.asList( + Arrays.asList(new Tuple2("california", 4), + new Tuple2("new york", 5)), + Arrays.asList(new Tuple2("california", 14), + new Tuple2("new york", 9)), + Arrays.asList(new Tuple2("california", 14), + new Tuple2("new york", 9))); + + JavaDStream> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); + + JavaPairDStream updated = pairStream.updateStateByKey( + new Function2, Optional, Optional>(){ + @Override + public Optional call(List values, Optional state) { + int out = 0; + if (state.isPresent()) { + out = out + state.get(); + } + for (Integer v: values) { + out = out + v; + } + return Optional.of(out); + } + }); + JavaTestUtils.attachTestOutputStream(updated); + List>> result = JavaTestUtils.runStreams(ssc, 3, 3); + + Assert.assertEquals(expected, result); + } + + @Test + public void testReduceByKeyAndWindowWithInverse() { + List>> inputData = stringIntKVStream; + + List>> expected = Arrays.asList( + Arrays.asList(new Tuple2("california", 4), + new Tuple2("new york", 5)), + Arrays.asList(new Tuple2("california", 14), + new Tuple2("new york", 9)), + Arrays.asList(new Tuple2("california", 10), + new Tuple2("new york", 4))); + + JavaDStream> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); + + JavaPairDStream reduceWindowed = + pairStream.reduceByKeyAndWindow(new IntegerSum(), new IntegerDifference(), new Duration(2000), new Duration(1000)); + JavaTestUtils.attachTestOutputStream(reduceWindowed); + List>> result = JavaTestUtils.runStreams(ssc, 3, 3); + + Assert.assertEquals(expected, result); + } + + @Test + public void testCountByKeyAndWindow() { + List>> inputData = stringStringKVStream; + + List>> expected = Arrays.asList( + Arrays.asList( + new Tuple2("california", 2L), + new Tuple2("new york", 2L)), + Arrays.asList( + new Tuple2("california", 4L), + new Tuple2("new york", 4L)), + Arrays.asList( + new Tuple2("california", 2L), + new Tuple2("new york", 2L))); + + JavaDStream> stream = JavaTestUtils.attachTestInputStream( + ssc, inputData, 1); + JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); + + JavaPairDStream counted = + pairStream.countByKeyAndWindow(new Duration(2000), new Duration(1000)); + JavaTestUtils.attachTestOutputStream(counted); + List>> result = JavaTestUtils.runStreams(ssc, 3, 3); + + Assert.assertEquals(expected, result); + } + + @Test + public void testMapValues() { + List>> inputData = stringStringKVStream; + + List>> expected = Arrays.asList( + Arrays.asList(new Tuple2("california", "DODGERS"), + new Tuple2("california", "GIANTS"), + new Tuple2("new york", "YANKEES"), + new Tuple2("new york", "METS")), + Arrays.asList(new Tuple2("california", "SHARKS"), + new Tuple2("california", "DUCKS"), + new Tuple2("new york", "RANGERS"), + new Tuple2("new york", "ISLANDERS"))); + + JavaDStream> stream = JavaTestUtils.attachTestInputStream( + ssc, inputData, 1); + JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); + + JavaPairDStream mapped = pairStream.mapValues(new Function() { + @Override + public String call(String s) throws Exception { + return s.toUpperCase(); + } + }); + + JavaTestUtils.attachTestOutputStream(mapped); + List>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testFlatMapValues() { + List>> inputData = stringStringKVStream; + + List>> expected = Arrays.asList( + Arrays.asList(new Tuple2("california", "dodgers1"), + new Tuple2("california", "dodgers2"), + new Tuple2("california", "giants1"), + new Tuple2("california", "giants2"), + new Tuple2("new york", "yankees1"), + new Tuple2("new york", "yankees2"), + new Tuple2("new york", "mets1"), + new Tuple2("new york", "mets2")), + Arrays.asList(new Tuple2("california", "sharks1"), + new Tuple2("california", "sharks2"), + new Tuple2("california", "ducks1"), + new Tuple2("california", "ducks2"), + new Tuple2("new york", "rangers1"), + new Tuple2("new york", "rangers2"), + new Tuple2("new york", "islanders1"), + new Tuple2("new york", "islanders2"))); + + JavaDStream> stream = JavaTestUtils.attachTestInputStream( + ssc, inputData, 1); + JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); + + + JavaPairDStream flatMapped = pairStream.flatMapValues( + new Function>() { + @Override + public Iterable call(String in) { + List out = new ArrayList(); + out.add(in + "1"); + out.add(in + "2"); + return out; + } + }); + + JavaTestUtils.attachTestOutputStream(flatMapped); + List>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testCoGroup() { + List>> stringStringKVStream1 = Arrays.asList( + Arrays.asList(new Tuple2("california", "dodgers"), + new Tuple2("new york", "yankees")), + Arrays.asList(new Tuple2("california", "sharks"), + new Tuple2("new york", "rangers"))); + + List>> stringStringKVStream2 = Arrays.asList( + Arrays.asList(new Tuple2("california", "giants"), + new Tuple2("new york", "mets")), + Arrays.asList(new Tuple2("california", "ducks"), + new Tuple2("new york", "islanders"))); + + + List, List>>>> expected = Arrays.asList( + Arrays.asList( + new Tuple2, List>>("california", + new Tuple2, List>(Arrays.asList("dodgers"), Arrays.asList("giants"))), + new Tuple2, List>>("new york", + new Tuple2, List>(Arrays.asList("yankees"), Arrays.asList("mets")))), + Arrays.asList( + new Tuple2, List>>("california", + new Tuple2, List>(Arrays.asList("sharks"), Arrays.asList("ducks"))), + new Tuple2, List>>("new york", + new Tuple2, List>(Arrays.asList("rangers"), Arrays.asList("islanders"))))); + + + JavaDStream> stream1 = JavaTestUtils.attachTestInputStream( + ssc, stringStringKVStream1, 1); + JavaPairDStream pairStream1 = JavaPairDStream.fromJavaDStream(stream1); + + JavaDStream> stream2 = JavaTestUtils.attachTestInputStream( + ssc, stringStringKVStream2, 1); + JavaPairDStream pairStream2 = JavaPairDStream.fromJavaDStream(stream2); + + JavaPairDStream, List>> grouped = pairStream1.cogroup(pairStream2); + JavaTestUtils.attachTestOutputStream(grouped); + List>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testJoin() { + List>> stringStringKVStream1 = Arrays.asList( + Arrays.asList(new Tuple2("california", "dodgers"), + new Tuple2("new york", "yankees")), + Arrays.asList(new Tuple2("california", "sharks"), + new Tuple2("new york", "rangers"))); + + List>> stringStringKVStream2 = Arrays.asList( + Arrays.asList(new Tuple2("california", "giants"), + new Tuple2("new york", "mets")), + Arrays.asList(new Tuple2("california", "ducks"), + new Tuple2("new york", "islanders"))); + + + List>>> expected = Arrays.asList( + Arrays.asList( + new Tuple2>("california", + new Tuple2("dodgers", "giants")), + new Tuple2>("new york", + new Tuple2("yankees", "mets"))), + Arrays.asList( + new Tuple2>("california", + new Tuple2("sharks", "ducks")), + new Tuple2>("new york", + new Tuple2("rangers", "islanders")))); + + + JavaDStream> stream1 = JavaTestUtils.attachTestInputStream( + ssc, stringStringKVStream1, 1); + JavaPairDStream pairStream1 = JavaPairDStream.fromJavaDStream(stream1); + + JavaDStream> stream2 = JavaTestUtils.attachTestInputStream( + ssc, stringStringKVStream2, 1); + JavaPairDStream pairStream2 = JavaPairDStream.fromJavaDStream(stream2); + + JavaPairDStream> joined = pairStream1.join(pairStream2); + JavaTestUtils.attachTestOutputStream(joined); + List>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testCheckpointMasterRecovery() throws InterruptedException { + List> inputData = Arrays.asList( + Arrays.asList("this", "is"), + Arrays.asList("a", "test"), + Arrays.asList("counting", "letters")); + + List> expectedInitial = Arrays.asList( + Arrays.asList(4,2)); + List> expectedFinal = Arrays.asList( + Arrays.asList(1,4), + Arrays.asList(8,7)); + + + File tempDir = Files.createTempDir(); + ssc.checkpoint(tempDir.getAbsolutePath(), new Duration(1000)); + + JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream letterCount = stream.map(new Function() { + @Override + public Integer call(String s) throws Exception { + return s.length(); + } + }); + JavaCheckpointTestUtils.attachTestOutputStream(letterCount); + List> initialResult = JavaTestUtils.runStreams(ssc, 1, 1); + + assertOrderInvariantEquals(expectedInitial, initialResult); + Thread.sleep(1000); + + ssc.stop(); + ssc = new JavaStreamingContext(tempDir.getAbsolutePath()); + ssc.start(); + List> finalResult = JavaCheckpointTestUtils.runStreams(ssc, 2, 2); + assertOrderInvariantEquals(expectedFinal, finalResult); + } + + /** TEST DISABLED: Pending a discussion about checkpoint() semantics with TD + @Test + public void testCheckpointofIndividualStream() throws InterruptedException { + List> inputData = Arrays.asList( + Arrays.asList("this", "is"), + Arrays.asList("a", "test"), + Arrays.asList("counting", "letters")); + + List> expected = Arrays.asList( + Arrays.asList(4,2), + Arrays.asList(1,4), + Arrays.asList(8,7)); + + JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream letterCount = stream.map(new Function() { + @Override + public Integer call(String s) throws Exception { + return s.length(); + } + }); + JavaCheckpointTestUtils.attachTestOutputStream(letterCount); + + letterCount.checkpoint(new Duration(1000)); + + List> result1 = JavaCheckpointTestUtils.runStreams(ssc, 3, 3); + assertOrderInvariantEquals(expected, result1); + } + */ + + // Input stream tests. These mostly just test that we can instantiate a given InputStream with + // Java arguments and assign it to a JavaDStream without producing type errors. Testing of the + // InputStream functionality is deferred to the existing Scala tests. + @Test + public void testKafkaStream() { + HashMap topics = Maps.newHashMap(); + HashMap offsets = Maps.newHashMap(); + JavaDStream test1 = ssc.kafkaStream("localhost", 12345, "group", topics); + JavaDStream test2 = ssc.kafkaStream("localhost", 12345, "group", topics, offsets); + JavaDStream test3 = ssc.kafkaStream("localhost", 12345, "group", topics, offsets, + StorageLevel.MEMORY_AND_DISK()); + } + + @Test + public void testNetworkTextStream() { + JavaDStream test = ssc.networkTextStream("localhost", 12345); + } + + @Test + public void testNetworkString() { + class Converter extends Function> { + public Iterable call(InputStream in) { + BufferedReader reader = new BufferedReader(new InputStreamReader(in)); + List out = new ArrayList(); + try { + while (true) { + String line = reader.readLine(); + if (line == null) { break; } + out.add(line); + } + } catch (IOException e) { } + return out; + } + } + + JavaDStream test = ssc.networkStream( + "localhost", + 12345, + new Converter(), + StorageLevel.MEMORY_ONLY()); + } + + @Test + public void testTextFileStream() { + JavaDStream test = ssc.textFileStream("/tmp/foo"); + } + + @Test + public void testRawNetworkStream() { + JavaDStream test = ssc.rawNetworkStream("localhost", 12345); + } + + @Test + public void testFlumeStream() { + JavaDStream test = ssc.flumeStream("localhost", 12345); + } + + @Test + public void testFileStream() { + JavaPairDStream foo = + ssc.fileStream("/tmp/foo"); + } +} diff --git a/streaming/src/test/java/JavaTestUtils.scala b/streaming/src/test/java/JavaTestUtils.scala new file mode 100644 index 0000000000..56349837e5 --- /dev/null +++ b/streaming/src/test/java/JavaTestUtils.scala @@ -0,0 +1,65 @@ +package spark.streaming + +import collection.mutable.{SynchronizedBuffer, ArrayBuffer} +import java.util.{List => JList} +import spark.streaming.api.java.{JavaPairDStream, JavaDStreamLike, JavaDStream, JavaStreamingContext} +import spark.streaming._ +import java.util.ArrayList +import collection.JavaConversions._ + +/** Exposes streaming test functionality in a Java-friendly way. */ +trait JavaTestBase extends TestSuiteBase { + + /** + * Create a [[spark.streaming.TestInputStream]] and attach it to the supplied context. + * The stream will be derived from the supplied lists of Java objects. + **/ + def attachTestInputStream[T]( + ssc: JavaStreamingContext, + data: JList[JList[T]], + numPartitions: Int) = { + val seqData = data.map(Seq(_:_*)) + + implicit val cm: ClassManifest[T] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + val dstream = new TestInputStream[T](ssc.ssc, seqData, numPartitions) + ssc.ssc.registerInputStream(dstream) + new JavaDStream[T](dstream) + } + + /** + * Attach a provided stream to it's associated StreamingContext as a + * [[spark.streaming.TestOutputStream]]. + **/ + def attachTestOutputStream[T, This <: spark.streaming.api.java.JavaDStreamLike[T,This]]( + dstream: JavaDStreamLike[T, This]) = { + implicit val cm: ClassManifest[T] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + val ostream = new TestOutputStream(dstream.dstream, + new ArrayBuffer[Seq[T]] with SynchronizedBuffer[Seq[T]]) + dstream.dstream.ssc.registerOutputStream(ostream) + } + + /** + * Process all registered streams for a numBatches batches, failing if + * numExpectedOutput RDD's are not generated. Generated RDD's are collected + * and returned, represented as a list for each batch interval. + */ + def runStreams[V]( + ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[V]] = { + implicit val cm: ClassManifest[V] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] + val res = runStreams[V](ssc.ssc, numBatches, numExpectedOutput) + val out = new ArrayList[JList[V]]() + res.map(entry => out.append(new ArrayList[V](entry))) + out + } +} + +object JavaTestUtils extends JavaTestBase { + +} + +object JavaCheckpointTestUtils extends JavaTestBase { + override def actuallyWait = true +} \ No newline at end of file diff --git a/streaming/src/test/scala/JavaTestUtils.scala b/streaming/src/test/scala/JavaTestUtils.scala deleted file mode 100644 index 56349837e5..0000000000 --- a/streaming/src/test/scala/JavaTestUtils.scala +++ /dev/null @@ -1,65 +0,0 @@ -package spark.streaming - -import collection.mutable.{SynchronizedBuffer, ArrayBuffer} -import java.util.{List => JList} -import spark.streaming.api.java.{JavaPairDStream, JavaDStreamLike, JavaDStream, JavaStreamingContext} -import spark.streaming._ -import java.util.ArrayList -import collection.JavaConversions._ - -/** Exposes streaming test functionality in a Java-friendly way. */ -trait JavaTestBase extends TestSuiteBase { - - /** - * Create a [[spark.streaming.TestInputStream]] and attach it to the supplied context. - * The stream will be derived from the supplied lists of Java objects. - **/ - def attachTestInputStream[T]( - ssc: JavaStreamingContext, - data: JList[JList[T]], - numPartitions: Int) = { - val seqData = data.map(Seq(_:_*)) - - implicit val cm: ClassManifest[T] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] - val dstream = new TestInputStream[T](ssc.ssc, seqData, numPartitions) - ssc.ssc.registerInputStream(dstream) - new JavaDStream[T](dstream) - } - - /** - * Attach a provided stream to it's associated StreamingContext as a - * [[spark.streaming.TestOutputStream]]. - **/ - def attachTestOutputStream[T, This <: spark.streaming.api.java.JavaDStreamLike[T,This]]( - dstream: JavaDStreamLike[T, This]) = { - implicit val cm: ClassManifest[T] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] - val ostream = new TestOutputStream(dstream.dstream, - new ArrayBuffer[Seq[T]] with SynchronizedBuffer[Seq[T]]) - dstream.dstream.ssc.registerOutputStream(ostream) - } - - /** - * Process all registered streams for a numBatches batches, failing if - * numExpectedOutput RDD's are not generated. Generated RDD's are collected - * and returned, represented as a list for each batch interval. - */ - def runStreams[V]( - ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[V]] = { - implicit val cm: ClassManifest[V] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] - val res = runStreams[V](ssc.ssc, numBatches, numExpectedOutput) - val out = new ArrayList[JList[V]]() - res.map(entry => out.append(new ArrayList[V](entry))) - out - } -} - -object JavaTestUtils extends JavaTestBase { - -} - -object JavaCheckpointTestUtils extends JavaTestBase { - override def actuallyWait = true -} \ No newline at end of file diff --git a/streaming/src/test/scala/spark/streaming/JavaAPISuite.java b/streaming/src/test/scala/spark/streaming/JavaAPISuite.java deleted file mode 100644 index 8c94e13e65..0000000000 --- a/streaming/src/test/scala/spark/streaming/JavaAPISuite.java +++ /dev/null @@ -1,1027 +0,0 @@ -package spark.streaming; - -import com.google.common.base.Optional; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.io.Files; -import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import scala.Tuple2; -import spark.HashPartitioner; -import spark.api.java.JavaRDD; -import spark.api.java.JavaSparkContext; -import spark.api.java.function.*; -import spark.storage.StorageLevel; -import spark.streaming.api.java.JavaDStream; -import spark.streaming.api.java.JavaPairDStream; -import spark.streaming.api.java.JavaStreamingContext; -import spark.streaming.JavaTestUtils; -import spark.streaming.JavaCheckpointTestUtils; -import spark.streaming.dstream.KafkaPartitionKey; - -import java.io.*; -import java.util.*; - -// The test suite itself is Serializable so that anonymous Function implementations can be -// serialized, as an alternative to converting these anonymous classes to static inner classes; -// see http://stackoverflow.com/questions/758570/. -public class JavaAPISuite implements Serializable { - private transient JavaStreamingContext ssc; - - @Before - public void setUp() { - ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000)); - } - - @After - public void tearDown() { - ssc.stop(); - ssc = null; - // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.master.port"); - } - - @Test - public void testCount() { - List> inputData = Arrays.asList( - Arrays.asList(1,2,3,4), - Arrays.asList(3,4,5), - Arrays.asList(3)); - - List> expected = Arrays.asList( - Arrays.asList(4L), - Arrays.asList(3L), - Arrays.asList(1L)); - - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream count = stream.count(); - JavaTestUtils.attachTestOutputStream(count); - List> result = JavaTestUtils.runStreams(ssc, 3, 3); - assertOrderInvariantEquals(expected, result); - } - - @Test - public void testMap() { - List> inputData = Arrays.asList( - Arrays.asList("hello", "world"), - Arrays.asList("goodnight", "moon")); - - List> expected = Arrays.asList( - Arrays.asList(5,5), - Arrays.asList(9,4)); - - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream letterCount = stream.map(new Function() { - @Override - public Integer call(String s) throws Exception { - return s.length(); - } - }); - JavaTestUtils.attachTestOutputStream(letterCount); - List> result = JavaTestUtils.runStreams(ssc, 2, 2); - - assertOrderInvariantEquals(expected, result); - } - - @Test - public void testWindow() { - List> inputData = Arrays.asList( - Arrays.asList(1,2,3), - Arrays.asList(4,5,6), - Arrays.asList(7,8,9)); - - List> expected = Arrays.asList( - Arrays.asList(1,2,3), - Arrays.asList(4,5,6,1,2,3), - Arrays.asList(7,8,9,4,5,6), - Arrays.asList(7,8,9)); - - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream windowed = stream.window(new Duration(2000)); - JavaTestUtils.attachTestOutputStream(windowed); - List> result = JavaTestUtils.runStreams(ssc, 4, 4); - - assertOrderInvariantEquals(expected, result); - } - - @Test - public void testWindowWithSlideDuration() { - List> inputData = Arrays.asList( - Arrays.asList(1,2,3), - Arrays.asList(4,5,6), - Arrays.asList(7,8,9), - Arrays.asList(10,11,12), - Arrays.asList(13,14,15), - Arrays.asList(16,17,18)); - - List> expected = Arrays.asList( - Arrays.asList(1,2,3,4,5,6), - Arrays.asList(1,2,3,4,5,6,7,8,9,10,11,12), - Arrays.asList(7,8,9,10,11,12,13,14,15,16,17,18), - Arrays.asList(13,14,15,16,17,18)); - - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream windowed = stream.window(new Duration(4000), new Duration(2000)); - JavaTestUtils.attachTestOutputStream(windowed); - List> result = JavaTestUtils.runStreams(ssc, 8, 4); - - assertOrderInvariantEquals(expected, result); - } - - @Test - public void testTumble() { - List> inputData = Arrays.asList( - Arrays.asList(1,2,3), - Arrays.asList(4,5,6), - Arrays.asList(7,8,9), - Arrays.asList(10,11,12), - Arrays.asList(13,14,15), - Arrays.asList(16,17,18)); - - List> expected = Arrays.asList( - Arrays.asList(1,2,3,4,5,6), - Arrays.asList(7,8,9,10,11,12), - Arrays.asList(13,14,15,16,17,18)); - - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream windowed = stream.tumble(new Duration(2000)); - JavaTestUtils.attachTestOutputStream(windowed); - List> result = JavaTestUtils.runStreams(ssc, 6, 3); - - assertOrderInvariantEquals(expected, result); - } - - @Test - public void testFilter() { - List> inputData = Arrays.asList( - Arrays.asList("giants", "dodgers"), - Arrays.asList("yankees", "red socks")); - - List> expected = Arrays.asList( - Arrays.asList("giants"), - Arrays.asList("yankees")); - - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream filtered = stream.filter(new Function() { - @Override - public Boolean call(String s) throws Exception { - return s.contains("a"); - } - }); - JavaTestUtils.attachTestOutputStream(filtered); - List> result = JavaTestUtils.runStreams(ssc, 2, 2); - - assertOrderInvariantEquals(expected, result); - } - - @Test - public void testGlom() { - List> inputData = Arrays.asList( - Arrays.asList("giants", "dodgers"), - Arrays.asList("yankees", "red socks")); - - List>> expected = Arrays.asList( - Arrays.asList(Arrays.asList("giants", "dodgers")), - Arrays.asList(Arrays.asList("yankees", "red socks"))); - - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream glommed = stream.glom(); - JavaTestUtils.attachTestOutputStream(glommed); - List>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @Test - public void testMapPartitions() { - List> inputData = Arrays.asList( - Arrays.asList("giants", "dodgers"), - Arrays.asList("yankees", "red socks")); - - List> expected = Arrays.asList( - Arrays.asList("GIANTSDODGERS"), - Arrays.asList("YANKEESRED SOCKS")); - - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream mapped = stream.mapPartitions(new FlatMapFunction, String>() { - @Override - public Iterable call(Iterator in) { - String out = ""; - while (in.hasNext()) { - out = out + in.next().toUpperCase(); - } - return Lists.newArrayList(out); - } - }); - JavaTestUtils.attachTestOutputStream(mapped); - List>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - private class IntegerSum extends Function2 { - @Override - public Integer call(Integer i1, Integer i2) throws Exception { - return i1 + i2; - } - } - - private class IntegerDifference extends Function2 { - @Override - public Integer call(Integer i1, Integer i2) throws Exception { - return i1 - i2; - } - } - - @Test - public void testReduce() { - List> inputData = Arrays.asList( - Arrays.asList(1,2,3), - Arrays.asList(4,5,6), - Arrays.asList(7,8,9)); - - List> expected = Arrays.asList( - Arrays.asList(6), - Arrays.asList(15), - Arrays.asList(24)); - - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream reduced = stream.reduce(new IntegerSum()); - JavaTestUtils.attachTestOutputStream(reduced); - List> result = JavaTestUtils.runStreams(ssc, 3, 3); - - Assert.assertEquals(expected, result); - } - - @Test - public void testReduceByWindow() { - List> inputData = Arrays.asList( - Arrays.asList(1,2,3), - Arrays.asList(4,5,6), - Arrays.asList(7,8,9)); - - List> expected = Arrays.asList( - Arrays.asList(6), - Arrays.asList(21), - Arrays.asList(39), - Arrays.asList(24)); - - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream reducedWindowed = stream.reduceByWindow(new IntegerSum(), - new IntegerDifference(), new Duration(2000), new Duration(1000)); - JavaTestUtils.attachTestOutputStream(reducedWindowed); - List> result = JavaTestUtils.runStreams(ssc, 4, 4); - - Assert.assertEquals(expected, result); - } - - @Test - public void testQueueStream() { - List> expected = Arrays.asList( - Arrays.asList(1,2,3), - Arrays.asList(4,5,6), - Arrays.asList(7,8,9)); - - JavaSparkContext jsc = new JavaSparkContext(ssc.ssc().sc()); - JavaRDD rdd1 = ssc.sc().parallelize(Arrays.asList(1,2,3)); - JavaRDD rdd2 = ssc.sc().parallelize(Arrays.asList(4,5,6)); - JavaRDD rdd3 = ssc.sc().parallelize(Arrays.asList(7,8,9)); - - LinkedList> rdds = Lists.newLinkedList(); - rdds.add(rdd1); - rdds.add(rdd2); - rdds.add(rdd3); - - JavaDStream stream = ssc.queueStream(rdds); - JavaTestUtils.attachTestOutputStream(stream); - List> result = JavaTestUtils.runStreams(ssc, 3, 3); - Assert.assertEquals(expected, result); - } - - @Test - public void testTransform() { - List> inputData = Arrays.asList( - Arrays.asList(1,2,3), - Arrays.asList(4,5,6), - Arrays.asList(7,8,9)); - - List> expected = Arrays.asList( - Arrays.asList(3,4,5), - Arrays.asList(6,7,8), - Arrays.asList(9,10,11)); - - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream transformed = stream.transform(new Function, JavaRDD>() { - @Override - public JavaRDD call(JavaRDD in) throws Exception { - return in.map(new Function() { - @Override - public Integer call(Integer i) throws Exception { - return i + 2; - } - }); - }}); - JavaTestUtils.attachTestOutputStream(transformed); - List> result = JavaTestUtils.runStreams(ssc, 3, 3); - - assertOrderInvariantEquals(expected, result); - } - - @Test - public void testFlatMap() { - List> inputData = Arrays.asList( - Arrays.asList("go", "giants"), - Arrays.asList("boo", "dodgers"), - Arrays.asList("athletics")); - - List> expected = Arrays.asList( - Arrays.asList("g","o","g","i","a","n","t","s"), - Arrays.asList("b", "o", "o", "d","o","d","g","e","r","s"), - Arrays.asList("a","t","h","l","e","t","i","c","s")); - - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream flatMapped = stream.flatMap(new FlatMapFunction() { - @Override - public Iterable call(String x) { - return Lists.newArrayList(x.split("(?!^)")); - } - }); - JavaTestUtils.attachTestOutputStream(flatMapped); - List> result = JavaTestUtils.runStreams(ssc, 3, 3); - - assertOrderInvariantEquals(expected, result); - } - - @Test - public void testPairFlatMap() { - List> inputData = Arrays.asList( - Arrays.asList("giants"), - Arrays.asList("dodgers"), - Arrays.asList("athletics")); - - List>> expected = Arrays.asList( - Arrays.asList( - new Tuple2(6, "g"), - new Tuple2(6, "i"), - new Tuple2(6, "a"), - new Tuple2(6, "n"), - new Tuple2(6, "t"), - new Tuple2(6, "s")), - Arrays.asList( - new Tuple2(7, "d"), - new Tuple2(7, "o"), - new Tuple2(7, "d"), - new Tuple2(7, "g"), - new Tuple2(7, "e"), - new Tuple2(7, "r"), - new Tuple2(7, "s")), - Arrays.asList( - new Tuple2(9, "a"), - new Tuple2(9, "t"), - new Tuple2(9, "h"), - new Tuple2(9, "l"), - new Tuple2(9, "e"), - new Tuple2(9, "t"), - new Tuple2(9, "i"), - new Tuple2(9, "c"), - new Tuple2(9, "s"))); - - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream flatMapped = stream.flatMap(new PairFlatMapFunction() { - @Override - public Iterable> call(String in) throws Exception { - List> out = Lists.newArrayList(); - for (String letter: in.split("(?!^)")) { - out.add(new Tuple2(in.length(), letter)); - } - return out; - } - }); - JavaTestUtils.attachTestOutputStream(flatMapped); - List>> result = JavaTestUtils.runStreams(ssc, 3, 3); - - Assert.assertEquals(expected, result); - } - - @Test - public void testUnion() { - List> inputData1 = Arrays.asList( - Arrays.asList(1,1), - Arrays.asList(2,2), - Arrays.asList(3,3)); - - List> inputData2 = Arrays.asList( - Arrays.asList(4,4), - Arrays.asList(5,5), - Arrays.asList(6,6)); - - List> expected = Arrays.asList( - Arrays.asList(1,1,4,4), - Arrays.asList(2,2,5,5), - Arrays.asList(3,3,6,6)); - - JavaDStream stream1 = JavaTestUtils.attachTestInputStream(ssc, inputData1, 2); - JavaDStream stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 2); - - JavaDStream unioned = stream1.union(stream2); - JavaTestUtils.attachTestOutputStream(unioned); - List> result = JavaTestUtils.runStreams(ssc, 3, 3); - - assertOrderInvariantEquals(expected, result); - } - - /* - * Performs an order-invariant comparison of lists representing two RDD streams. This allows - * us to account for ordering variation within individual RDD's which occurs during windowing. - */ - public static void assertOrderInvariantEquals( - List> expected, List> actual) { - for (List list: expected) { - Collections.sort(list); - } - for (List list: actual) { - Collections.sort(list); - } - Assert.assertEquals(expected, actual); - } - - - // PairDStream Functions - @Test - public void testPairFilter() { - List> inputData = Arrays.asList( - Arrays.asList("giants", "dodgers"), - Arrays.asList("yankees", "red socks")); - - List>> expected = Arrays.asList( - Arrays.asList(new Tuple2("giants", 6)), - Arrays.asList(new Tuple2("yankees", 7))); - - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream pairStream = stream.map( - new PairFunction() { - @Override - public Tuple2 call(String in) throws Exception { - return new Tuple2(in, in.length()); - } - }); - - JavaPairDStream filtered = pairStream.filter( - new Function, Boolean>() { - @Override - public Boolean call(Tuple2 in) throws Exception { - return in._1().contains("a"); - } - }); - JavaTestUtils.attachTestOutputStream(filtered); - List>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - List>> stringStringKVStream = Arrays.asList( - Arrays.asList(new Tuple2("california", "dodgers"), - new Tuple2("california", "giants"), - new Tuple2("new york", "yankees"), - new Tuple2("new york", "mets")), - Arrays.asList(new Tuple2("california", "sharks"), - new Tuple2("california", "ducks"), - new Tuple2("new york", "rangers"), - new Tuple2("new york", "islanders"))); - - List>> stringIntKVStream = Arrays.asList( - Arrays.asList( - new Tuple2("california", 1), - new Tuple2("california", 3), - new Tuple2("new york", 4), - new Tuple2("new york", 1)), - Arrays.asList( - new Tuple2("california", 5), - new Tuple2("california", 5), - new Tuple2("new york", 3), - new Tuple2("new york", 1))); - - @Test - public void testPairGroupByKey() { - List>> inputData = stringStringKVStream; - - List>>> expected = Arrays.asList( - Arrays.asList( - new Tuple2>("california", Arrays.asList("dodgers", "giants")), - new Tuple2>("new york", Arrays.asList("yankees", "mets"))), - Arrays.asList( - new Tuple2>("california", Arrays.asList("sharks", "ducks")), - new Tuple2>("new york", Arrays.asList("rangers", "islanders")))); - - JavaDStream> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream> grouped = pairStream.groupByKey(); - JavaTestUtils.attachTestOutputStream(grouped); - List>>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @Test - public void testPairReduceByKey() { - List>> inputData = stringIntKVStream; - - List>> expected = Arrays.asList( - Arrays.asList( - new Tuple2("california", 4), - new Tuple2("new york", 5)), - Arrays.asList( - new Tuple2("california", 10), - new Tuple2("new york", 4))); - - JavaDStream> stream = JavaTestUtils.attachTestInputStream( - ssc, inputData, 1); - JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream reduced = pairStream.reduceByKey(new IntegerSum()); - - JavaTestUtils.attachTestOutputStream(reduced); - List>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @Test - public void testCombineByKey() { - List>> inputData = stringIntKVStream; - - List>> expected = Arrays.asList( - Arrays.asList( - new Tuple2("california", 4), - new Tuple2("new york", 5)), - Arrays.asList( - new Tuple2("california", 10), - new Tuple2("new york", 4))); - - JavaDStream> stream = JavaTestUtils.attachTestInputStream( - ssc, inputData, 1); - JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream combined = pairStream.combineByKey( - new Function() { - @Override - public Integer call(Integer i) throws Exception { - return i; - } - }, new IntegerSum(), new IntegerSum(), new HashPartitioner(2)); - - JavaTestUtils.attachTestOutputStream(combined); - List>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @Test - public void testCountByKey() { - List>> inputData = stringStringKVStream; - - List>> expected = Arrays.asList( - Arrays.asList( - new Tuple2("california", 2L), - new Tuple2("new york", 2L)), - Arrays.asList( - new Tuple2("california", 2L), - new Tuple2("new york", 2L))); - - JavaDStream> stream = JavaTestUtils.attachTestInputStream( - ssc, inputData, 1); - JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream counted = pairStream.countByKey(); - JavaTestUtils.attachTestOutputStream(counted); - List>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @Test - public void testGroupByKeyAndWindow() { - List>> inputData = stringStringKVStream; - - List>>> expected = Arrays.asList( - Arrays.asList(new Tuple2>("california", Arrays.asList("dodgers", "giants")), - new Tuple2>("new york", Arrays.asList("yankees", "mets"))), - Arrays.asList(new Tuple2>("california", - Arrays.asList("sharks", "ducks", "dodgers", "giants")), - new Tuple2>("new york", Arrays.asList("rangers", "islanders", "yankees", "mets"))), - Arrays.asList(new Tuple2>("california", Arrays.asList("sharks", "ducks")), - new Tuple2>("new york", Arrays.asList("rangers", "islanders")))); - - JavaDStream> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream> groupWindowed = - pairStream.groupByKeyAndWindow(new Duration(2000), new Duration(1000)); - JavaTestUtils.attachTestOutputStream(groupWindowed); - List>>> result = JavaTestUtils.runStreams(ssc, 3, 3); - - Assert.assertEquals(expected, result); - } - - @Test - public void testReduceByKeyAndWindow() { - List>> inputData = stringIntKVStream; - - List>> expected = Arrays.asList( - Arrays.asList(new Tuple2("california", 4), - new Tuple2("new york", 5)), - Arrays.asList(new Tuple2("california", 14), - new Tuple2("new york", 9)), - Arrays.asList(new Tuple2("california", 10), - new Tuple2("new york", 4))); - - JavaDStream> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream reduceWindowed = - pairStream.reduceByKeyAndWindow(new IntegerSum(), new Duration(2000), new Duration(1000)); - JavaTestUtils.attachTestOutputStream(reduceWindowed); - List>> result = JavaTestUtils.runStreams(ssc, 3, 3); - - Assert.assertEquals(expected, result); - } - - @Test - public void testUpdateStateByKey() { - List>> inputData = stringIntKVStream; - - List>> expected = Arrays.asList( - Arrays.asList(new Tuple2("california", 4), - new Tuple2("new york", 5)), - Arrays.asList(new Tuple2("california", 14), - new Tuple2("new york", 9)), - Arrays.asList(new Tuple2("california", 14), - new Tuple2("new york", 9))); - - JavaDStream> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream updated = pairStream.updateStateByKey( - new Function2, Optional, Optional>(){ - @Override - public Optional call(List values, Optional state) { - int out = 0; - if (state.isPresent()) { - out = out + state.get(); - } - for (Integer v: values) { - out = out + v; - } - return Optional.of(out); - } - }); - JavaTestUtils.attachTestOutputStream(updated); - List>> result = JavaTestUtils.runStreams(ssc, 3, 3); - - Assert.assertEquals(expected, result); - } - - @Test - public void testReduceByKeyAndWindowWithInverse() { - List>> inputData = stringIntKVStream; - - List>> expected = Arrays.asList( - Arrays.asList(new Tuple2("california", 4), - new Tuple2("new york", 5)), - Arrays.asList(new Tuple2("california", 14), - new Tuple2("new york", 9)), - Arrays.asList(new Tuple2("california", 10), - new Tuple2("new york", 4))); - - JavaDStream> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream reduceWindowed = - pairStream.reduceByKeyAndWindow(new IntegerSum(), new IntegerDifference(), new Duration(2000), new Duration(1000)); - JavaTestUtils.attachTestOutputStream(reduceWindowed); - List>> result = JavaTestUtils.runStreams(ssc, 3, 3); - - Assert.assertEquals(expected, result); - } - - @Test - public void testCountByKeyAndWindow() { - List>> inputData = stringStringKVStream; - - List>> expected = Arrays.asList( - Arrays.asList( - new Tuple2("california", 2L), - new Tuple2("new york", 2L)), - Arrays.asList( - new Tuple2("california", 4L), - new Tuple2("new york", 4L)), - Arrays.asList( - new Tuple2("california", 2L), - new Tuple2("new york", 2L))); - - JavaDStream> stream = JavaTestUtils.attachTestInputStream( - ssc, inputData, 1); - JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream counted = - pairStream.countByKeyAndWindow(new Duration(2000), new Duration(1000)); - JavaTestUtils.attachTestOutputStream(counted); - List>> result = JavaTestUtils.runStreams(ssc, 3, 3); - - Assert.assertEquals(expected, result); - } - - @Test - public void testMapValues() { - List>> inputData = stringStringKVStream; - - List>> expected = Arrays.asList( - Arrays.asList(new Tuple2("california", "DODGERS"), - new Tuple2("california", "GIANTS"), - new Tuple2("new york", "YANKEES"), - new Tuple2("new york", "METS")), - Arrays.asList(new Tuple2("california", "SHARKS"), - new Tuple2("california", "DUCKS"), - new Tuple2("new york", "RANGERS"), - new Tuple2("new york", "ISLANDERS"))); - - JavaDStream> stream = JavaTestUtils.attachTestInputStream( - ssc, inputData, 1); - JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream mapped = pairStream.mapValues(new Function() { - @Override - public String call(String s) throws Exception { - return s.toUpperCase(); - } - }); - - JavaTestUtils.attachTestOutputStream(mapped); - List>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @Test - public void testFlatMapValues() { - List>> inputData = stringStringKVStream; - - List>> expected = Arrays.asList( - Arrays.asList(new Tuple2("california", "dodgers1"), - new Tuple2("california", "dodgers2"), - new Tuple2("california", "giants1"), - new Tuple2("california", "giants2"), - new Tuple2("new york", "yankees1"), - new Tuple2("new york", "yankees2"), - new Tuple2("new york", "mets1"), - new Tuple2("new york", "mets2")), - Arrays.asList(new Tuple2("california", "sharks1"), - new Tuple2("california", "sharks2"), - new Tuple2("california", "ducks1"), - new Tuple2("california", "ducks2"), - new Tuple2("new york", "rangers1"), - new Tuple2("new york", "rangers2"), - new Tuple2("new york", "islanders1"), - new Tuple2("new york", "islanders2"))); - - JavaDStream> stream = JavaTestUtils.attachTestInputStream( - ssc, inputData, 1); - JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - - - JavaPairDStream flatMapped = pairStream.flatMapValues( - new Function>() { - @Override - public Iterable call(String in) { - List out = new ArrayList(); - out.add(in + "1"); - out.add(in + "2"); - return out; - } - }); - - JavaTestUtils.attachTestOutputStream(flatMapped); - List>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @Test - public void testCoGroup() { - List>> stringStringKVStream1 = Arrays.asList( - Arrays.asList(new Tuple2("california", "dodgers"), - new Tuple2("new york", "yankees")), - Arrays.asList(new Tuple2("california", "sharks"), - new Tuple2("new york", "rangers"))); - - List>> stringStringKVStream2 = Arrays.asList( - Arrays.asList(new Tuple2("california", "giants"), - new Tuple2("new york", "mets")), - Arrays.asList(new Tuple2("california", "ducks"), - new Tuple2("new york", "islanders"))); - - - List, List>>>> expected = Arrays.asList( - Arrays.asList( - new Tuple2, List>>("california", - new Tuple2, List>(Arrays.asList("dodgers"), Arrays.asList("giants"))), - new Tuple2, List>>("new york", - new Tuple2, List>(Arrays.asList("yankees"), Arrays.asList("mets")))), - Arrays.asList( - new Tuple2, List>>("california", - new Tuple2, List>(Arrays.asList("sharks"), Arrays.asList("ducks"))), - new Tuple2, List>>("new york", - new Tuple2, List>(Arrays.asList("rangers"), Arrays.asList("islanders"))))); - - - JavaDStream> stream1 = JavaTestUtils.attachTestInputStream( - ssc, stringStringKVStream1, 1); - JavaPairDStream pairStream1 = JavaPairDStream.fromJavaDStream(stream1); - - JavaDStream> stream2 = JavaTestUtils.attachTestInputStream( - ssc, stringStringKVStream2, 1); - JavaPairDStream pairStream2 = JavaPairDStream.fromJavaDStream(stream2); - - JavaPairDStream, List>> grouped = pairStream1.cogroup(pairStream2); - JavaTestUtils.attachTestOutputStream(grouped); - List>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @Test - public void testJoin() { - List>> stringStringKVStream1 = Arrays.asList( - Arrays.asList(new Tuple2("california", "dodgers"), - new Tuple2("new york", "yankees")), - Arrays.asList(new Tuple2("california", "sharks"), - new Tuple2("new york", "rangers"))); - - List>> stringStringKVStream2 = Arrays.asList( - Arrays.asList(new Tuple2("california", "giants"), - new Tuple2("new york", "mets")), - Arrays.asList(new Tuple2("california", "ducks"), - new Tuple2("new york", "islanders"))); - - - List>>> expected = Arrays.asList( - Arrays.asList( - new Tuple2>("california", - new Tuple2("dodgers", "giants")), - new Tuple2>("new york", - new Tuple2("yankees", "mets"))), - Arrays.asList( - new Tuple2>("california", - new Tuple2("sharks", "ducks")), - new Tuple2>("new york", - new Tuple2("rangers", "islanders")))); - - - JavaDStream> stream1 = JavaTestUtils.attachTestInputStream( - ssc, stringStringKVStream1, 1); - JavaPairDStream pairStream1 = JavaPairDStream.fromJavaDStream(stream1); - - JavaDStream> stream2 = JavaTestUtils.attachTestInputStream( - ssc, stringStringKVStream2, 1); - JavaPairDStream pairStream2 = JavaPairDStream.fromJavaDStream(stream2); - - JavaPairDStream> joined = pairStream1.join(pairStream2); - JavaTestUtils.attachTestOutputStream(joined); - List>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @Test - public void testCheckpointMasterRecovery() throws InterruptedException { - List> inputData = Arrays.asList( - Arrays.asList("this", "is"), - Arrays.asList("a", "test"), - Arrays.asList("counting", "letters")); - - List> expectedInitial = Arrays.asList( - Arrays.asList(4,2)); - List> expectedFinal = Arrays.asList( - Arrays.asList(1,4), - Arrays.asList(8,7)); - - - File tempDir = Files.createTempDir(); - ssc.checkpoint(tempDir.getAbsolutePath(), new Duration(1000)); - - JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream letterCount = stream.map(new Function() { - @Override - public Integer call(String s) throws Exception { - return s.length(); - } - }); - JavaCheckpointTestUtils.attachTestOutputStream(letterCount); - List> initialResult = JavaTestUtils.runStreams(ssc, 1, 1); - - assertOrderInvariantEquals(expectedInitial, initialResult); - Thread.sleep(1000); - - ssc.stop(); - ssc = new JavaStreamingContext(tempDir.getAbsolutePath()); - ssc.start(); - List> finalResult = JavaCheckpointTestUtils.runStreams(ssc, 2, 2); - assertOrderInvariantEquals(expectedFinal, finalResult); - } - - /** TEST DISABLED: Pending a discussion about checkpoint() semantics with TD - @Test - public void testCheckpointofIndividualStream() throws InterruptedException { - List> inputData = Arrays.asList( - Arrays.asList("this", "is"), - Arrays.asList("a", "test"), - Arrays.asList("counting", "letters")); - - List> expected = Arrays.asList( - Arrays.asList(4,2), - Arrays.asList(1,4), - Arrays.asList(8,7)); - - JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream letterCount = stream.map(new Function() { - @Override - public Integer call(String s) throws Exception { - return s.length(); - } - }); - JavaCheckpointTestUtils.attachTestOutputStream(letterCount); - - letterCount.checkpoint(new Duration(1000)); - - List> result1 = JavaCheckpointTestUtils.runStreams(ssc, 3, 3); - assertOrderInvariantEquals(expected, result1); - } - */ - - // Input stream tests. These mostly just test that we can instantiate a given InputStream with - // Java arguments and assign it to a JavaDStream without producing type errors. Testing of the - // InputStream functionality is deferred to the existing Scala tests. - @Test - public void testKafkaStream() { - HashMap topics = Maps.newHashMap(); - HashMap offsets = Maps.newHashMap(); - JavaDStream test1 = ssc.kafkaStream("localhost", 12345, "group", topics); - JavaDStream test2 = ssc.kafkaStream("localhost", 12345, "group", topics, offsets); - JavaDStream test3 = ssc.kafkaStream("localhost", 12345, "group", topics, offsets, - StorageLevel.MEMORY_AND_DISK()); - } - - @Test - public void testNetworkTextStream() { - JavaDStream test = ssc.networkTextStream("localhost", 12345); - } - - @Test - public void testNetworkString() { - class Converter extends Function> { - public Iterable call(InputStream in) { - BufferedReader reader = new BufferedReader(new InputStreamReader(in)); - List out = new ArrayList(); - try { - while (true) { - String line = reader.readLine(); - if (line == null) { break; } - out.add(line); - } - } catch (IOException e) { } - return out; - } - } - - JavaDStream test = ssc.networkStream( - "localhost", - 12345, - new Converter(), - StorageLevel.MEMORY_ONLY()); - } - - @Test - public void testTextFileStream() { - JavaDStream test = ssc.textFileStream("/tmp/foo"); - } - - @Test - public void testRawNetworkStream() { - JavaDStream test = ssc.rawNetworkStream("localhost", 12345); - } - - @Test - public void testFlumeStream() { - JavaDStream test = ssc.flumeStream("localhost", 12345); - } - - @Test - public void testFileStream() { - JavaPairDStream foo = - ssc.fileStream("/tmp/foo"); - } -} -- cgit v1.2.3