diff options
Diffstat (limited to 'extras')
-rw-r--r-- | extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java | 96 | ||||
-rw-r--r-- | extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java | 381 |
2 files changed, 233 insertions, 244 deletions
diff --git a/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java b/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java index c366c10b15..729bc0459c 100644 --- a/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java +++ b/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java @@ -99,16 +99,16 @@ public class Java8APISuite implements Serializable { @Test public void leftOuterJoin() { JavaPairRDD<Integer, Integer> rdd1 = sc.parallelizePairs(Arrays.asList( - new Tuple2<Integer, Integer>(1, 1), - new Tuple2<Integer, Integer>(1, 2), - new Tuple2<Integer, Integer>(2, 1), - new Tuple2<Integer, Integer>(3, 1) + new Tuple2<>(1, 1), + new Tuple2<>(1, 2), + new Tuple2<>(2, 1), + new Tuple2<>(3, 1) )); JavaPairRDD<Integer, Character> rdd2 = sc.parallelizePairs(Arrays.asList( - new Tuple2<Integer, Character>(1, 'x'), - new Tuple2<Integer, Character>(2, 'y'), - new Tuple2<Integer, Character>(2, 'z'), - new Tuple2<Integer, Character>(4, 'w') + new Tuple2<>(1, 'x'), + new Tuple2<>(2, 'y'), + new Tuple2<>(2, 'z'), + new Tuple2<>(4, 'w') )); List<Tuple2<Integer, Tuple2<Integer, Optional<Character>>>> joined = rdd1.leftOuterJoin(rdd2).collect(); @@ -133,11 +133,11 @@ public class Java8APISuite implements Serializable { @Test public void foldByKey() { List<Tuple2<Integer, Integer>> pairs = Arrays.asList( - new Tuple2<Integer, Integer>(2, 1), - new Tuple2<Integer, Integer>(2, 1), - new Tuple2<Integer, Integer>(1, 1), - new Tuple2<Integer, Integer>(3, 2), - new Tuple2<Integer, Integer>(3, 1) + new Tuple2<>(2, 1), + new Tuple2<>(2, 1), + new Tuple2<>(1, 1), + new Tuple2<>(3, 2), + new Tuple2<>(3, 1) ); JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs); JavaPairRDD<Integer, Integer> sums = rdd.foldByKey(0, (a, b) -> a + b); @@ -149,11 +149,11 @@ public class Java8APISuite implements Serializable { @Test public void reduceByKey() { List<Tuple2<Integer, Integer>> pairs = Arrays.asList( - new Tuple2<Integer, Integer>(2, 1), - new Tuple2<Integer, Integer>(2, 1), - new Tuple2<Integer, Integer>(1, 1), - new Tuple2<Integer, Integer>(3, 2), - new Tuple2<Integer, Integer>(3, 1) + new Tuple2<>(2, 1), + new Tuple2<>(2, 1), + new Tuple2<>(1, 1), + new Tuple2<>(3, 2), + new Tuple2<>(3, 1) ); JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs); JavaPairRDD<Integer, Integer> counts = rdd.reduceByKey((a, b) -> a + b); @@ -177,7 +177,7 @@ public class Java8APISuite implements Serializable { JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)); JavaDoubleRDD doubles = rdd.mapToDouble(x -> 1.0 * x).cache(); doubles.collect(); - JavaPairRDD<Integer, Integer> pairs = rdd.mapToPair(x -> new Tuple2<Integer, Integer>(x, x)) + JavaPairRDD<Integer, Integer> pairs = rdd.mapToPair(x -> new Tuple2<>(x, x)) .cache(); pairs.collect(); JavaRDD<String> strings = rdd.map(x -> x.toString()).cache(); @@ -194,31 +194,31 @@ public class Java8APISuite implements Serializable { Assert.assertEquals(11, words.count()); JavaPairRDD<String, String> pairs = rdd.flatMapToPair(s -> { - List<Tuple2<String, String>> pairs2 = new LinkedList<Tuple2<String, String>>(); - for (String word : s.split(" ")) pairs2.add(new Tuple2<String, String>(word, word)); + List<Tuple2<String, String>> pairs2 = new LinkedList<>(); + for (String word : s.split(" ")) pairs2.add(new Tuple2<>(word, word)); return pairs2; }); - Assert.assertEquals(new Tuple2<String, String>("Hello", "Hello"), pairs.first()); + Assert.assertEquals(new Tuple2<>("Hello", "Hello"), pairs.first()); Assert.assertEquals(11, pairs.count()); JavaDoubleRDD doubles = rdd.flatMapToDouble(s -> { - List<Double> lengths = new LinkedList<Double>(); + List<Double> lengths = new LinkedList<>(); for (String word : s.split(" ")) lengths.add(word.length() * 1.0); return lengths; }); Double x = doubles.first(); - Assert.assertEquals(5.0, doubles.first().doubleValue(), 0.01); + Assert.assertEquals(5.0, doubles.first(), 0.01); Assert.assertEquals(11, pairs.count()); } @Test public void mapsFromPairsToPairs() { List<Tuple2<Integer, String>> pairs = Arrays.asList( - new Tuple2<Integer, String>(1, "a"), - new Tuple2<Integer, String>(2, "aa"), - new Tuple2<Integer, String>(3, "aaa") + new Tuple2<>(1, "a"), + new Tuple2<>(2, "aa"), + new Tuple2<>(3, "aaa") ); JavaPairRDD<Integer, String> pairRDD = sc.parallelizePairs(pairs); @@ -251,19 +251,18 @@ public class Java8APISuite implements Serializable { tempDir.deleteOnExit(); String outputDir = new File(tempDir, "output").getAbsolutePath(); List<Tuple2<Integer, String>> pairs = Arrays.asList( - new Tuple2<Integer, String>(1, "a"), - new Tuple2<Integer, String>(2, "aa"), - new Tuple2<Integer, String>(3, "aaa") + new Tuple2<>(1, "a"), + new Tuple2<>(2, "aa"), + new Tuple2<>(3, "aaa") ); JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs); - rdd.mapToPair(pair -> - new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2()))) + rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2()))) .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class); // Try reading the output back as an object file JavaPairRDD<Integer, String> readRDD = sc.sequenceFile(outputDir, IntWritable.class, Text.class) - .mapToPair(pair -> new Tuple2<Integer, String>(pair._1().get(), pair._2().toString())); + .mapToPair(pair -> new Tuple2<>(pair._1().get(), pair._2().toString())); Assert.assertEquals(pairs, readRDD.collect()); Utils.deleteRecursively(tempDir); } @@ -325,7 +324,7 @@ public class Java8APISuite implements Serializable { } }; - final Accumulator<Float> floatAccum = sc.accumulator((Float) 10.0f, floatAccumulatorParam); + final Accumulator<Float> floatAccum = sc.accumulator(10.0f, floatAccumulatorParam); rdd.foreach(x -> floatAccum.add((float) x)); Assert.assertEquals((Float) 25.0f, floatAccum.value()); @@ -338,22 +337,22 @@ public class Java8APISuite implements Serializable { public void keyBy() { JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2)); List<Tuple2<String, Integer>> s = rdd.keyBy(x -> x.toString()).collect(); - Assert.assertEquals(new Tuple2<String, Integer>("1", 1), s.get(0)); - Assert.assertEquals(new Tuple2<String, Integer>("2", 2), s.get(1)); + Assert.assertEquals(new Tuple2<>("1", 1), s.get(0)); + Assert.assertEquals(new Tuple2<>("2", 2), s.get(1)); } @Test public void mapOnPairRDD() { JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4)); JavaPairRDD<Integer, Integer> rdd2 = - rdd1.mapToPair(i -> new Tuple2<Integer, Integer>(i, i % 2)); + rdd1.mapToPair(i -> new Tuple2<>(i, i % 2)); JavaPairRDD<Integer, Integer> rdd3 = - rdd2.mapToPair(in -> new Tuple2<Integer, Integer>(in._2(), in._1())); + rdd2.mapToPair(in -> new Tuple2<>(in._2(), in._1())); Assert.assertEquals(Arrays.asList( new Tuple2<Integer, Integer>(1, 1), - new Tuple2<Integer, Integer>(0, 2), - new Tuple2<Integer, Integer>(1, 3), - new Tuple2<Integer, Integer>(0, 4)), rdd3.collect()); + new Tuple2<>(0, 2), + new Tuple2<>(1, 3), + new Tuple2<>(0, 4)), rdd3.collect()); } @Test @@ -361,7 +360,7 @@ public class Java8APISuite implements Serializable { JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3); JavaPairRDD<Integer, Integer> rdd2 = - rdd1.mapToPair(i -> new Tuple2<Integer, Integer>(i, i % 2)); + rdd1.mapToPair(i -> new Tuple2<>(i, i % 2)); List[] parts = rdd1.collectPartitions(new int[]{0}); Assert.assertEquals(Arrays.asList(1, 2), parts[0]); @@ -369,16 +368,13 @@ public class Java8APISuite implements Serializable { Assert.assertEquals(Arrays.asList(3, 4), parts[0]); Assert.assertEquals(Arrays.asList(5, 6, 7), parts[1]); - Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(1, 1), - new Tuple2<Integer, Integer>(2, 0)), + Assert.assertEquals(Arrays.asList(new Tuple2<>(1, 1), new Tuple2<>(2, 0)), rdd2.collectPartitions(new int[]{0})[0]); parts = rdd2.collectPartitions(new int[]{1, 2}); - Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(3, 1), - new Tuple2<Integer, Integer>(4, 0)), parts[0]); - Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(5, 1), - new Tuple2<Integer, Integer>(6, 0), - new Tuple2<Integer, Integer>(7, 1)), parts[1]); + Assert.assertEquals(Arrays.asList(new Tuple2<>(3, 1), new Tuple2<>(4, 0)), parts[0]); + Assert.assertEquals(Arrays.asList(new Tuple2<>(5, 1), new Tuple2<>(6, 0), new Tuple2<>(7, 1)), + parts[1]); } @Test @@ -386,7 +382,7 @@ public class Java8APISuite implements Serializable { // Regression test for SPARK-1040 JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(new Integer[]{1})); JavaPairRDD<Integer, int[]> pairRDD = - rdd.mapToPair(x -> new Tuple2<Integer, int[]>(x, new int[]{x})); + rdd.mapToPair(x -> new Tuple2<>(x, new int[]{x})); pairRDD.collect(); // Works fine Map<Integer, int[]> map = pairRDD.collectAsMap(); // Used to crash with ClassCastException } diff --git a/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java b/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java index 43df0dea61..73091cfe2c 100644 --- a/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java +++ b/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java @@ -39,6 +39,7 @@ import org.apache.spark.streaming.api.java.JavaPairDStream; * Most of these tests replicate org.apache.spark.streaming.JavaAPISuite using java 8 * lambda syntax. */ +@SuppressWarnings("unchecked") public class Java8APISuite extends LocalJavaStreamingContext implements Serializable { @Test @@ -52,7 +53,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ Arrays.asList(9, 4)); JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream<Integer> letterCount = stream.map(s -> s.length()); + JavaDStream<Integer> letterCount = stream.map(String::length); JavaTestUtils.attachTestOutputStream(letterCount); List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2); @@ -63,7 +64,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ public void testFilter() { List<List<String>> inputData = Arrays.asList( Arrays.asList("giants", "dodgers"), - Arrays.asList("yankees", "red socks")); + Arrays.asList("yankees", "red sox")); List<List<String>> expected = Arrays.asList( Arrays.asList("giants"), @@ -81,11 +82,11 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ public void testMapPartitions() { List<List<String>> inputData = Arrays.asList( Arrays.asList("giants", "dodgers"), - Arrays.asList("yankees", "red socks")); + Arrays.asList("yankees", "red sox")); List<List<String>> expected = Arrays.asList( Arrays.asList("GIANTSDODGERS"), - Arrays.asList("YANKEESRED SOCKS")); + Arrays.asList("YANKEESRED SOX")); JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaDStream<String> mapped = stream.mapPartitions(in -> { @@ -172,7 +173,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); List<List<Tuple2<String, Integer>>> pairInputData = - Arrays.asList(Arrays.asList(new Tuple2<String, Integer>("x", 1))); + Arrays.asList(Arrays.asList(new Tuple2<>("x", 1))); JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream( JavaTestUtils.attachTestInputStream(ssc, pairInputData, 1)); @@ -192,32 +193,32 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ public void testTransformWith() { List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList( Arrays.asList( - new Tuple2<String, String>("california", "dodgers"), - new Tuple2<String, String>("new york", "yankees")), + new Tuple2<>("california", "dodgers"), + new Tuple2<>("new york", "yankees")), Arrays.asList( - new Tuple2<String, String>("california", "sharks"), - new Tuple2<String, String>("new york", "rangers"))); + new Tuple2<>("california", "sharks"), + new Tuple2<>("new york", "rangers"))); List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList( Arrays.asList( - new Tuple2<String, String>("california", "giants"), - new Tuple2<String, String>("new york", "mets")), + new Tuple2<>("california", "giants"), + new Tuple2<>("new york", "mets")), Arrays.asList( - new Tuple2<String, String>("california", "ducks"), - new Tuple2<String, String>("new york", "islanders"))); + new Tuple2<>("california", "ducks"), + new Tuple2<>("new york", "islanders"))); - List<HashSet<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList( + List<Set<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList( Sets.newHashSet( - new Tuple2<String, Tuple2<String, String>>("california", - new Tuple2<String, String>("dodgers", "giants")), - new Tuple2<String, Tuple2<String, String>>("new york", - new Tuple2<String, String>("yankees", "mets"))), + new Tuple2<>("california", + new Tuple2<>("dodgers", "giants")), + new Tuple2<>("new york", + new Tuple2<>("yankees", "mets"))), Sets.newHashSet( - new Tuple2<String, Tuple2<String, String>>("california", - new Tuple2<String, String>("sharks", "ducks")), - new Tuple2<String, Tuple2<String, String>>("new york", - new Tuple2<String, String>("rangers", "islanders")))); + new Tuple2<>("california", + new Tuple2<>("sharks", "ducks")), + new Tuple2<>("new york", + new Tuple2<>("rangers", "islanders")))); JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream( ssc, stringStringKVStream1, 1); @@ -232,7 +233,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ JavaTestUtils.attachTestOutputStream(joined); List<List<Tuple2<String, Tuple2<String, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2); - List<HashSet<Tuple2<String, Tuple2<String, String>>>> unorderedResult = Lists.newArrayList(); + List<Set<Tuple2<String, Tuple2<String, String>>>> unorderedResult = Lists.newArrayList(); for (List<Tuple2<String, Tuple2<String, String>>> res : result) { unorderedResult.add(Sets.newHashSet(res)); } @@ -251,9 +252,9 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ JavaDStream<String> stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 1); List<List<Tuple2<String, Integer>>> pairInputData1 = - Arrays.asList(Arrays.asList(new Tuple2<String, Integer>("x", 1))); + Arrays.asList(Arrays.asList(new Tuple2<>("x", 1))); List<List<Tuple2<Double, Character>>> pairInputData2 = - Arrays.asList(Arrays.asList(new Tuple2<Double, Character>(1.0, 'x'))); + Arrays.asList(Arrays.asList(new Tuple2<>(1.0, 'x'))); JavaPairDStream<String, Integer> pairStream1 = JavaPairDStream.fromJavaDStream( JavaTestUtils.attachTestInputStream(ssc, pairInputData1, 1)); JavaPairDStream<Double, Character> pairStream2 = JavaPairDStream.fromJavaDStream( @@ -293,13 +294,13 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ ); List<List<Tuple2<Integer, String>>> pairStream1input = Arrays.asList( - Arrays.asList(new Tuple2<Integer, String>(1, "x")), - Arrays.asList(new Tuple2<Integer, String>(2, "y")) + Arrays.asList(new Tuple2<>(1, "x")), + Arrays.asList(new Tuple2<>(2, "y")) ); List<List<Tuple2<Integer, Tuple2<Integer, String>>>> expected = Arrays.asList( - Arrays.asList(new Tuple2<Integer, Tuple2<Integer, String>>(1, new Tuple2<Integer, String>(1, "x"))), - Arrays.asList(new Tuple2<Integer, Tuple2<Integer, String>>(2, new Tuple2<Integer, String>(2, "y"))) + Arrays.asList(new Tuple2<>(1, new Tuple2<>(1, "x"))), + Arrays.asList(new Tuple2<>(2, new Tuple2<>(2, "y"))) ); JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, stream1input, 1); @@ -312,7 +313,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ // This is just to test whether this transform to JavaStream compiles JavaDStream<Long> transformed1 = ssc.transform( listOfDStreams1, (List<JavaRDD<?>> listOfRDDs, Time time) -> { - assert (listOfRDDs.size() == 2); + Assert.assertEquals(2, listOfRDDs.size()); return null; }); @@ -321,13 +322,13 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ JavaPairDStream<Integer, Tuple2<Integer, String>> transformed2 = ssc.transformToPair( listOfDStreams2, (List<JavaRDD<?>> listOfRDDs, Time time) -> { - assert (listOfRDDs.size() == 3); + Assert.assertEquals(3, listOfRDDs.size()); JavaRDD<Integer> rdd1 = (JavaRDD<Integer>) listOfRDDs.get(0); JavaRDD<Integer> rdd2 = (JavaRDD<Integer>) listOfRDDs.get(1); JavaRDD<Tuple2<Integer, String>> rdd3 = (JavaRDD<Tuple2<Integer, String>>) listOfRDDs.get(2); JavaPairRDD<Integer, String> prdd3 = JavaPairRDD.fromJavaRDD(rdd3); PairFunction<Integer, Integer, Integer> mapToTuple = - (Integer i) -> new Tuple2<Integer, Integer>(i, i); + (Integer i) -> new Tuple2<>(i, i); return rdd1.union(rdd2).mapToPair(mapToTuple).join(prdd3); }); JavaTestUtils.attachTestOutputStream(transformed2); @@ -365,36 +366,36 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ List<List<Tuple2<Integer, String>>> expected = Arrays.asList( Arrays.asList( - new Tuple2<Integer, String>(6, "g"), - new Tuple2<Integer, String>(6, "i"), - new Tuple2<Integer, String>(6, "a"), - new Tuple2<Integer, String>(6, "n"), - new Tuple2<Integer, String>(6, "t"), - new Tuple2<Integer, String>(6, "s")), + new Tuple2<>(6, "g"), + new Tuple2<>(6, "i"), + new Tuple2<>(6, "a"), + new Tuple2<>(6, "n"), + new Tuple2<>(6, "t"), + new Tuple2<>(6, "s")), Arrays.asList( - new Tuple2<Integer, String>(7, "d"), - new Tuple2<Integer, String>(7, "o"), - new Tuple2<Integer, String>(7, "d"), - new Tuple2<Integer, String>(7, "g"), - new Tuple2<Integer, String>(7, "e"), - new Tuple2<Integer, String>(7, "r"), - new Tuple2<Integer, String>(7, "s")), + new Tuple2<>(7, "d"), + new Tuple2<>(7, "o"), + new Tuple2<>(7, "d"), + new Tuple2<>(7, "g"), + new Tuple2<>(7, "e"), + new Tuple2<>(7, "r"), + new Tuple2<>(7, "s")), Arrays.asList( - new Tuple2<Integer, String>(9, "a"), - new Tuple2<Integer, String>(9, "t"), - new Tuple2<Integer, String>(9, "h"), - new Tuple2<Integer, String>(9, "l"), - new Tuple2<Integer, String>(9, "e"), - new Tuple2<Integer, String>(9, "t"), - new Tuple2<Integer, String>(9, "i"), - new Tuple2<Integer, String>(9, "c"), - new Tuple2<Integer, String>(9, "s"))); + new Tuple2<>(9, "a"), + new Tuple2<>(9, "t"), + new Tuple2<>(9, "h"), + new Tuple2<>(9, "l"), + new Tuple2<>(9, "e"), + new Tuple2<>(9, "t"), + new Tuple2<>(9, "i"), + new Tuple2<>(9, "c"), + new Tuple2<>(9, "s"))); JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<Integer, String> flatMapped = stream.flatMapToPair(s -> { List<Tuple2<Integer, String>> out = Lists.newArrayList(); for (String letter : s.split("(?!^)")) { - out.add(new Tuple2<Integer, String>(s.length(), letter)); + out.add(new Tuple2<>(s.length(), letter)); } return out; }); @@ -411,12 +412,8 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ */ public static <T extends Comparable<T>> void assertOrderInvariantEquals( List<List<T>> expected, List<List<T>> actual) { - for (List<T> list : expected) { - Collections.sort(list); - } - for (List<T> list : actual) { - Collections.sort(list); - } + expected.forEach((List<T> list) -> Collections.sort(list)); + actual.forEach((List<T> list) -> Collections.sort(list)); Assert.assertEquals(expected, actual); } @@ -424,11 +421,11 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ public void testPairFilter() { List<List<String>> inputData = Arrays.asList( Arrays.asList("giants", "dodgers"), - Arrays.asList("yankees", "red socks")); + Arrays.asList("yankees", "red sox")); List<List<Tuple2<String, Integer>>> expected = Arrays.asList( - Arrays.asList(new Tuple2<String, Integer>("giants", 6)), - Arrays.asList(new Tuple2<String, Integer>("yankees", 7))); + Arrays.asList(new Tuple2<>("giants", 6)), + Arrays.asList(new Tuple2<>("yankees", 7))); JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<String, Integer> pairStream = @@ -441,26 +438,26 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ } List<List<Tuple2<String, String>>> stringStringKVStream = Arrays.asList( - Arrays.asList(new Tuple2<String, String>("california", "dodgers"), - new Tuple2<String, String>("california", "giants"), - new Tuple2<String, String>("new york", "yankees"), - new Tuple2<String, String>("new york", "mets")), - Arrays.asList(new Tuple2<String, String>("california", "sharks"), - new Tuple2<String, String>("california", "ducks"), - new Tuple2<String, String>("new york", "rangers"), - new Tuple2<String, String>("new york", "islanders"))); + Arrays.asList(new Tuple2<>("california", "dodgers"), + new Tuple2<>("california", "giants"), + new Tuple2<>("new york", "yankees"), + new Tuple2<>("new york", "mets")), + Arrays.asList(new Tuple2<>("california", "sharks"), + new Tuple2<>("california", "ducks"), + new Tuple2<>("new york", "rangers"), + new Tuple2<>("new york", "islanders"))); List<List<Tuple2<String, Integer>>> stringIntKVStream = Arrays.asList( Arrays.asList( - new Tuple2<String, Integer>("california", 1), - new Tuple2<String, Integer>("california", 3), - new Tuple2<String, Integer>("new york", 4), - new Tuple2<String, Integer>("new york", 1)), + new Tuple2<>("california", 1), + new Tuple2<>("california", 3), + new Tuple2<>("new york", 4), + new Tuple2<>("new york", 1)), Arrays.asList( - new Tuple2<String, Integer>("california", 5), - new Tuple2<String, Integer>("california", 5), - new Tuple2<String, Integer>("new york", 3), - new Tuple2<String, Integer>("new york", 1))); + new Tuple2<>("california", 5), + new Tuple2<>("california", 5), + new Tuple2<>("new york", 3), + new Tuple2<>("new york", 1))); @Test public void testPairMap() { // Maps pair -> pair of different type @@ -468,15 +465,15 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ List<List<Tuple2<Integer, String>>> expected = Arrays.asList( Arrays.asList( - new Tuple2<Integer, String>(1, "california"), - new Tuple2<Integer, String>(3, "california"), - new Tuple2<Integer, String>(4, "new york"), - new Tuple2<Integer, String>(1, "new york")), + new Tuple2<>(1, "california"), + new Tuple2<>(3, "california"), + new Tuple2<>(4, "new york"), + new Tuple2<>(1, "new york")), Arrays.asList( - new Tuple2<Integer, String>(5, "california"), - new Tuple2<Integer, String>(5, "california"), - new Tuple2<Integer, String>(3, "new york"), - new Tuple2<Integer, String>(1, "new york"))); + new Tuple2<>(5, "california"), + new Tuple2<>(5, "california"), + new Tuple2<>(3, "new york"), + new Tuple2<>(1, "new york"))); JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); @@ -494,21 +491,21 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ List<List<Tuple2<Integer, String>>> expected = Arrays.asList( Arrays.asList( - new Tuple2<Integer, String>(1, "california"), - new Tuple2<Integer, String>(3, "california"), - new Tuple2<Integer, String>(4, "new york"), - new Tuple2<Integer, String>(1, "new york")), + new Tuple2<>(1, "california"), + new Tuple2<>(3, "california"), + new Tuple2<>(4, "new york"), + new Tuple2<>(1, "new york")), Arrays.asList( - new Tuple2<Integer, String>(5, "california"), - new Tuple2<Integer, String>(5, "california"), - new Tuple2<Integer, String>(3, "new york"), - new Tuple2<Integer, String>(1, "new york"))); + new Tuple2<>(5, "california"), + new Tuple2<>(5, "california"), + new Tuple2<>(3, "new york"), + new Tuple2<>(1, "new york"))); JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); JavaPairDStream<Integer, String> reversed = pairStream.mapPartitionsToPair(in -> { - LinkedList<Tuple2<Integer, String>> out = new LinkedList<Tuple2<Integer, String>>(); + LinkedList<Tuple2<Integer, String>> out = new LinkedList<>(); while (in.hasNext()) { Tuple2<String, Integer> next = in.next(); out.add(next.swap()); @@ -530,7 +527,8 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ Arrays.asList(1, 3, 4, 1), Arrays.asList(5, 5, 3, 1)); - JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream<Tuple2<String, Integer>> stream = + JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); JavaDStream<Integer> reversed = pairStream.map(in -> in._2()); JavaTestUtils.attachTestOutputStream(reversed); @@ -543,31 +541,31 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ public void testPairToPairFlatMapWithChangingTypes() { // Maps pair -> pair List<List<Tuple2<String, Integer>>> inputData = Arrays.asList( Arrays.asList( - new Tuple2<String, Integer>("hi", 1), - new Tuple2<String, Integer>("ho", 2)), + new Tuple2<>("hi", 1), + new Tuple2<>("ho", 2)), Arrays.asList( - new Tuple2<String, Integer>("hi", 1), - new Tuple2<String, Integer>("ho", 2))); + new Tuple2<>("hi", 1), + new Tuple2<>("ho", 2))); List<List<Tuple2<Integer, String>>> expected = Arrays.asList( Arrays.asList( - new Tuple2<Integer, String>(1, "h"), - new Tuple2<Integer, String>(1, "i"), - new Tuple2<Integer, String>(2, "h"), - new Tuple2<Integer, String>(2, "o")), + new Tuple2<>(1, "h"), + new Tuple2<>(1, "i"), + new Tuple2<>(2, "h"), + new Tuple2<>(2, "o")), Arrays.asList( - new Tuple2<Integer, String>(1, "h"), - new Tuple2<Integer, String>(1, "i"), - new Tuple2<Integer, String>(2, "h"), - new Tuple2<Integer, String>(2, "o"))); + new Tuple2<>(1, "h"), + new Tuple2<>(1, "i"), + new Tuple2<>(2, "h"), + new Tuple2<>(2, "o"))); JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); JavaPairDStream<Integer, String> flatMapped = pairStream.flatMapToPair(in -> { - List<Tuple2<Integer, String>> out = new LinkedList<Tuple2<Integer, String>>(); + List<Tuple2<Integer, String>> out = new LinkedList<>(); for (Character s : in._1().toCharArray()) { - out.add(new Tuple2<Integer, String>(in._2(), s.toString())); + out.add(new Tuple2<>(in._2(), s.toString())); } return out; }); @@ -584,11 +582,11 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ List<List<Tuple2<String, Integer>>> expected = Arrays.asList( Arrays.asList( - new Tuple2<String, Integer>("california", 4), - new Tuple2<String, Integer>("new york", 5)), + new Tuple2<>("california", 4), + new Tuple2<>("new york", 5)), Arrays.asList( - new Tuple2<String, Integer>("california", 10), - new Tuple2<String, Integer>("new york", 4))); + new Tuple2<>("california", 10), + new Tuple2<>("new york", 4))); JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream( ssc, inputData, 1); @@ -608,11 +606,11 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ List<List<Tuple2<String, Integer>>> expected = Arrays.asList( Arrays.asList( - new Tuple2<String, Integer>("california", 4), - new Tuple2<String, Integer>("new york", 5)), + new Tuple2<>("california", 4), + new Tuple2<>("new york", 5)), Arrays.asList( - new Tuple2<String, Integer>("california", 10), - new Tuple2<String, Integer>("new york", 4))); + new Tuple2<>("california", 10), + new Tuple2<>("new york", 4))); JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream( ssc, inputData, 1); @@ -632,12 +630,12 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; List<List<Tuple2<String, Integer>>> expected = Arrays.asList( - Arrays.asList(new Tuple2<String, Integer>("california", 4), - new Tuple2<String, Integer>("new york", 5)), - Arrays.asList(new Tuple2<String, Integer>("california", 14), - new Tuple2<String, Integer>("new york", 9)), - Arrays.asList(new Tuple2<String, Integer>("california", 10), - new Tuple2<String, Integer>("new york", 4))); + Arrays.asList(new Tuple2<>("california", 4), + new Tuple2<>("new york", 5)), + Arrays.asList(new Tuple2<>("california", 14), + new Tuple2<>("new york", 9)), + Arrays.asList(new Tuple2<>("california", 10), + new Tuple2<>("new york", 4))); JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); @@ -656,12 +654,12 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; List<List<Tuple2<String, Integer>>> expected = Arrays.asList( - Arrays.asList(new Tuple2<String, Integer>("california", 4), - new Tuple2<String, Integer>("new york", 5)), - Arrays.asList(new Tuple2<String, Integer>("california", 14), - new Tuple2<String, Integer>("new york", 9)), - Arrays.asList(new Tuple2<String, Integer>("california", 14), - new Tuple2<String, Integer>("new york", 9))); + Arrays.asList(new Tuple2<>("california", 4), + new Tuple2<>("new york", 5)), + Arrays.asList(new Tuple2<>("california", 14), + new Tuple2<>("new york", 9)), + Arrays.asList(new Tuple2<>("california", 14), + new Tuple2<>("new york", 9))); JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); @@ -689,12 +687,12 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; List<List<Tuple2<String, Integer>>> expected = Arrays.asList( - Arrays.asList(new Tuple2<String, Integer>("california", 4), - new Tuple2<String, Integer>("new york", 5)), - Arrays.asList(new Tuple2<String, Integer>("california", 14), - new Tuple2<String, Integer>("new york", 9)), - Arrays.asList(new Tuple2<String, Integer>("california", 10), - new Tuple2<String, Integer>("new york", 4))); + Arrays.asList(new Tuple2<>("california", 4), + new Tuple2<>("new york", 5)), + Arrays.asList(new Tuple2<>("california", 14), + new Tuple2<>("new york", 9)), + Arrays.asList(new Tuple2<>("california", 10), + new Tuple2<>("new york", 4))); JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); @@ -713,27 +711,27 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ public void testPairTransform() { List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList( Arrays.asList( - new Tuple2<Integer, Integer>(3, 5), - new Tuple2<Integer, Integer>(1, 5), - new Tuple2<Integer, Integer>(4, 5), - new Tuple2<Integer, Integer>(2, 5)), + new Tuple2<>(3, 5), + new Tuple2<>(1, 5), + new Tuple2<>(4, 5), + new Tuple2<>(2, 5)), Arrays.asList( - new Tuple2<Integer, Integer>(2, 5), - new Tuple2<Integer, Integer>(3, 5), - new Tuple2<Integer, Integer>(4, 5), - new Tuple2<Integer, Integer>(1, 5))); + new Tuple2<>(2, 5), + new Tuple2<>(3, 5), + new Tuple2<>(4, 5), + new Tuple2<>(1, 5))); List<List<Tuple2<Integer, Integer>>> expected = Arrays.asList( Arrays.asList( - new Tuple2<Integer, Integer>(1, 5), - new Tuple2<Integer, Integer>(2, 5), - new Tuple2<Integer, Integer>(3, 5), - new Tuple2<Integer, Integer>(4, 5)), + new Tuple2<>(1, 5), + new Tuple2<>(2, 5), + new Tuple2<>(3, 5), + new Tuple2<>(4, 5)), Arrays.asList( - new Tuple2<Integer, Integer>(1, 5), - new Tuple2<Integer, Integer>(2, 5), - new Tuple2<Integer, Integer>(3, 5), - new Tuple2<Integer, Integer>(4, 5))); + new Tuple2<>(1, 5), + new Tuple2<>(2, 5), + new Tuple2<>(3, 5), + new Tuple2<>(4, 5))); JavaDStream<Tuple2<Integer, Integer>> stream = JavaTestUtils.attachTestInputStream( ssc, inputData, 1); @@ -751,15 +749,15 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ public void testPairToNormalRDDTransform() { List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList( Arrays.asList( - new Tuple2<Integer, Integer>(3, 5), - new Tuple2<Integer, Integer>(1, 5), - new Tuple2<Integer, Integer>(4, 5), - new Tuple2<Integer, Integer>(2, 5)), + new Tuple2<>(3, 5), + new Tuple2<>(1, 5), + new Tuple2<>(4, 5), + new Tuple2<>(2, 5)), Arrays.asList( - new Tuple2<Integer, Integer>(2, 5), - new Tuple2<Integer, Integer>(3, 5), - new Tuple2<Integer, Integer>(4, 5), - new Tuple2<Integer, Integer>(1, 5))); + new Tuple2<>(2, 5), + new Tuple2<>(3, 5), + new Tuple2<>(4, 5), + new Tuple2<>(1, 5))); List<List<Integer>> expected = Arrays.asList( Arrays.asList(3, 1, 4, 2), @@ -780,20 +778,20 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ List<List<Tuple2<String, String>>> inputData = stringStringKVStream; List<List<Tuple2<String, String>>> expected = Arrays.asList( - Arrays.asList(new Tuple2<String, String>("california", "DODGERS"), - new Tuple2<String, String>("california", "GIANTS"), - new Tuple2<String, String>("new york", "YANKEES"), - new Tuple2<String, String>("new york", "METS")), - Arrays.asList(new Tuple2<String, String>("california", "SHARKS"), - new Tuple2<String, String>("california", "DUCKS"), - new Tuple2<String, String>("new york", "RANGERS"), - new Tuple2<String, String>("new york", "ISLANDERS"))); + Arrays.asList(new Tuple2<>("california", "DODGERS"), + new Tuple2<>("california", "GIANTS"), + new Tuple2<>("new york", "YANKEES"), + new Tuple2<>("new york", "METS")), + Arrays.asList(new Tuple2<>("california", "SHARKS"), + new Tuple2<>("california", "DUCKS"), + new Tuple2<>("new york", "RANGERS"), + new Tuple2<>("new york", "ISLANDERS"))); JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream( ssc, inputData, 1); JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream<String, String> mapped = pairStream.mapValues(s -> s.toUpperCase()); + JavaPairDStream<String, String> mapped = pairStream.mapValues(String::toUpperCase); JavaTestUtils.attachTestOutputStream(mapped); List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); @@ -805,34 +803,29 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ List<List<Tuple2<String, String>>> inputData = stringStringKVStream; List<List<Tuple2<String, String>>> expected = Arrays.asList( - Arrays.asList(new Tuple2<String, String>("california", "dodgers1"), - new Tuple2<String, String>("california", "dodgers2"), - new Tuple2<String, String>("california", "giants1"), - new Tuple2<String, String>("california", "giants2"), - new Tuple2<String, String>("new york", "yankees1"), - new Tuple2<String, String>("new york", "yankees2"), - new Tuple2<String, String>("new york", "mets1"), - new Tuple2<String, String>("new york", "mets2")), - Arrays.asList(new Tuple2<String, String>("california", "sharks1"), - new Tuple2<String, String>("california", "sharks2"), - new Tuple2<String, String>("california", "ducks1"), - new Tuple2<String, String>("california", "ducks2"), - new Tuple2<String, String>("new york", "rangers1"), - new Tuple2<String, String>("new york", "rangers2"), - new Tuple2<String, String>("new york", "islanders1"), - new Tuple2<String, String>("new york", "islanders2"))); + Arrays.asList(new Tuple2<>("california", "dodgers1"), + new Tuple2<>("california", "dodgers2"), + new Tuple2<>("california", "giants1"), + new Tuple2<>("california", "giants2"), + new Tuple2<>("new york", "yankees1"), + new Tuple2<>("new york", "yankees2"), + new Tuple2<>("new york", "mets1"), + new Tuple2<>("new york", "mets2")), + Arrays.asList(new Tuple2<>("california", "sharks1"), + new Tuple2<>("california", "sharks2"), + new Tuple2<>("california", "ducks1"), + new Tuple2<>("california", "ducks2"), + new Tuple2<>("new york", "rangers1"), + new Tuple2<>("new york", "rangers2"), + new Tuple2<>("new york", "islanders1"), + new Tuple2<>("new york", "islanders2"))); JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream( ssc, inputData, 1); JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream<String, String> flatMapped = pairStream.flatMapValues(in -> { - List<String> out = new ArrayList<String>(); - out.add(in + "1"); - out.add(in + "2"); - return out; - }); + JavaPairDStream<String, String> flatMapped = + pairStream.flatMapValues(in -> Arrays.asList(in + "1", in + "2")); JavaTestUtils.attachTestOutputStream(flatMapped); List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); Assert.assertEquals(expected, result); |