aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test/java/JavaAPISuite.java
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2013-01-17 21:43:17 -0800
committerPatrick Wendell <pwendell@gmail.com>2013-01-17 21:43:17 -0800
commitc46dd2de78ae0c13060d0a9d2dea110c655659f0 (patch)
tree3952a40028ef1cd370c2321b1b142888187667d8 /streaming/src/test/java/JavaAPISuite.java
parente0165bf7141086e28f88cd68ab7bc6249061c924 (diff)
downloadspark-c46dd2de78ae0c13060d0a9d2dea110c655659f0.tar.gz
spark-c46dd2de78ae0c13060d0a9d2dea110c655659f0.tar.bz2
spark-c46dd2de78ae0c13060d0a9d2dea110c655659f0.zip
Moving tests to appropriate directory
Diffstat (limited to 'streaming/src/test/java/JavaAPISuite.java')
-rw-r--r--streaming/src/test/java/JavaAPISuite.java1027
1 files changed, 1027 insertions, 0 deletions
diff --git a/streaming/src/test/java/JavaAPISuite.java b/streaming/src/test/java/JavaAPISuite.java
new file mode 100644
index 0000000000..8c94e13e65
--- /dev/null
+++ b/streaming/src/test/java/JavaAPISuite.java
@@ -0,0 +1,1027 @@
+package spark.streaming;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.io.Files;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import scala.Tuple2;
+import spark.HashPartitioner;
+import spark.api.java.JavaRDD;
+import spark.api.java.JavaSparkContext;
+import spark.api.java.function.*;
+import spark.storage.StorageLevel;
+import spark.streaming.api.java.JavaDStream;
+import spark.streaming.api.java.JavaPairDStream;
+import spark.streaming.api.java.JavaStreamingContext;
+import spark.streaming.JavaTestUtils;
+import spark.streaming.JavaCheckpointTestUtils;
+import spark.streaming.dstream.KafkaPartitionKey;
+
+import java.io.*;
+import java.util.*;
+
+// The test suite itself is Serializable so that anonymous Function implementations can be
+// serialized, as an alternative to converting these anonymous classes to static inner classes;
+// see http://stackoverflow.com/questions/758570/.
+public class JavaAPISuite implements Serializable {
+ private transient JavaStreamingContext ssc;
+
+ @Before
+ public void setUp() {
+ ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
+ }
+
+ @After
+ public void tearDown() {
+ ssc.stop();
+ ssc = null;
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.master.port");
+ }
+
+ @Test
+ public void testCount() {
+ List<List<Integer>> inputData = Arrays.asList(
+ Arrays.asList(1,2,3,4),
+ Arrays.asList(3,4,5),
+ Arrays.asList(3));
+
+ List<List<Long>> expected = Arrays.asList(
+ Arrays.asList(4L),
+ Arrays.asList(3L),
+ Arrays.asList(1L));
+
+ JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream count = stream.count();
+ JavaTestUtils.attachTestOutputStream(count);
+ List<List<Long>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+ assertOrderInvariantEquals(expected, result);
+ }
+
+ @Test
+ public void testMap() {
+ List<List<String>> inputData = Arrays.asList(
+ Arrays.asList("hello", "world"),
+ Arrays.asList("goodnight", "moon"));
+
+ List<List<Integer>> expected = Arrays.asList(
+ Arrays.asList(5,5),
+ Arrays.asList(9,4));
+
+ JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream letterCount = stream.map(new Function<String, Integer>() {
+ @Override
+ public Integer call(String s) throws Exception {
+ return s.length();
+ }
+ });
+ JavaTestUtils.attachTestOutputStream(letterCount);
+ List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ assertOrderInvariantEquals(expected, result);
+ }
+
+ @Test
+ public void testWindow() {
+ List<List<Integer>> inputData = Arrays.asList(
+ Arrays.asList(1,2,3),
+ Arrays.asList(4,5,6),
+ Arrays.asList(7,8,9));
+
+ List<List<Integer>> expected = Arrays.asList(
+ Arrays.asList(1,2,3),
+ Arrays.asList(4,5,6,1,2,3),
+ Arrays.asList(7,8,9,4,5,6),
+ Arrays.asList(7,8,9));
+
+ JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream windowed = stream.window(new Duration(2000));
+ JavaTestUtils.attachTestOutputStream(windowed);
+ List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4);
+
+ assertOrderInvariantEquals(expected, result);
+ }
+
+ @Test
+ public void testWindowWithSlideDuration() {
+ List<List<Integer>> inputData = Arrays.asList(
+ Arrays.asList(1,2,3),
+ Arrays.asList(4,5,6),
+ Arrays.asList(7,8,9),
+ Arrays.asList(10,11,12),
+ Arrays.asList(13,14,15),
+ Arrays.asList(16,17,18));
+
+ List<List<Integer>> expected = Arrays.asList(
+ Arrays.asList(1,2,3,4,5,6),
+ Arrays.asList(1,2,3,4,5,6,7,8,9,10,11,12),
+ Arrays.asList(7,8,9,10,11,12,13,14,15,16,17,18),
+ Arrays.asList(13,14,15,16,17,18));
+
+ JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream windowed = stream.window(new Duration(4000), new Duration(2000));
+ JavaTestUtils.attachTestOutputStream(windowed);
+ List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 8, 4);
+
+ assertOrderInvariantEquals(expected, result);
+ }
+
+ @Test
+ public void testTumble() {
+ List<List<Integer>> inputData = Arrays.asList(
+ Arrays.asList(1,2,3),
+ Arrays.asList(4,5,6),
+ Arrays.asList(7,8,9),
+ Arrays.asList(10,11,12),
+ Arrays.asList(13,14,15),
+ Arrays.asList(16,17,18));
+
+ List<List<Integer>> expected = Arrays.asList(
+ Arrays.asList(1,2,3,4,5,6),
+ Arrays.asList(7,8,9,10,11,12),
+ Arrays.asList(13,14,15,16,17,18));
+
+ JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream windowed = stream.tumble(new Duration(2000));
+ JavaTestUtils.attachTestOutputStream(windowed);
+ List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 6, 3);
+
+ assertOrderInvariantEquals(expected, result);
+ }
+
+ @Test
+ public void testFilter() {
+ List<List<String>> inputData = Arrays.asList(
+ Arrays.asList("giants", "dodgers"),
+ Arrays.asList("yankees", "red socks"));
+
+ List<List<String>> expected = Arrays.asList(
+ Arrays.asList("giants"),
+ Arrays.asList("yankees"));
+
+ JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream filtered = stream.filter(new Function<String, Boolean>() {
+ @Override
+ public Boolean call(String s) throws Exception {
+ return s.contains("a");
+ }
+ });
+ JavaTestUtils.attachTestOutputStream(filtered);
+ List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ assertOrderInvariantEquals(expected, result);
+ }
+
+ @Test
+ public void testGlom() {
+ List<List<String>> inputData = Arrays.asList(
+ Arrays.asList("giants", "dodgers"),
+ Arrays.asList("yankees", "red socks"));
+
+ List<List<List<String>>> expected = Arrays.asList(
+ Arrays.asList(Arrays.asList("giants", "dodgers")),
+ Arrays.asList(Arrays.asList("yankees", "red socks")));
+
+ JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream glommed = stream.glom();
+ JavaTestUtils.attachTestOutputStream(glommed);
+ List<List<List<String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testMapPartitions() {
+ List<List<String>> inputData = Arrays.asList(
+ Arrays.asList("giants", "dodgers"),
+ Arrays.asList("yankees", "red socks"));
+
+ List<List<String>> expected = Arrays.asList(
+ Arrays.asList("GIANTSDODGERS"),
+ Arrays.asList("YANKEESRED SOCKS"));
+
+ JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream mapped = stream.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
+ @Override
+ public Iterable<String> call(Iterator<String> in) {
+ String out = "";
+ while (in.hasNext()) {
+ out = out + in.next().toUpperCase();
+ }
+ return Lists.newArrayList(out);
+ }
+ });
+ JavaTestUtils.attachTestOutputStream(mapped);
+ List<List<List<String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ private class IntegerSum extends Function2<Integer, Integer, Integer> {
+ @Override
+ public Integer call(Integer i1, Integer i2) throws Exception {
+ return i1 + i2;
+ }
+ }
+
+ private class IntegerDifference extends Function2<Integer, Integer, Integer> {
+ @Override
+ public Integer call(Integer i1, Integer i2) throws Exception {
+ return i1 - i2;
+ }
+ }
+
+ @Test
+ public void testReduce() {
+ List<List<Integer>> inputData = Arrays.asList(
+ Arrays.asList(1,2,3),
+ Arrays.asList(4,5,6),
+ Arrays.asList(7,8,9));
+
+ List<List<Integer>> expected = Arrays.asList(
+ Arrays.asList(6),
+ Arrays.asList(15),
+ Arrays.asList(24));
+
+ JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream reduced = stream.reduce(new IntegerSum());
+ JavaTestUtils.attachTestOutputStream(reduced);
+ List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testReduceByWindow() {
+ List<List<Integer>> inputData = Arrays.asList(
+ Arrays.asList(1,2,3),
+ Arrays.asList(4,5,6),
+ Arrays.asList(7,8,9));
+
+ List<List<Integer>> expected = Arrays.asList(
+ Arrays.asList(6),
+ Arrays.asList(21),
+ Arrays.asList(39),
+ Arrays.asList(24));
+
+ JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream reducedWindowed = stream.reduceByWindow(new IntegerSum(),
+ new IntegerDifference(), new Duration(2000), new Duration(1000));
+ JavaTestUtils.attachTestOutputStream(reducedWindowed);
+ List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testQueueStream() {
+ List<List<Integer>> expected = Arrays.asList(
+ Arrays.asList(1,2,3),
+ Arrays.asList(4,5,6),
+ Arrays.asList(7,8,9));
+
+ JavaSparkContext jsc = new JavaSparkContext(ssc.ssc().sc());
+ JavaRDD<Integer> rdd1 = ssc.sc().parallelize(Arrays.asList(1,2,3));
+ JavaRDD<Integer> rdd2 = ssc.sc().parallelize(Arrays.asList(4,5,6));
+ JavaRDD<Integer> rdd3 = ssc.sc().parallelize(Arrays.asList(7,8,9));
+
+ LinkedList<JavaRDD<Integer>> rdds = Lists.newLinkedList();
+ rdds.add(rdd1);
+ rdds.add(rdd2);
+ rdds.add(rdd3);
+
+ JavaDStream<Integer> stream = ssc.queueStream(rdds);
+ JavaTestUtils.attachTestOutputStream(stream);
+ List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testTransform() {
+ List<List<Integer>> inputData = Arrays.asList(
+ Arrays.asList(1,2,3),
+ Arrays.asList(4,5,6),
+ Arrays.asList(7,8,9));
+
+ List<List<Integer>> expected = Arrays.asList(
+ Arrays.asList(3,4,5),
+ Arrays.asList(6,7,8),
+ Arrays.asList(9,10,11));
+
+ JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream transformed = stream.transform(new Function<JavaRDD<Integer>, JavaRDD<Integer>>() {
+ @Override
+ public JavaRDD<Integer> call(JavaRDD<Integer> in) throws Exception {
+ return in.map(new Function<Integer, Integer>() {
+ @Override
+ public Integer call(Integer i) throws Exception {
+ return i + 2;
+ }
+ });
+ }});
+ JavaTestUtils.attachTestOutputStream(transformed);
+ List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+
+ assertOrderInvariantEquals(expected, result);
+ }
+
+ @Test
+ public void testFlatMap() {
+ List<List<String>> inputData = Arrays.asList(
+ Arrays.asList("go", "giants"),
+ Arrays.asList("boo", "dodgers"),
+ Arrays.asList("athletics"));
+
+ List<List<String>> expected = Arrays.asList(
+ Arrays.asList("g","o","g","i","a","n","t","s"),
+ Arrays.asList("b", "o", "o", "d","o","d","g","e","r","s"),
+ Arrays.asList("a","t","h","l","e","t","i","c","s"));
+
+ JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream flatMapped = stream.flatMap(new FlatMapFunction<String, String>() {
+ @Override
+ public Iterable<String> call(String x) {
+ return Lists.newArrayList(x.split("(?!^)"));
+ }
+ });
+ JavaTestUtils.attachTestOutputStream(flatMapped);
+ List<List<String>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+
+ assertOrderInvariantEquals(expected, result);
+ }
+
+ @Test
+ public void testPairFlatMap() {
+ List<List<String>> inputData = Arrays.asList(
+ Arrays.asList("giants"),
+ Arrays.asList("dodgers"),
+ Arrays.asList("athletics"));
+
+ List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<Integer, String>(6, "g"),
+ new Tuple2<Integer, String>(6, "i"),
+ new Tuple2<Integer, String>(6, "a"),
+ new Tuple2<Integer, String>(6, "n"),
+ new Tuple2<Integer, String>(6, "t"),
+ new Tuple2<Integer, String>(6, "s")),
+ Arrays.asList(
+ new Tuple2<Integer, String>(7, "d"),
+ new Tuple2<Integer, String>(7, "o"),
+ new Tuple2<Integer, String>(7, "d"),
+ new Tuple2<Integer, String>(7, "g"),
+ new Tuple2<Integer, String>(7, "e"),
+ new Tuple2<Integer, String>(7, "r"),
+ new Tuple2<Integer, String>(7, "s")),
+ Arrays.asList(
+ new Tuple2<Integer, String>(9, "a"),
+ new Tuple2<Integer, String>(9, "t"),
+ new Tuple2<Integer, String>(9, "h"),
+ new Tuple2<Integer, String>(9, "l"),
+ new Tuple2<Integer, String>(9, "e"),
+ new Tuple2<Integer, String>(9, "t"),
+ new Tuple2<Integer, String>(9, "i"),
+ new Tuple2<Integer, String>(9, "c"),
+ new Tuple2<Integer, String>(9, "s")));
+
+ JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream flatMapped = stream.flatMap(new PairFlatMapFunction<String, Integer, String>() {
+ @Override
+ public Iterable<Tuple2<Integer, String>> call(String in) throws Exception {
+ List<Tuple2<Integer, String>> out = Lists.newArrayList();
+ for (String letter: in.split("(?!^)")) {
+ out.add(new Tuple2<Integer, String>(in.length(), letter));
+ }
+ return out;
+ }
+ });
+ JavaTestUtils.attachTestOutputStream(flatMapped);
+ List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testUnion() {
+ List<List<Integer>> inputData1 = Arrays.asList(
+ Arrays.asList(1,1),
+ Arrays.asList(2,2),
+ Arrays.asList(3,3));
+
+ List<List<Integer>> inputData2 = Arrays.asList(
+ Arrays.asList(4,4),
+ Arrays.asList(5,5),
+ Arrays.asList(6,6));
+
+ List<List<Integer>> expected = Arrays.asList(
+ Arrays.asList(1,1,4,4),
+ Arrays.asList(2,2,5,5),
+ Arrays.asList(3,3,6,6));
+
+ JavaDStream stream1 = JavaTestUtils.attachTestInputStream(ssc, inputData1, 2);
+ JavaDStream stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 2);
+
+ JavaDStream unioned = stream1.union(stream2);
+ JavaTestUtils.attachTestOutputStream(unioned);
+ List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+
+ assertOrderInvariantEquals(expected, result);
+ }
+
+ /*
+ * Performs an order-invariant comparison of lists representing two RDD streams. This allows
+ * us to account for ordering variation within individual RDD's which occurs during windowing.
+ */
+ public static <T extends Comparable> void assertOrderInvariantEquals(
+ List<List<T>> expected, List<List<T>> actual) {
+ for (List<T> list: expected) {
+ Collections.sort(list);
+ }
+ for (List<T> list: actual) {
+ Collections.sort(list);
+ }
+ Assert.assertEquals(expected, actual);
+ }
+
+
+ // PairDStream Functions
+ @Test
+ public void testPairFilter() {
+ List<List<String>> inputData = Arrays.asList(
+ Arrays.asList("giants", "dodgers"),
+ Arrays.asList("yankees", "red socks"));
+
+ List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
+ Arrays.asList(new Tuple2<String, Integer>("giants", 6)),
+ Arrays.asList(new Tuple2<String, Integer>("yankees", 7)));
+
+ JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = stream.map(
+ new PairFunction<String, String, Integer>() {
+ @Override
+ public Tuple2 call(String in) throws Exception {
+ return new Tuple2<String, Integer>(in, in.length());
+ }
+ });
+
+ JavaPairDStream<String, Integer> filtered = pairStream.filter(
+ new Function<Tuple2<String, Integer>, Boolean>() {
+ @Override
+ public Boolean call(Tuple2<String, Integer> in) throws Exception {
+ return in._1().contains("a");
+ }
+ });
+ JavaTestUtils.attachTestOutputStream(filtered);
+ List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ List<List<Tuple2<String, String>>> stringStringKVStream = Arrays.asList(
+ Arrays.asList(new Tuple2<String, String>("california", "dodgers"),
+ new Tuple2<String, String>("california", "giants"),
+ new Tuple2<String, String>("new york", "yankees"),
+ new Tuple2<String, String>("new york", "mets")),
+ Arrays.asList(new Tuple2<String, String>("california", "sharks"),
+ new Tuple2<String, String>("california", "ducks"),
+ new Tuple2<String, String>("new york", "rangers"),
+ new Tuple2<String, String>("new york", "islanders")));
+
+ List<List<Tuple2<String, Integer>>> stringIntKVStream = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<String, Integer>("california", 1),
+ new Tuple2<String, Integer>("california", 3),
+ new Tuple2<String, Integer>("new york", 4),
+ new Tuple2<String, Integer>("new york", 1)),
+ Arrays.asList(
+ new Tuple2<String, Integer>("california", 5),
+ new Tuple2<String, Integer>("california", 5),
+ new Tuple2<String, Integer>("new york", 3),
+ new Tuple2<String, Integer>("new york", 1)));
+
+ @Test
+ public void testPairGroupByKey() {
+ List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
+
+ List<List<Tuple2<String, List<String>>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<String, List<String>>("california", Arrays.asList("dodgers", "giants")),
+ new Tuple2<String, List<String>>("new york", Arrays.asList("yankees", "mets"))),
+ Arrays.asList(
+ new Tuple2<String, List<String>>("california", Arrays.asList("sharks", "ducks")),
+ new Tuple2<String, List<String>>("new york", Arrays.asList("rangers", "islanders"))));
+
+ JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+ JavaPairDStream<String, List<String>> grouped = pairStream.groupByKey();
+ JavaTestUtils.attachTestOutputStream(grouped);
+ List<List<Tuple2<String, List<String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testPairReduceByKey() {
+ List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+ List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<String, Integer>("california", 4),
+ new Tuple2<String, Integer>("new york", 5)),
+ Arrays.asList(
+ new Tuple2<String, Integer>("california", 10),
+ new Tuple2<String, Integer>("new york", 4)));
+
+ JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(
+ ssc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+ JavaPairDStream<String, Integer> reduced = pairStream.reduceByKey(new IntegerSum());
+
+ JavaTestUtils.attachTestOutputStream(reduced);
+ List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testCombineByKey() {
+ List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+ List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<String, Integer>("california", 4),
+ new Tuple2<String, Integer>("new york", 5)),
+ Arrays.asList(
+ new Tuple2<String, Integer>("california", 10),
+ new Tuple2<String, Integer>("new york", 4)));
+
+ JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(
+ ssc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+ JavaPairDStream<String, Integer> combined = pairStream.<Integer>combineByKey(
+ new Function<Integer, Integer>() {
+ @Override
+ public Integer call(Integer i) throws Exception {
+ return i;
+ }
+ }, new IntegerSum(), new IntegerSum(), new HashPartitioner(2));
+
+ JavaTestUtils.attachTestOutputStream(combined);
+ List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testCountByKey() {
+ List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
+
+ List<List<Tuple2<String, Long>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<String, Long>("california", 2L),
+ new Tuple2<String, Long>("new york", 2L)),
+ Arrays.asList(
+ new Tuple2<String, Long>("california", 2L),
+ new Tuple2<String, Long>("new york", 2L)));
+
+ JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
+ ssc, inputData, 1);
+ JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+ JavaPairDStream<String, Long> counted = pairStream.countByKey();
+ JavaTestUtils.attachTestOutputStream(counted);
+ List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testGroupByKeyAndWindow() {
+ List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
+
+ List<List<Tuple2<String, List<String>>>> expected = Arrays.asList(
+ Arrays.asList(new Tuple2<String, List<String>>("california", Arrays.asList("dodgers", "giants")),
+ new Tuple2<String, List<String>>("new york", Arrays.asList("yankees", "mets"))),
+ Arrays.asList(new Tuple2<String, List<String>>("california",
+ Arrays.asList("sharks", "ducks", "dodgers", "giants")),
+ new Tuple2<String, List<String>>("new york", Arrays.asList("rangers", "islanders", "yankees", "mets"))),
+ Arrays.asList(new Tuple2<String, List<String>>("california", Arrays.asList("sharks", "ducks")),
+ new Tuple2<String, List<String>>("new york", Arrays.asList("rangers", "islanders"))));
+
+ JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+ JavaPairDStream<String, List<String>> groupWindowed =
+ pairStream.groupByKeyAndWindow(new Duration(2000), new Duration(1000));
+ JavaTestUtils.attachTestOutputStream(groupWindowed);
+ List<List<Tuple2<String, List<String>>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testReduceByKeyAndWindow() {
+ List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+ List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
+ Arrays.asList(new Tuple2<String, Integer>("california", 4),
+ new Tuple2<String, Integer>("new york", 5)),
+ Arrays.asList(new Tuple2<String, Integer>("california", 14),
+ new Tuple2<String, Integer>("new york", 9)),
+ Arrays.asList(new Tuple2<String, Integer>("california", 10),
+ new Tuple2<String, Integer>("new york", 4)));
+
+ JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+ JavaPairDStream<String, Integer> reduceWindowed =
+ pairStream.reduceByKeyAndWindow(new IntegerSum(), new Duration(2000), new Duration(1000));
+ JavaTestUtils.attachTestOutputStream(reduceWindowed);
+ List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testUpdateStateByKey() {
+ List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+ List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
+ Arrays.asList(new Tuple2<String, Integer>("california", 4),
+ new Tuple2<String, Integer>("new york", 5)),
+ Arrays.asList(new Tuple2<String, Integer>("california", 14),
+ new Tuple2<String, Integer>("new york", 9)),
+ Arrays.asList(new Tuple2<String, Integer>("california", 14),
+ new Tuple2<String, Integer>("new york", 9)));
+
+ JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+ JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey(
+ new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>(){
+ @Override
+ public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
+ int out = 0;
+ if (state.isPresent()) {
+ out = out + state.get();
+ }
+ for (Integer v: values) {
+ out = out + v;
+ }
+ return Optional.of(out);
+ }
+ });
+ JavaTestUtils.attachTestOutputStream(updated);
+ List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testReduceByKeyAndWindowWithInverse() {
+ List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+ List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
+ Arrays.asList(new Tuple2<String, Integer>("california", 4),
+ new Tuple2<String, Integer>("new york", 5)),
+ Arrays.asList(new Tuple2<String, Integer>("california", 14),
+ new Tuple2<String, Integer>("new york", 9)),
+ Arrays.asList(new Tuple2<String, Integer>("california", 10),
+ new Tuple2<String, Integer>("new york", 4)));
+
+ JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+ JavaPairDStream<String, Integer> reduceWindowed =
+ pairStream.reduceByKeyAndWindow(new IntegerSum(), new IntegerDifference(), new Duration(2000), new Duration(1000));
+ JavaTestUtils.attachTestOutputStream(reduceWindowed);
+ List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testCountByKeyAndWindow() {
+ List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
+
+ List<List<Tuple2<String, Long>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<String, Long>("california", 2L),
+ new Tuple2<String, Long>("new york", 2L)),
+ Arrays.asList(
+ new Tuple2<String, Long>("california", 4L),
+ new Tuple2<String, Long>("new york", 4L)),
+ Arrays.asList(
+ new Tuple2<String, Long>("california", 2L),
+ new Tuple2<String, Long>("new york", 2L)));
+
+ JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
+ ssc, inputData, 1);
+ JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+ JavaPairDStream<String, Long> counted =
+ pairStream.countByKeyAndWindow(new Duration(2000), new Duration(1000));
+ JavaTestUtils.attachTestOutputStream(counted);
+ List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testMapValues() {
+ List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
+
+ List<List<Tuple2<String, String>>> expected = Arrays.asList(
+ Arrays.asList(new Tuple2<String, String>("california", "DODGERS"),
+ new Tuple2<String, String>("california", "GIANTS"),
+ new Tuple2<String, String>("new york", "YANKEES"),
+ new Tuple2<String, String>("new york", "METS")),
+ Arrays.asList(new Tuple2<String, String>("california", "SHARKS"),
+ new Tuple2<String, String>("california", "DUCKS"),
+ new Tuple2<String, String>("new york", "RANGERS"),
+ new Tuple2<String, String>("new york", "ISLANDERS")));
+
+ JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
+ ssc, inputData, 1);
+ JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+ JavaPairDStream<String, String> mapped = pairStream.mapValues(new Function<String, String>() {
+ @Override
+ public String call(String s) throws Exception {
+ return s.toUpperCase();
+ }
+ });
+
+ JavaTestUtils.attachTestOutputStream(mapped);
+ List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testFlatMapValues() {
+ List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
+
+ List<List<Tuple2<String, String>>> expected = Arrays.asList(
+ Arrays.asList(new Tuple2<String, String>("california", "dodgers1"),
+ new Tuple2<String, String>("california", "dodgers2"),
+ new Tuple2<String, String>("california", "giants1"),
+ new Tuple2<String, String>("california", "giants2"),
+ new Tuple2<String, String>("new york", "yankees1"),
+ new Tuple2<String, String>("new york", "yankees2"),
+ new Tuple2<String, String>("new york", "mets1"),
+ new Tuple2<String, String>("new york", "mets2")),
+ Arrays.asList(new Tuple2<String, String>("california", "sharks1"),
+ new Tuple2<String, String>("california", "sharks2"),
+ new Tuple2<String, String>("california", "ducks1"),
+ new Tuple2<String, String>("california", "ducks2"),
+ new Tuple2<String, String>("new york", "rangers1"),
+ new Tuple2<String, String>("new york", "rangers2"),
+ new Tuple2<String, String>("new york", "islanders1"),
+ new Tuple2<String, String>("new york", "islanders2")));
+
+ JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
+ ssc, inputData, 1);
+ JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+
+ JavaPairDStream<String, String> flatMapped = pairStream.flatMapValues(
+ new Function<String, Iterable<String>>() {
+ @Override
+ public Iterable<String> call(String in) {
+ List<String> out = new ArrayList<String>();
+ out.add(in + "1");
+ out.add(in + "2");
+ return out;
+ }
+ });
+
+ JavaTestUtils.attachTestOutputStream(flatMapped);
+ List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testCoGroup() {
+ List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
+ Arrays.asList(new Tuple2<String, String>("california", "dodgers"),
+ new Tuple2<String, String>("new york", "yankees")),
+ Arrays.asList(new Tuple2<String, String>("california", "sharks"),
+ new Tuple2<String, String>("new york", "rangers")));
+
+ List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList(
+ Arrays.asList(new Tuple2<String, String>("california", "giants"),
+ new Tuple2<String, String>("new york", "mets")),
+ Arrays.asList(new Tuple2<String, String>("california", "ducks"),
+ new Tuple2<String, String>("new york", "islanders")));
+
+
+ List<List<Tuple2<String, Tuple2<List<String>, List<String>>>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<String, Tuple2<List<String>, List<String>>>("california",
+ new Tuple2<List<String>, List<String>>(Arrays.asList("dodgers"), Arrays.asList("giants"))),
+ new Tuple2<String, Tuple2<List<String>, List<String>>>("new york",
+ new Tuple2<List<String>, List<String>>(Arrays.asList("yankees"), Arrays.asList("mets")))),
+ Arrays.asList(
+ new Tuple2<String, Tuple2<List<String>, List<String>>>("california",
+ new Tuple2<List<String>, List<String>>(Arrays.asList("sharks"), Arrays.asList("ducks"))),
+ new Tuple2<String, Tuple2<List<String>, List<String>>>("new york",
+ new Tuple2<List<String>, List<String>>(Arrays.asList("rangers"), Arrays.asList("islanders")))));
+
+
+ JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream(
+ ssc, stringStringKVStream1, 1);
+ JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1);
+
+ JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream(
+ ssc, stringStringKVStream2, 1);
+ JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2);
+
+ JavaPairDStream<String, Tuple2<List<String>, List<String>>> grouped = pairStream1.cogroup(pairStream2);
+ JavaTestUtils.attachTestOutputStream(grouped);
+ List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testJoin() {
+ List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
+ Arrays.asList(new Tuple2<String, String>("california", "dodgers"),
+ new Tuple2<String, String>("new york", "yankees")),
+ Arrays.asList(new Tuple2<String, String>("california", "sharks"),
+ new Tuple2<String, String>("new york", "rangers")));
+
+ List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList(
+ Arrays.asList(new Tuple2<String, String>("california", "giants"),
+ new Tuple2<String, String>("new york", "mets")),
+ Arrays.asList(new Tuple2<String, String>("california", "ducks"),
+ new Tuple2<String, String>("new york", "islanders")));
+
+
+ List<List<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<String, Tuple2<String, String>>("california",
+ new Tuple2<String, String>("dodgers", "giants")),
+ new Tuple2<String, Tuple2<String, String>>("new york",
+ new Tuple2<String, String>("yankees", "mets"))),
+ Arrays.asList(
+ new Tuple2<String, Tuple2<String, String>>("california",
+ new Tuple2<String, String>("sharks", "ducks")),
+ new Tuple2<String, Tuple2<String, String>>("new york",
+ new Tuple2<String, String>("rangers", "islanders"))));
+
+
+ JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream(
+ ssc, stringStringKVStream1, 1);
+ JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1);
+
+ JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream(
+ ssc, stringStringKVStream2, 1);
+ JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2);
+
+ JavaPairDStream<String, Tuple2<String, String>> joined = pairStream1.join(pairStream2);
+ JavaTestUtils.attachTestOutputStream(joined);
+ List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testCheckpointMasterRecovery() throws InterruptedException {
+ List<List<String>> inputData = Arrays.asList(
+ Arrays.asList("this", "is"),
+ Arrays.asList("a", "test"),
+ Arrays.asList("counting", "letters"));
+
+ List<List<Integer>> expectedInitial = Arrays.asList(
+ Arrays.asList(4,2));
+ List<List<Integer>> expectedFinal = Arrays.asList(
+ Arrays.asList(1,4),
+ Arrays.asList(8,7));
+
+
+ File tempDir = Files.createTempDir();
+ ssc.checkpoint(tempDir.getAbsolutePath(), new Duration(1000));
+
+ JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream letterCount = stream.map(new Function<String, Integer>() {
+ @Override
+ public Integer call(String s) throws Exception {
+ return s.length();
+ }
+ });
+ JavaCheckpointTestUtils.attachTestOutputStream(letterCount);
+ List<List<Integer>> initialResult = JavaTestUtils.runStreams(ssc, 1, 1);
+
+ assertOrderInvariantEquals(expectedInitial, initialResult);
+ Thread.sleep(1000);
+
+ ssc.stop();
+ ssc = new JavaStreamingContext(tempDir.getAbsolutePath());
+ ssc.start();
+ List<List<Integer>> finalResult = JavaCheckpointTestUtils.runStreams(ssc, 2, 2);
+ assertOrderInvariantEquals(expectedFinal, finalResult);
+ }
+
+ /** TEST DISABLED: Pending a discussion about checkpoint() semantics with TD
+ @Test
+ public void testCheckpointofIndividualStream() throws InterruptedException {
+ List<List<String>> inputData = Arrays.asList(
+ Arrays.asList("this", "is"),
+ Arrays.asList("a", "test"),
+ Arrays.asList("counting", "letters"));
+
+ List<List<Integer>> expected = Arrays.asList(
+ Arrays.asList(4,2),
+ Arrays.asList(1,4),
+ Arrays.asList(8,7));
+
+ JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream letterCount = stream.map(new Function<String, Integer>() {
+ @Override
+ public Integer call(String s) throws Exception {
+ return s.length();
+ }
+ });
+ JavaCheckpointTestUtils.attachTestOutputStream(letterCount);
+
+ letterCount.checkpoint(new Duration(1000));
+
+ List<List<Integer>> result1 = JavaCheckpointTestUtils.runStreams(ssc, 3, 3);
+ assertOrderInvariantEquals(expected, result1);
+ }
+ */
+
+ // Input stream tests. These mostly just test that we can instantiate a given InputStream with
+ // Java arguments and assign it to a JavaDStream without producing type errors. Testing of the
+ // InputStream functionality is deferred to the existing Scala tests.
+ @Test
+ public void testKafkaStream() {
+ HashMap<String, Integer> topics = Maps.newHashMap();
+ HashMap<KafkaPartitionKey, Long> offsets = Maps.newHashMap();
+ JavaDStream test1 = ssc.kafkaStream("localhost", 12345, "group", topics);
+ JavaDStream test2 = ssc.kafkaStream("localhost", 12345, "group", topics, offsets);
+ JavaDStream test3 = ssc.kafkaStream("localhost", 12345, "group", topics, offsets,
+ StorageLevel.MEMORY_AND_DISK());
+ }
+
+ @Test
+ public void testNetworkTextStream() {
+ JavaDStream test = ssc.networkTextStream("localhost", 12345);
+ }
+
+ @Test
+ public void testNetworkString() {
+ class Converter extends Function<InputStream, Iterable<String>> {
+ public Iterable<String> call(InputStream in) {
+ BufferedReader reader = new BufferedReader(new InputStreamReader(in));
+ List<String> out = new ArrayList<String>();
+ try {
+ while (true) {
+ String line = reader.readLine();
+ if (line == null) { break; }
+ out.add(line);
+ }
+ } catch (IOException e) { }
+ return out;
+ }
+ }
+
+ JavaDStream test = ssc.networkStream(
+ "localhost",
+ 12345,
+ new Converter(),
+ StorageLevel.MEMORY_ONLY());
+ }
+
+ @Test
+ public void testTextFileStream() {
+ JavaDStream test = ssc.textFileStream("/tmp/foo");
+ }
+
+ @Test
+ public void testRawNetworkStream() {
+ JavaDStream test = ssc.rawNetworkStream("localhost", 12345);
+ }
+
+ @Test
+ public void testFlumeStream() {
+ JavaDStream test = ssc.flumeStream("localhost", 12345);
+ }
+
+ @Test
+ public void testFileStream() {
+ JavaPairDStream<String, String> foo =
+ ssc.<String, String, SequenceFileInputFormat>fileStream("/tmp/foo");
+ }
+}