diff options
author | Patrick Wendell <pwendell@gmail.com> | 2013-01-17 21:43:17 -0800 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2013-01-17 21:43:17 -0800 |
commit | c46dd2de78ae0c13060d0a9d2dea110c655659f0 (patch) | |
tree | 3952a40028ef1cd370c2321b1b142888187667d8 /streaming/src/test/java/JavaAPISuite.java | |
parent | e0165bf7141086e28f88cd68ab7bc6249061c924 (diff) | |
download | spark-c46dd2de78ae0c13060d0a9d2dea110c655659f0.tar.gz spark-c46dd2de78ae0c13060d0a9d2dea110c655659f0.tar.bz2 spark-c46dd2de78ae0c13060d0a9d2dea110c655659f0.zip |
Moving tests to appropriate directory
Diffstat (limited to 'streaming/src/test/java/JavaAPISuite.java')
-rw-r--r-- | streaming/src/test/java/JavaAPISuite.java | 1027 |
1 files changed, 1027 insertions, 0 deletions
diff --git a/streaming/src/test/java/JavaAPISuite.java b/streaming/src/test/java/JavaAPISuite.java new file mode 100644 index 0000000000..8c94e13e65 --- /dev/null +++ b/streaming/src/test/java/JavaAPISuite.java @@ -0,0 +1,1027 @@ +package spark.streaming; + +import com.google.common.base.Optional; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.io.Files; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import scala.Tuple2; +import spark.HashPartitioner; +import spark.api.java.JavaRDD; +import spark.api.java.JavaSparkContext; +import spark.api.java.function.*; +import spark.storage.StorageLevel; +import spark.streaming.api.java.JavaDStream; +import spark.streaming.api.java.JavaPairDStream; +import spark.streaming.api.java.JavaStreamingContext; +import spark.streaming.JavaTestUtils; +import spark.streaming.JavaCheckpointTestUtils; +import spark.streaming.dstream.KafkaPartitionKey; + +import java.io.*; +import java.util.*; + +// The test suite itself is Serializable so that anonymous Function implementations can be +// serialized, as an alternative to converting these anonymous classes to static inner classes; +// see http://stackoverflow.com/questions/758570/. +public class JavaAPISuite implements Serializable { + private transient JavaStreamingContext ssc; + + @Before + public void setUp() { + ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000)); + } + + @After + public void tearDown() { + ssc.stop(); + ssc = null; + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.master.port"); + } + + @Test + public void testCount() { + List<List<Integer>> inputData = Arrays.asList( + Arrays.asList(1,2,3,4), + Arrays.asList(3,4,5), + Arrays.asList(3)); + + List<List<Long>> expected = Arrays.asList( + Arrays.asList(4L), + Arrays.asList(3L), + Arrays.asList(1L)); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream count = stream.count(); + JavaTestUtils.attachTestOutputStream(count); + List<List<Long>> result = JavaTestUtils.runStreams(ssc, 3, 3); + assertOrderInvariantEquals(expected, result); + } + + @Test + public void testMap() { + List<List<String>> inputData = Arrays.asList( + Arrays.asList("hello", "world"), + Arrays.asList("goodnight", "moon")); + + List<List<Integer>> expected = Arrays.asList( + Arrays.asList(5,5), + Arrays.asList(9,4)); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream letterCount = stream.map(new Function<String, Integer>() { + @Override + public Integer call(String s) throws Exception { + return s.length(); + } + }); + JavaTestUtils.attachTestOutputStream(letterCount); + List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + assertOrderInvariantEquals(expected, result); + } + + @Test + public void testWindow() { + List<List<Integer>> inputData = Arrays.asList( + Arrays.asList(1,2,3), + Arrays.asList(4,5,6), + Arrays.asList(7,8,9)); + + List<List<Integer>> expected = Arrays.asList( + Arrays.asList(1,2,3), + Arrays.asList(4,5,6,1,2,3), + Arrays.asList(7,8,9,4,5,6), + Arrays.asList(7,8,9)); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream windowed = stream.window(new Duration(2000)); + JavaTestUtils.attachTestOutputStream(windowed); + List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4); + + assertOrderInvariantEquals(expected, result); + } + + @Test + public void testWindowWithSlideDuration() { + List<List<Integer>> inputData = Arrays.asList( + Arrays.asList(1,2,3), + Arrays.asList(4,5,6), + Arrays.asList(7,8,9), + Arrays.asList(10,11,12), + Arrays.asList(13,14,15), + Arrays.asList(16,17,18)); + + List<List<Integer>> expected = Arrays.asList( + Arrays.asList(1,2,3,4,5,6), + Arrays.asList(1,2,3,4,5,6,7,8,9,10,11,12), + Arrays.asList(7,8,9,10,11,12,13,14,15,16,17,18), + Arrays.asList(13,14,15,16,17,18)); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream windowed = stream.window(new Duration(4000), new Duration(2000)); + JavaTestUtils.attachTestOutputStream(windowed); + List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 8, 4); + + assertOrderInvariantEquals(expected, result); + } + + @Test + public void testTumble() { + List<List<Integer>> inputData = Arrays.asList( + Arrays.asList(1,2,3), + Arrays.asList(4,5,6), + Arrays.asList(7,8,9), + Arrays.asList(10,11,12), + Arrays.asList(13,14,15), + Arrays.asList(16,17,18)); + + List<List<Integer>> expected = Arrays.asList( + Arrays.asList(1,2,3,4,5,6), + Arrays.asList(7,8,9,10,11,12), + Arrays.asList(13,14,15,16,17,18)); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream windowed = stream.tumble(new Duration(2000)); + JavaTestUtils.attachTestOutputStream(windowed); + List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 6, 3); + + assertOrderInvariantEquals(expected, result); + } + + @Test + public void testFilter() { + List<List<String>> inputData = Arrays.asList( + Arrays.asList("giants", "dodgers"), + Arrays.asList("yankees", "red socks")); + + List<List<String>> expected = Arrays.asList( + Arrays.asList("giants"), + Arrays.asList("yankees")); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream filtered = stream.filter(new Function<String, Boolean>() { + @Override + public Boolean call(String s) throws Exception { + return s.contains("a"); + } + }); + JavaTestUtils.attachTestOutputStream(filtered); + List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + assertOrderInvariantEquals(expected, result); + } + + @Test + public void testGlom() { + List<List<String>> inputData = Arrays.asList( + Arrays.asList("giants", "dodgers"), + Arrays.asList("yankees", "red socks")); + + List<List<List<String>>> expected = Arrays.asList( + Arrays.asList(Arrays.asList("giants", "dodgers")), + Arrays.asList(Arrays.asList("yankees", "red socks"))); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream glommed = stream.glom(); + JavaTestUtils.attachTestOutputStream(glommed); + List<List<List<String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testMapPartitions() { + List<List<String>> inputData = Arrays.asList( + Arrays.asList("giants", "dodgers"), + Arrays.asList("yankees", "red socks")); + + List<List<String>> expected = Arrays.asList( + Arrays.asList("GIANTSDODGERS"), + Arrays.asList("YANKEESRED SOCKS")); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream mapped = stream.mapPartitions(new FlatMapFunction<Iterator<String>, String>() { + @Override + public Iterable<String> call(Iterator<String> in) { + String out = ""; + while (in.hasNext()) { + out = out + in.next().toUpperCase(); + } + return Lists.newArrayList(out); + } + }); + JavaTestUtils.attachTestOutputStream(mapped); + List<List<List<String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + private class IntegerSum extends Function2<Integer, Integer, Integer> { + @Override + public Integer call(Integer i1, Integer i2) throws Exception { + return i1 + i2; + } + } + + private class IntegerDifference extends Function2<Integer, Integer, Integer> { + @Override + public Integer call(Integer i1, Integer i2) throws Exception { + return i1 - i2; + } + } + + @Test + public void testReduce() { + List<List<Integer>> inputData = Arrays.asList( + Arrays.asList(1,2,3), + Arrays.asList(4,5,6), + Arrays.asList(7,8,9)); + + List<List<Integer>> expected = Arrays.asList( + Arrays.asList(6), + Arrays.asList(15), + Arrays.asList(24)); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream reduced = stream.reduce(new IntegerSum()); + JavaTestUtils.attachTestOutputStream(reduced); + List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3); + + Assert.assertEquals(expected, result); + } + + @Test + public void testReduceByWindow() { + List<List<Integer>> inputData = Arrays.asList( + Arrays.asList(1,2,3), + Arrays.asList(4,5,6), + Arrays.asList(7,8,9)); + + List<List<Integer>> expected = Arrays.asList( + Arrays.asList(6), + Arrays.asList(21), + Arrays.asList(39), + Arrays.asList(24)); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream reducedWindowed = stream.reduceByWindow(new IntegerSum(), + new IntegerDifference(), new Duration(2000), new Duration(1000)); + JavaTestUtils.attachTestOutputStream(reducedWindowed); + List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4); + + Assert.assertEquals(expected, result); + } + + @Test + public void testQueueStream() { + List<List<Integer>> expected = Arrays.asList( + Arrays.asList(1,2,3), + Arrays.asList(4,5,6), + Arrays.asList(7,8,9)); + + JavaSparkContext jsc = new JavaSparkContext(ssc.ssc().sc()); + JavaRDD<Integer> rdd1 = ssc.sc().parallelize(Arrays.asList(1,2,3)); + JavaRDD<Integer> rdd2 = ssc.sc().parallelize(Arrays.asList(4,5,6)); + JavaRDD<Integer> rdd3 = ssc.sc().parallelize(Arrays.asList(7,8,9)); + + LinkedList<JavaRDD<Integer>> rdds = Lists.newLinkedList(); + rdds.add(rdd1); + rdds.add(rdd2); + rdds.add(rdd3); + + JavaDStream<Integer> stream = ssc.queueStream(rdds); + JavaTestUtils.attachTestOutputStream(stream); + List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3); + Assert.assertEquals(expected, result); + } + + @Test + public void testTransform() { + List<List<Integer>> inputData = Arrays.asList( + Arrays.asList(1,2,3), + Arrays.asList(4,5,6), + Arrays.asList(7,8,9)); + + List<List<Integer>> expected = Arrays.asList( + Arrays.asList(3,4,5), + Arrays.asList(6,7,8), + Arrays.asList(9,10,11)); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream transformed = stream.transform(new Function<JavaRDD<Integer>, JavaRDD<Integer>>() { + @Override + public JavaRDD<Integer> call(JavaRDD<Integer> in) throws Exception { + return in.map(new Function<Integer, Integer>() { + @Override + public Integer call(Integer i) throws Exception { + return i + 2; + } + }); + }}); + JavaTestUtils.attachTestOutputStream(transformed); + List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3); + + assertOrderInvariantEquals(expected, result); + } + + @Test + public void testFlatMap() { + List<List<String>> inputData = Arrays.asList( + Arrays.asList("go", "giants"), + Arrays.asList("boo", "dodgers"), + Arrays.asList("athletics")); + + List<List<String>> expected = Arrays.asList( + Arrays.asList("g","o","g","i","a","n","t","s"), + Arrays.asList("b", "o", "o", "d","o","d","g","e","r","s"), + Arrays.asList("a","t","h","l","e","t","i","c","s")); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream flatMapped = stream.flatMap(new FlatMapFunction<String, String>() { + @Override + public Iterable<String> call(String x) { + return Lists.newArrayList(x.split("(?!^)")); + } + }); + JavaTestUtils.attachTestOutputStream(flatMapped); + List<List<String>> result = JavaTestUtils.runStreams(ssc, 3, 3); + + assertOrderInvariantEquals(expected, result); + } + + @Test + public void testPairFlatMap() { + List<List<String>> inputData = Arrays.asList( + Arrays.asList("giants"), + Arrays.asList("dodgers"), + Arrays.asList("athletics")); + + List<List<Tuple2<Integer, String>>> expected = Arrays.asList( + Arrays.asList( + new Tuple2<Integer, String>(6, "g"), + new Tuple2<Integer, String>(6, "i"), + new Tuple2<Integer, String>(6, "a"), + new Tuple2<Integer, String>(6, "n"), + new Tuple2<Integer, String>(6, "t"), + new Tuple2<Integer, String>(6, "s")), + Arrays.asList( + new Tuple2<Integer, String>(7, "d"), + new Tuple2<Integer, String>(7, "o"), + new Tuple2<Integer, String>(7, "d"), + new Tuple2<Integer, String>(7, "g"), + new Tuple2<Integer, String>(7, "e"), + new Tuple2<Integer, String>(7, "r"), + new Tuple2<Integer, String>(7, "s")), + Arrays.asList( + new Tuple2<Integer, String>(9, "a"), + new Tuple2<Integer, String>(9, "t"), + new Tuple2<Integer, String>(9, "h"), + new Tuple2<Integer, String>(9, "l"), + new Tuple2<Integer, String>(9, "e"), + new Tuple2<Integer, String>(9, "t"), + new Tuple2<Integer, String>(9, "i"), + new Tuple2<Integer, String>(9, "c"), + new Tuple2<Integer, String>(9, "s"))); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaPairDStream flatMapped = stream.flatMap(new PairFlatMapFunction<String, Integer, String>() { + @Override + public Iterable<Tuple2<Integer, String>> call(String in) throws Exception { + List<Tuple2<Integer, String>> out = Lists.newArrayList(); + for (String letter: in.split("(?!^)")) { + out.add(new Tuple2<Integer, String>(in.length(), letter)); + } + return out; + } + }); + JavaTestUtils.attachTestOutputStream(flatMapped); + List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 3, 3); + + Assert.assertEquals(expected, result); + } + + @Test + public void testUnion() { + List<List<Integer>> inputData1 = Arrays.asList( + Arrays.asList(1,1), + Arrays.asList(2,2), + Arrays.asList(3,3)); + + List<List<Integer>> inputData2 = Arrays.asList( + Arrays.asList(4,4), + Arrays.asList(5,5), + Arrays.asList(6,6)); + + List<List<Integer>> expected = Arrays.asList( + Arrays.asList(1,1,4,4), + Arrays.asList(2,2,5,5), + Arrays.asList(3,3,6,6)); + + JavaDStream stream1 = JavaTestUtils.attachTestInputStream(ssc, inputData1, 2); + JavaDStream stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 2); + + JavaDStream unioned = stream1.union(stream2); + JavaTestUtils.attachTestOutputStream(unioned); + List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3); + + assertOrderInvariantEquals(expected, result); + } + + /* + * Performs an order-invariant comparison of lists representing two RDD streams. This allows + * us to account for ordering variation within individual RDD's which occurs during windowing. + */ + public static <T extends Comparable> void assertOrderInvariantEquals( + List<List<T>> expected, List<List<T>> actual) { + for (List<T> list: expected) { + Collections.sort(list); + } + for (List<T> list: actual) { + Collections.sort(list); + } + Assert.assertEquals(expected, actual); + } + + + // PairDStream Functions + @Test + public void testPairFilter() { + List<List<String>> inputData = Arrays.asList( + Arrays.asList("giants", "dodgers"), + Arrays.asList("yankees", "red socks")); + + List<List<Tuple2<String, Integer>>> expected = Arrays.asList( + Arrays.asList(new Tuple2<String, Integer>("giants", 6)), + Arrays.asList(new Tuple2<String, Integer>("yankees", 7))); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaPairDStream<String, Integer> pairStream = stream.map( + new PairFunction<String, String, Integer>() { + @Override + public Tuple2 call(String in) throws Exception { + return new Tuple2<String, Integer>(in, in.length()); + } + }); + + JavaPairDStream<String, Integer> filtered = pairStream.filter( + new Function<Tuple2<String, Integer>, Boolean>() { + @Override + public Boolean call(Tuple2<String, Integer> in) throws Exception { + return in._1().contains("a"); + } + }); + JavaTestUtils.attachTestOutputStream(filtered); + List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + List<List<Tuple2<String, String>>> stringStringKVStream = Arrays.asList( + Arrays.asList(new Tuple2<String, String>("california", "dodgers"), + new Tuple2<String, String>("california", "giants"), + new Tuple2<String, String>("new york", "yankees"), + new Tuple2<String, String>("new york", "mets")), + Arrays.asList(new Tuple2<String, String>("california", "sharks"), + new Tuple2<String, String>("california", "ducks"), + new Tuple2<String, String>("new york", "rangers"), + new Tuple2<String, String>("new york", "islanders"))); + + List<List<Tuple2<String, Integer>>> stringIntKVStream = Arrays.asList( + Arrays.asList( + new Tuple2<String, Integer>("california", 1), + new Tuple2<String, Integer>("california", 3), + new Tuple2<String, Integer>("new york", 4), + new Tuple2<String, Integer>("new york", 1)), + Arrays.asList( + new Tuple2<String, Integer>("california", 5), + new Tuple2<String, Integer>("california", 5), + new Tuple2<String, Integer>("new york", 3), + new Tuple2<String, Integer>("new york", 1))); + + @Test + public void testPairGroupByKey() { + List<List<Tuple2<String, String>>> inputData = stringStringKVStream; + + List<List<Tuple2<String, List<String>>>> expected = Arrays.asList( + Arrays.asList( + new Tuple2<String, List<String>>("california", Arrays.asList("dodgers", "giants")), + new Tuple2<String, List<String>>("new york", Arrays.asList("yankees", "mets"))), + Arrays.asList( + new Tuple2<String, List<String>>("california", Arrays.asList("sharks", "ducks")), + new Tuple2<String, List<String>>("new york", Arrays.asList("rangers", "islanders")))); + + JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream); + + JavaPairDStream<String, List<String>> grouped = pairStream.groupByKey(); + JavaTestUtils.attachTestOutputStream(grouped); + List<List<Tuple2<String, List<String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testPairReduceByKey() { + List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; + + List<List<Tuple2<String, Integer>>> expected = Arrays.asList( + Arrays.asList( + new Tuple2<String, Integer>("california", 4), + new Tuple2<String, Integer>("new york", 5)), + Arrays.asList( + new Tuple2<String, Integer>("california", 10), + new Tuple2<String, Integer>("new york", 4))); + + JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream( + ssc, inputData, 1); + JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); + + JavaPairDStream<String, Integer> reduced = pairStream.reduceByKey(new IntegerSum()); + + JavaTestUtils.attachTestOutputStream(reduced); + List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testCombineByKey() { + List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; + + List<List<Tuple2<String, Integer>>> expected = Arrays.asList( + Arrays.asList( + new Tuple2<String, Integer>("california", 4), + new Tuple2<String, Integer>("new york", 5)), + Arrays.asList( + new Tuple2<String, Integer>("california", 10), + new Tuple2<String, Integer>("new york", 4))); + + JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream( + ssc, inputData, 1); + JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); + + JavaPairDStream<String, Integer> combined = pairStream.<Integer>combineByKey( + new Function<Integer, Integer>() { + @Override + public Integer call(Integer i) throws Exception { + return i; + } + }, new IntegerSum(), new IntegerSum(), new HashPartitioner(2)); + + JavaTestUtils.attachTestOutputStream(combined); + List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testCountByKey() { + List<List<Tuple2<String, String>>> inputData = stringStringKVStream; + + List<List<Tuple2<String, Long>>> expected = Arrays.asList( + Arrays.asList( + new Tuple2<String, Long>("california", 2L), + new Tuple2<String, Long>("new york", 2L)), + Arrays.asList( + new Tuple2<String, Long>("california", 2L), + new Tuple2<String, Long>("new york", 2L))); + + JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream( + ssc, inputData, 1); + JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream); + + JavaPairDStream<String, Long> counted = pairStream.countByKey(); + JavaTestUtils.attachTestOutputStream(counted); + List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testGroupByKeyAndWindow() { + List<List<Tuple2<String, String>>> inputData = stringStringKVStream; + + List<List<Tuple2<String, List<String>>>> expected = Arrays.asList( + Arrays.asList(new Tuple2<String, List<String>>("california", Arrays.asList("dodgers", "giants")), + new Tuple2<String, List<String>>("new york", Arrays.asList("yankees", "mets"))), + Arrays.asList(new Tuple2<String, List<String>>("california", + Arrays.asList("sharks", "ducks", "dodgers", "giants")), + new Tuple2<String, List<String>>("new york", Arrays.asList("rangers", "islanders", "yankees", "mets"))), + Arrays.asList(new Tuple2<String, List<String>>("california", Arrays.asList("sharks", "ducks")), + new Tuple2<String, List<String>>("new york", Arrays.asList("rangers", "islanders")))); + + JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream); + + JavaPairDStream<String, List<String>> groupWindowed = + pairStream.groupByKeyAndWindow(new Duration(2000), new Duration(1000)); + JavaTestUtils.attachTestOutputStream(groupWindowed); + List<List<Tuple2<String, List<String>>>> result = JavaTestUtils.runStreams(ssc, 3, 3); + + Assert.assertEquals(expected, result); + } + + @Test + public void testReduceByKeyAndWindow() { + List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; + + List<List<Tuple2<String, Integer>>> expected = Arrays.asList( + Arrays.asList(new Tuple2<String, Integer>("california", 4), + new Tuple2<String, Integer>("new york", 5)), + Arrays.asList(new Tuple2<String, Integer>("california", 14), + new Tuple2<String, Integer>("new york", 9)), + Arrays.asList(new Tuple2<String, Integer>("california", 10), + new Tuple2<String, Integer>("new york", 4))); + + JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); + + JavaPairDStream<String, Integer> reduceWindowed = + pairStream.reduceByKeyAndWindow(new IntegerSum(), new Duration(2000), new Duration(1000)); + JavaTestUtils.attachTestOutputStream(reduceWindowed); + List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3); + + Assert.assertEquals(expected, result); + } + + @Test + public void testUpdateStateByKey() { + List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; + + List<List<Tuple2<String, Integer>>> expected = Arrays.asList( + Arrays.asList(new Tuple2<String, Integer>("california", 4), + new Tuple2<String, Integer>("new york", 5)), + Arrays.asList(new Tuple2<String, Integer>("california", 14), + new Tuple2<String, Integer>("new york", 9)), + Arrays.asList(new Tuple2<String, Integer>("california", 14), + new Tuple2<String, Integer>("new york", 9))); + + JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); + + JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey( + new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>(){ + @Override + public Optional<Integer> call(List<Integer> values, Optional<Integer> state) { + int out = 0; + if (state.isPresent()) { + out = out + state.get(); + } + for (Integer v: values) { + out = out + v; + } + return Optional.of(out); + } + }); + JavaTestUtils.attachTestOutputStream(updated); + List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3); + + Assert.assertEquals(expected, result); + } + + @Test + public void testReduceByKeyAndWindowWithInverse() { + List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; + + List<List<Tuple2<String, Integer>>> expected = Arrays.asList( + Arrays.asList(new Tuple2<String, Integer>("california", 4), + new Tuple2<String, Integer>("new york", 5)), + Arrays.asList(new Tuple2<String, Integer>("california", 14), + new Tuple2<String, Integer>("new york", 9)), + Arrays.asList(new Tuple2<String, Integer>("california", 10), + new Tuple2<String, Integer>("new york", 4))); + + JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); + + JavaPairDStream<String, Integer> reduceWindowed = + pairStream.reduceByKeyAndWindow(new IntegerSum(), new IntegerDifference(), new Duration(2000), new Duration(1000)); + JavaTestUtils.attachTestOutputStream(reduceWindowed); + List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3); + + Assert.assertEquals(expected, result); + } + + @Test + public void testCountByKeyAndWindow() { + List<List<Tuple2<String, String>>> inputData = stringStringKVStream; + + List<List<Tuple2<String, Long>>> expected = Arrays.asList( + Arrays.asList( + new Tuple2<String, Long>("california", 2L), + new Tuple2<String, Long>("new york", 2L)), + Arrays.asList( + new Tuple2<String, Long>("california", 4L), + new Tuple2<String, Long>("new york", 4L)), + Arrays.asList( + new Tuple2<String, Long>("california", 2L), + new Tuple2<String, Long>("new york", 2L))); + + JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream( + ssc, inputData, 1); + JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream); + + JavaPairDStream<String, Long> counted = + pairStream.countByKeyAndWindow(new Duration(2000), new Duration(1000)); + JavaTestUtils.attachTestOutputStream(counted); + List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 3, 3); + + Assert.assertEquals(expected, result); + } + + @Test + public void testMapValues() { + List<List<Tuple2<String, String>>> inputData = stringStringKVStream; + + List<List<Tuple2<String, String>>> expected = Arrays.asList( + Arrays.asList(new Tuple2<String, String>("california", "DODGERS"), + new Tuple2<String, String>("california", "GIANTS"), + new Tuple2<String, String>("new york", "YANKEES"), + new Tuple2<String, String>("new york", "METS")), + Arrays.asList(new Tuple2<String, String>("california", "SHARKS"), + new Tuple2<String, String>("california", "DUCKS"), + new Tuple2<String, String>("new york", "RANGERS"), + new Tuple2<String, String>("new york", "ISLANDERS"))); + + JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream( + ssc, inputData, 1); + JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream); + + JavaPairDStream<String, String> mapped = pairStream.mapValues(new Function<String, String>() { + @Override + public String call(String s) throws Exception { + return s.toUpperCase(); + } + }); + + JavaTestUtils.attachTestOutputStream(mapped); + List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testFlatMapValues() { + List<List<Tuple2<String, String>>> inputData = stringStringKVStream; + + List<List<Tuple2<String, String>>> expected = Arrays.asList( + Arrays.asList(new Tuple2<String, String>("california", "dodgers1"), + new Tuple2<String, String>("california", "dodgers2"), + new Tuple2<String, String>("california", "giants1"), + new Tuple2<String, String>("california", "giants2"), + new Tuple2<String, String>("new york", "yankees1"), + new Tuple2<String, String>("new york", "yankees2"), + new Tuple2<String, String>("new york", "mets1"), + new Tuple2<String, String>("new york", "mets2")), + Arrays.asList(new Tuple2<String, String>("california", "sharks1"), + new Tuple2<String, String>("california", "sharks2"), + new Tuple2<String, String>("california", "ducks1"), + new Tuple2<String, String>("california", "ducks2"), + new Tuple2<String, String>("new york", "rangers1"), + new Tuple2<String, String>("new york", "rangers2"), + new Tuple2<String, String>("new york", "islanders1"), + new Tuple2<String, String>("new york", "islanders2"))); + + JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream( + ssc, inputData, 1); + JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream); + + + JavaPairDStream<String, String> flatMapped = pairStream.flatMapValues( + new Function<String, Iterable<String>>() { + @Override + public Iterable<String> call(String in) { + List<String> out = new ArrayList<String>(); + out.add(in + "1"); + out.add(in + "2"); + return out; + } + }); + + JavaTestUtils.attachTestOutputStream(flatMapped); + List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testCoGroup() { + List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList( + Arrays.asList(new Tuple2<String, String>("california", "dodgers"), + new Tuple2<String, String>("new york", "yankees")), + Arrays.asList(new Tuple2<String, String>("california", "sharks"), + new Tuple2<String, String>("new york", "rangers"))); + + List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList( + Arrays.asList(new Tuple2<String, String>("california", "giants"), + new Tuple2<String, String>("new york", "mets")), + Arrays.asList(new Tuple2<String, String>("california", "ducks"), + new Tuple2<String, String>("new york", "islanders"))); + + + List<List<Tuple2<String, Tuple2<List<String>, List<String>>>>> expected = Arrays.asList( + Arrays.asList( + new Tuple2<String, Tuple2<List<String>, List<String>>>("california", + new Tuple2<List<String>, List<String>>(Arrays.asList("dodgers"), Arrays.asList("giants"))), + new Tuple2<String, Tuple2<List<String>, List<String>>>("new york", + new Tuple2<List<String>, List<String>>(Arrays.asList("yankees"), Arrays.asList("mets")))), + Arrays.asList( + new Tuple2<String, Tuple2<List<String>, List<String>>>("california", + new Tuple2<List<String>, List<String>>(Arrays.asList("sharks"), Arrays.asList("ducks"))), + new Tuple2<String, Tuple2<List<String>, List<String>>>("new york", + new Tuple2<List<String>, List<String>>(Arrays.asList("rangers"), Arrays.asList("islanders"))))); + + + JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream( + ssc, stringStringKVStream1, 1); + JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1); + + JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream( + ssc, stringStringKVStream2, 1); + JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2); + + JavaPairDStream<String, Tuple2<List<String>, List<String>>> grouped = pairStream1.cogroup(pairStream2); + JavaTestUtils.attachTestOutputStream(grouped); + List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testJoin() { + List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList( + Arrays.asList(new Tuple2<String, String>("california", "dodgers"), + new Tuple2<String, String>("new york", "yankees")), + Arrays.asList(new Tuple2<String, String>("california", "sharks"), + new Tuple2<String, String>("new york", "rangers"))); + + List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList( + Arrays.asList(new Tuple2<String, String>("california", "giants"), + new Tuple2<String, String>("new york", "mets")), + Arrays.asList(new Tuple2<String, String>("california", "ducks"), + new Tuple2<String, String>("new york", "islanders"))); + + + List<List<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList( + Arrays.asList( + new Tuple2<String, Tuple2<String, String>>("california", + new Tuple2<String, String>("dodgers", "giants")), + new Tuple2<String, Tuple2<String, String>>("new york", + new Tuple2<String, String>("yankees", "mets"))), + Arrays.asList( + new Tuple2<String, Tuple2<String, String>>("california", + new Tuple2<String, String>("sharks", "ducks")), + new Tuple2<String, Tuple2<String, String>>("new york", + new Tuple2<String, String>("rangers", "islanders")))); + + + JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream( + ssc, stringStringKVStream1, 1); + JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1); + + JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream( + ssc, stringStringKVStream2, 1); + JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2); + + JavaPairDStream<String, Tuple2<String, String>> joined = pairStream1.join(pairStream2); + JavaTestUtils.attachTestOutputStream(joined); + List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testCheckpointMasterRecovery() throws InterruptedException { + List<List<String>> inputData = Arrays.asList( + Arrays.asList("this", "is"), + Arrays.asList("a", "test"), + Arrays.asList("counting", "letters")); + + List<List<Integer>> expectedInitial = Arrays.asList( + Arrays.asList(4,2)); + List<List<Integer>> expectedFinal = Arrays.asList( + Arrays.asList(1,4), + Arrays.asList(8,7)); + + + File tempDir = Files.createTempDir(); + ssc.checkpoint(tempDir.getAbsolutePath(), new Duration(1000)); + + JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream letterCount = stream.map(new Function<String, Integer>() { + @Override + public Integer call(String s) throws Exception { + return s.length(); + } + }); + JavaCheckpointTestUtils.attachTestOutputStream(letterCount); + List<List<Integer>> initialResult = JavaTestUtils.runStreams(ssc, 1, 1); + + assertOrderInvariantEquals(expectedInitial, initialResult); + Thread.sleep(1000); + + ssc.stop(); + ssc = new JavaStreamingContext(tempDir.getAbsolutePath()); + ssc.start(); + List<List<Integer>> finalResult = JavaCheckpointTestUtils.runStreams(ssc, 2, 2); + assertOrderInvariantEquals(expectedFinal, finalResult); + } + + /** TEST DISABLED: Pending a discussion about checkpoint() semantics with TD + @Test + public void testCheckpointofIndividualStream() throws InterruptedException { + List<List<String>> inputData = Arrays.asList( + Arrays.asList("this", "is"), + Arrays.asList("a", "test"), + Arrays.asList("counting", "letters")); + + List<List<Integer>> expected = Arrays.asList( + Arrays.asList(4,2), + Arrays.asList(1,4), + Arrays.asList(8,7)); + + JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream letterCount = stream.map(new Function<String, Integer>() { + @Override + public Integer call(String s) throws Exception { + return s.length(); + } + }); + JavaCheckpointTestUtils.attachTestOutputStream(letterCount); + + letterCount.checkpoint(new Duration(1000)); + + List<List<Integer>> result1 = JavaCheckpointTestUtils.runStreams(ssc, 3, 3); + assertOrderInvariantEquals(expected, result1); + } + */ + + // Input stream tests. These mostly just test that we can instantiate a given InputStream with + // Java arguments and assign it to a JavaDStream without producing type errors. Testing of the + // InputStream functionality is deferred to the existing Scala tests. + @Test + public void testKafkaStream() { + HashMap<String, Integer> topics = Maps.newHashMap(); + HashMap<KafkaPartitionKey, Long> offsets = Maps.newHashMap(); + JavaDStream test1 = ssc.kafkaStream("localhost", 12345, "group", topics); + JavaDStream test2 = ssc.kafkaStream("localhost", 12345, "group", topics, offsets); + JavaDStream test3 = ssc.kafkaStream("localhost", 12345, "group", topics, offsets, + StorageLevel.MEMORY_AND_DISK()); + } + + @Test + public void testNetworkTextStream() { + JavaDStream test = ssc.networkTextStream("localhost", 12345); + } + + @Test + public void testNetworkString() { + class Converter extends Function<InputStream, Iterable<String>> { + public Iterable<String> call(InputStream in) { + BufferedReader reader = new BufferedReader(new InputStreamReader(in)); + List<String> out = new ArrayList<String>(); + try { + while (true) { + String line = reader.readLine(); + if (line == null) { break; } + out.add(line); + } + } catch (IOException e) { } + return out; + } + } + + JavaDStream test = ssc.networkStream( + "localhost", + 12345, + new Converter(), + StorageLevel.MEMORY_ONLY()); + } + + @Test + public void testTextFileStream() { + JavaDStream test = ssc.textFileStream("/tmp/foo"); + } + + @Test + public void testRawNetworkStream() { + JavaDStream test = ssc.rawNetworkStream("localhost", 12345); + } + + @Test + public void testFlumeStream() { + JavaDStream test = ssc.flumeStream("localhost", 12345); + } + + @Test + public void testFileStream() { + JavaPairDStream<String, String> foo = + ssc.<String, String, SequenceFileInputFormat>fileStream("/tmp/foo"); + } +} |