diff options
author | Sean Owen <sowen@cloudera.com> | 2014-06-04 11:27:08 -0700 |
---|---|---|
committer | Xiangrui Meng <meng@databricks.com> | 2014-06-04 11:27:08 -0700 |
commit | d341b17c2a0a4fce04045e13fb4a3b0621296320 (patch) | |
tree | dc40ffa1dfa298b473d5922ffdf1d736ad933b89 /extras | |
parent | b8d25800393d0208a76813bcd94509ac24a3add5 (diff) | |
download | spark-d341b17c2a0a4fce04045e13fb4a3b0621296320.tar.gz spark-d341b17c2a0a4fce04045e13fb4a3b0621296320.tar.bz2 spark-d341b17c2a0a4fce04045e13fb4a3b0621296320.zip |
SPARK-1973. Add randomSplit to JavaRDD (with tests, and tidy Java tests)
I'd like to use randomSplit through the Java API, and would like to add a convenience wrapper for this method to JavaRDD. This is fairly trivial. (In fact, is the intent that JavaRDD not wrap every RDD method? and that sometimes users should just use JavaRDD.wrapRDD()?)
Along the way, I added tests for it, and also touched up the Java API test style and behavior. This is maybe the more useful part of this small change.
Author: Sean Owen <sowen@cloudera.com>
Author: Xiangrui Meng <meng@databricks.com>
This patch had conflicts when merged, resolved by
Committer: Xiangrui Meng <meng@databricks.com>
Closes #919 from srowen/SPARK-1973 and squashes the following commits:
148cb7b [Sean Owen] Some final Java test polish, while we are at it
1fc3f3e [Xiangrui Meng] more cleaning on Java 8 tests
9ebc57f [Sean Owen] Use accumulator instead of temp files to test foreach
5efb0be [Sean Owen] Add Java randomSplit, and unit tests (including for sample)
5dcc158 [Sean Owen] Simplified Java 8 test with new language features, and fixed the name of MLB's greatest team
91a1769 [Sean Owen] Touch up minor style issues in existing Java API suite test
Diffstat (limited to 'extras')
-rw-r--r-- | extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java | 96 | ||||
-rw-r--r-- | extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java | 381 |
2 files changed, 233 insertions, 244 deletions
diff --git a/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java b/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java index c366c10b15..729bc0459c 100644 --- a/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java +++ b/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java @@ -99,16 +99,16 @@ public class Java8APISuite implements Serializable { @Test public void leftOuterJoin() { JavaPairRDD<Integer, Integer> rdd1 = sc.parallelizePairs(Arrays.asList( - new Tuple2<Integer, Integer>(1, 1), - new Tuple2<Integer, Integer>(1, 2), - new Tuple2<Integer, Integer>(2, 1), - new Tuple2<Integer, Integer>(3, 1) + new Tuple2<>(1, 1), + new Tuple2<>(1, 2), + new Tuple2<>(2, 1), + new Tuple2<>(3, 1) )); JavaPairRDD<Integer, Character> rdd2 = sc.parallelizePairs(Arrays.asList( - new Tuple2<Integer, Character>(1, 'x'), - new Tuple2<Integer, Character>(2, 'y'), - new Tuple2<Integer, Character>(2, 'z'), - new Tuple2<Integer, Character>(4, 'w') + new Tuple2<>(1, 'x'), + new Tuple2<>(2, 'y'), + new Tuple2<>(2, 'z'), + new Tuple2<>(4, 'w') )); List<Tuple2<Integer, Tuple2<Integer, Optional<Character>>>> joined = rdd1.leftOuterJoin(rdd2).collect(); @@ -133,11 +133,11 @@ public class Java8APISuite implements Serializable { @Test public void foldByKey() { List<Tuple2<Integer, Integer>> pairs = Arrays.asList( - new Tuple2<Integer, Integer>(2, 1), - new Tuple2<Integer, Integer>(2, 1), - new Tuple2<Integer, Integer>(1, 1), - new Tuple2<Integer, Integer>(3, 2), - new Tuple2<Integer, Integer>(3, 1) + new Tuple2<>(2, 1), + new Tuple2<>(2, 1), + new Tuple2<>(1, 1), + new Tuple2<>(3, 2), + new Tuple2<>(3, 1) ); JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs); JavaPairRDD<Integer, Integer> sums = rdd.foldByKey(0, (a, b) -> a + b); @@ -149,11 +149,11 @@ public class Java8APISuite implements Serializable { @Test public void reduceByKey() { List<Tuple2<Integer, Integer>> pairs = Arrays.asList( - new Tuple2<Integer, Integer>(2, 1), - new Tuple2<Integer, Integer>(2, 1), - new Tuple2<Integer, Integer>(1, 1), - new Tuple2<Integer, Integer>(3, 2), - new Tuple2<Integer, Integer>(3, 1) + new Tuple2<>(2, 1), + new Tuple2<>(2, 1), + new Tuple2<>(1, 1), + new Tuple2<>(3, 2), + new Tuple2<>(3, 1) ); JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs); JavaPairRDD<Integer, Integer> counts = rdd.reduceByKey((a, b) -> a + b); @@ -177,7 +177,7 @@ public class Java8APISuite implements Serializable { JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)); JavaDoubleRDD doubles = rdd.mapToDouble(x -> 1.0 * x).cache(); doubles.collect(); - JavaPairRDD<Integer, Integer> pairs = rdd.mapToPair(x -> new Tuple2<Integer, Integer>(x, x)) + JavaPairRDD<Integer, Integer> pairs = rdd.mapToPair(x -> new Tuple2<>(x, x)) .cache(); pairs.collect(); JavaRDD<String> strings = rdd.map(x -> x.toString()).cache(); @@ -194,31 +194,31 @@ public class Java8APISuite implements Serializable { Assert.assertEquals(11, words.count()); JavaPairRDD<String, String> pairs = rdd.flatMapToPair(s -> { - List<Tuple2<String, String>> pairs2 = new LinkedList<Tuple2<String, String>>(); - for (String word : s.split(" ")) pairs2.add(new Tuple2<String, String>(word, word)); + List<Tuple2<String, String>> pairs2 = new LinkedList<>(); + for (String word : s.split(" ")) pairs2.add(new Tuple2<>(word, word)); return pairs2; }); - Assert.assertEquals(new Tuple2<String, String>("Hello", "Hello"), pairs.first()); + Assert.assertEquals(new Tuple2<>("Hello", "Hello"), pairs.first()); Assert.assertEquals(11, pairs.count()); JavaDoubleRDD doubles = rdd.flatMapToDouble(s -> { - List<Double> lengths = new LinkedList<Double>(); + List<Double> lengths = new LinkedList<>(); for (String word : s.split(" ")) lengths.add(word.length() * 1.0); return lengths; }); Double x = doubles.first(); - Assert.assertEquals(5.0, doubles.first().doubleValue(), 0.01); + Assert.assertEquals(5.0, doubles.first(), 0.01); Assert.assertEquals(11, pairs.count()); } @Test public void mapsFromPairsToPairs() { List<Tuple2<Integer, String>> pairs = Arrays.asList( - new Tuple2<Integer, String>(1, "a"), - new Tuple2<Integer, String>(2, "aa"), - new Tuple2<Integer, String>(3, "aaa") + new Tuple2<>(1, "a"), + new Tuple2<>(2, "aa"), + new Tuple2<>(3, "aaa") ); JavaPairRDD<Integer, String> pairRDD = sc.parallelizePairs(pairs); @@ -251,19 +251,18 @@ public class Java8APISuite implements Serializable { tempDir.deleteOnExit(); String outputDir = new File(tempDir, "output").getAbsolutePath(); List<Tuple2<Integer, String>> pairs = Arrays.asList( - new Tuple2<Integer, String>(1, "a"), - new Tuple2<Integer, String>(2, "aa"), - new Tuple2<Integer, String>(3, "aaa") + new Tuple2<>(1, "a"), + new Tuple2<>(2, "aa"), + new Tuple2<>(3, "aaa") ); JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs); - rdd.mapToPair(pair -> - new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2()))) + rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2()))) .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class); // Try reading the output back as an object file JavaPairRDD<Integer, String> readRDD = sc.sequenceFile(outputDir, IntWritable.class, Text.class) - .mapToPair(pair -> new Tuple2<Integer, String>(pair._1().get(), pair._2().toString())); + .mapToPair(pair -> new Tuple2<>(pair._1().get(), pair._2().toString())); Assert.assertEquals(pairs, readRDD.collect()); Utils.deleteRecursively(tempDir); } @@ -325,7 +324,7 @@ public class Java8APISuite implements Serializable { } }; - final Accumulator<Float> floatAccum = sc.accumulator((Float) 10.0f, floatAccumulatorParam); + final Accumulator<Float> floatAccum = sc.accumulator(10.0f, floatAccumulatorParam); rdd.foreach(x -> floatAccum.add((float) x)); Assert.assertEquals((Float) 25.0f, floatAccum.value()); @@ -338,22 +337,22 @@ public class Java8APISuite implements Serializable { public void keyBy() { JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2)); List<Tuple2<String, Integer>> s = rdd.keyBy(x -> x.toString()).collect(); - Assert.assertEquals(new Tuple2<String, Integer>("1", 1), s.get(0)); - Assert.assertEquals(new Tuple2<String, Integer>("2", 2), s.get(1)); + Assert.assertEquals(new Tuple2<>("1", 1), s.get(0)); + Assert.assertEquals(new Tuple2<>("2", 2), s.get(1)); } @Test public void mapOnPairRDD() { JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4)); JavaPairRDD<Integer, Integer> rdd2 = - rdd1.mapToPair(i -> new Tuple2<Integer, Integer>(i, i % 2)); + rdd1.mapToPair(i -> new Tuple2<>(i, i % 2)); JavaPairRDD<Integer, Integer> rdd3 = - rdd2.mapToPair(in -> new Tuple2<Integer, Integer>(in._2(), in._1())); + rdd2.mapToPair(in -> new Tuple2<>(in._2(), in._1())); Assert.assertEquals(Arrays.asList( new Tuple2<Integer, Integer>(1, 1), - new Tuple2<Integer, Integer>(0, 2), - new Tuple2<Integer, Integer>(1, 3), - new Tuple2<Integer, Integer>(0, 4)), rdd3.collect()); + new Tuple2<>(0, 2), + new Tuple2<>(1, 3), + new Tuple2<>(0, 4)), rdd3.collect()); } @Test @@ -361,7 +360,7 @@ public class Java8APISuite implements Serializable { JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3); JavaPairRDD<Integer, Integer> rdd2 = - rdd1.mapToPair(i -> new Tuple2<Integer, Integer>(i, i % 2)); + rdd1.mapToPair(i -> new Tuple2<>(i, i % 2)); List[] parts = rdd1.collectPartitions(new int[]{0}); Assert.assertEquals(Arrays.asList(1, 2), parts[0]); @@ -369,16 +368,13 @@ public class Java8APISuite implements Serializable { Assert.assertEquals(Arrays.asList(3, 4), parts[0]); Assert.assertEquals(Arrays.asList(5, 6, 7), parts[1]); - Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(1, 1), - new Tuple2<Integer, Integer>(2, 0)), + Assert.assertEquals(Arrays.asList(new Tuple2<>(1, 1), new Tuple2<>(2, 0)), rdd2.collectPartitions(new int[]{0})[0]); parts = rdd2.collectPartitions(new int[]{1, 2}); - Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(3, 1), - new Tuple2<Integer, Integer>(4, 0)), parts[0]); - Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(5, 1), - new Tuple2<Integer, Integer>(6, 0), - new Tuple2<Integer, Integer>(7, 1)), parts[1]); + Assert.assertEquals(Arrays.asList(new Tuple2<>(3, 1), new Tuple2<>(4, 0)), parts[0]); + Assert.assertEquals(Arrays.asList(new Tuple2<>(5, 1), new Tuple2<>(6, 0), new Tuple2<>(7, 1)), + parts[1]); } @Test @@ -386,7 +382,7 @@ public class Java8APISuite implements Serializable { // Regression test for SPARK-1040 JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(new Integer[]{1})); JavaPairRDD<Integer, int[]> pairRDD = - rdd.mapToPair(x -> new Tuple2<Integer, int[]>(x, new int[]{x})); + rdd.mapToPair(x -> new Tuple2<>(x, new int[]{x})); pairRDD.collect(); // Works fine Map<Integer, int[]> map = pairRDD.collectAsMap(); // Used to crash with ClassCastException } diff --git a/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java b/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java index 43df0dea61..73091cfe2c 100644 --- a/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java +++ b/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java @@ -39,6 +39,7 @@ import org.apache.spark.streaming.api.java.JavaPairDStream; * Most of these tests replicate org.apache.spark.streaming.JavaAPISuite using java 8 * lambda syntax. */ +@SuppressWarnings("unchecked") public class Java8APISuite extends LocalJavaStreamingContext implements Serializable { @Test @@ -52,7 +53,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ Arrays.asList(9, 4)); JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream<Integer> letterCount = stream.map(s -> s.length()); + JavaDStream<Integer> letterCount = stream.map(String::length); JavaTestUtils.attachTestOutputStream(letterCount); List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2); @@ -63,7 +64,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ public void testFilter() { List<List<String>> inputData = Arrays.asList( Arrays.asList("giants", "dodgers"), - Arrays.asList("yankees", "red socks")); + Arrays.asList("yankees", "red sox")); List<List<String>> expected = Arrays.asList( Arrays.asList("giants"), @@ -81,11 +82,11 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ public void testMapPartitions() { List<List<String>> inputData = Arrays.asList( Arrays.asList("giants", "dodgers"), - Arrays.asList("yankees", "red socks")); + Arrays.asList("yankees", "red sox")); List<List<String>> expected = Arrays.asList( Arrays.asList("GIANTSDODGERS"), - Arrays.asList("YANKEESRED SOCKS")); + Arrays.asList("YANKEESRED SOX")); JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaDStream<String> mapped = stream.mapPartitions(in -> { @@ -172,7 +173,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); List<List<Tuple2<String, Integer>>> pairInputData = - Arrays.asList(Arrays.asList(new Tuple2<String, Integer>("x", 1))); + Arrays.asList(Arrays.asList(new Tuple2<>("x", 1))); JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream( JavaTestUtils.attachTestInputStream(ssc, pairInputData, 1)); @@ -192,32 +193,32 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ public void testTransformWith() { List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList( Arrays.asList( - new Tuple2<String, String>("california", "dodgers"), - new Tuple2<String, String>("new york", "yankees")), + new Tuple2<>("california", "dodgers"), + new Tuple2<>("new york", "yankees")), Arrays.asList( - new Tuple2<String, String>("california", "sharks"), - new Tuple2<String, String>("new york", "rangers"))); + new Tuple2<>("california", "sharks"), + new Tuple2<>("new york", "rangers"))); List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList( Arrays.asList( - new Tuple2<String, String>("california", "giants"), - new Tuple2<String, String>("new york", "mets")), + new Tuple2<>("california", "giants"), + new Tuple2<>("new york", "mets")), Arrays.asList( - new Tuple2<String, String>("california", "ducks"), - new Tuple2<String, String>("new york", "islanders"))); + new Tuple2<>("california", "ducks"), + new Tuple2<>("new york", "islanders"))); - List<HashSet<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList( + List<Set<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList( Sets.newHashSet( - new Tuple2<String, Tuple2<String, String>>("california", - new Tuple2<String, String>("dodgers", "giants")), - new Tuple2<String, Tuple2<String, String>>("new york", - new Tuple2<String, String>("yankees", "mets"))), + new Tuple2<>("california", + new Tuple2<>("dodgers", "giants")), + new Tuple2<>("new york", + new Tuple2<>("yankees", "mets"))), Sets.newHashSet( - new Tuple2<String, Tuple2<String, String>>("california", - new Tuple2<String, String>("sharks", "ducks")), - new Tuple2<String, Tuple2<String, String>>("new york", - new Tuple2<String, String>("rangers", "islanders")))); + new Tuple2<>("california", + new Tuple2<>("sharks", "ducks")), + new Tuple2<>("new york", + new Tuple2<>("rangers", "islanders")))); JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream( ssc, stringStringKVStream1, 1); @@ -232,7 +233,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ JavaTestUtils.attachTestOutputStream(joined); List<List<Tuple2<String, Tuple2<String, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2); - List<HashSet<Tuple2<String, Tuple2<String, String>>>> unorderedResult = Lists.newArrayList(); + List<Set<Tuple2<String, Tuple2<String, String>>>> unorderedResult = Lists.newArrayList(); for (List<Tuple2<String, Tuple2<String, String>>> res : result) { unorderedResult.add(Sets.newHashSet(res)); } @@ -251,9 +252,9 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ JavaDStream<String> stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 1); List<List<Tuple2<String, Integer>>> pairInputData1 = - Arrays.asList(Arrays.asList(new Tuple2<String, Integer>("x", 1))); + Arrays.asList(Arrays.asList(new Tuple2<>("x", 1))); List<List<Tuple2<Double, Character>>> pairInputData2 = - Arrays.asList(Arrays.asList(new Tuple2<Double, Character>(1.0, 'x'))); + Arrays.asList(Arrays.asList(new Tuple2<>(1.0, 'x'))); JavaPairDStream<String, Integer> pairStream1 = JavaPairDStream.fromJavaDStream( JavaTestUtils.attachTestInputStream(ssc, pairInputData1, 1)); JavaPairDStream<Double, Character> pairStream2 = JavaPairDStream.fromJavaDStream( @@ -293,13 +294,13 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ ); List<List<Tuple2<Integer, String>>> pairStream1input = Arrays.asList( - Arrays.asList(new Tuple2<Integer, String>(1, "x")), - Arrays.asList(new Tuple2<Integer, String>(2, "y")) + Arrays.asList(new Tuple2<>(1, "x")), + Arrays.asList(new Tuple2<>(2, "y")) ); List<List<Tuple2<Integer, Tuple2<Integer, String>>>> expected = Arrays.asList( - Arrays.asList(new Tuple2<Integer, Tuple2<Integer, String>>(1, new Tuple2<Integer, String>(1, "x"))), - Arrays.asList(new Tuple2<Integer, Tuple2<Integer, String>>(2, new Tuple2<Integer, String>(2, "y"))) + Arrays.asList(new Tuple2<>(1, new Tuple2<>(1, "x"))), + Arrays.asList(new Tuple2<>(2, new Tuple2<>(2, "y"))) ); JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, stream1input, 1); @@ -312,7 +313,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ // This is just to test whether this transform to JavaStream compiles JavaDStream<Long> transformed1 = ssc.transform( listOfDStreams1, (List<JavaRDD<?>> listOfRDDs, Time time) -> { - assert (listOfRDDs.size() == 2); + Assert.assertEquals(2, listOfRDDs.size()); return null; }); @@ -321,13 +322,13 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ JavaPairDStream<Integer, Tuple2<Integer, String>> transformed2 = ssc.transformToPair( listOfDStreams2, (List<JavaRDD<?>> listOfRDDs, Time time) -> { - assert (listOfRDDs.size() == 3); + Assert.assertEquals(3, listOfRDDs.size()); JavaRDD<Integer> rdd1 = (JavaRDD<Integer>) listOfRDDs.get(0); JavaRDD<Integer> rdd2 = (JavaRDD<Integer>) listOfRDDs.get(1); JavaRDD<Tuple2<Integer, String>> rdd3 = (JavaRDD<Tuple2<Integer, String>>) listOfRDDs.get(2); JavaPairRDD<Integer, String> prdd3 = JavaPairRDD.fromJavaRDD(rdd3); PairFunction<Integer, Integer, Integer> mapToTuple = - (Integer i) -> new Tuple2<Integer, Integer>(i, i); + (Integer i) -> new Tuple2<>(i, i); return rdd1.union(rdd2).mapToPair(mapToTuple).join(prdd3); }); JavaTestUtils.attachTestOutputStream(transformed2); @@ -365,36 +366,36 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ List<List<Tuple2<Integer, String>>> expected = Arrays.asList( Arrays.asList( - new Tuple2<Integer, String>(6, "g"), - new Tuple2<Integer, String>(6, "i"), - new Tuple2<Integer, String>(6, "a"), - new Tuple2<Integer, String>(6, "n"), - new Tuple2<Integer, String>(6, "t"), - new Tuple2<Integer, String>(6, "s")), + new Tuple2<>(6, "g"), + new Tuple2<>(6, "i"), + new Tuple2<>(6, "a"), + new Tuple2<>(6, "n"), + new Tuple2<>(6, "t"), + new Tuple2<>(6, "s")), Arrays.asList( - new Tuple2<Integer, String>(7, "d"), - new Tuple2<Integer, String>(7, "o"), - new Tuple2<Integer, String>(7, "d"), - new Tuple2<Integer, String>(7, "g"), - new Tuple2<Integer, String>(7, "e"), - new Tuple2<Integer, String>(7, "r"), - new Tuple2<Integer, String>(7, "s")), + new Tuple2<>(7, "d"), + new Tuple2<>(7, "o"), + new Tuple2<>(7, "d"), + new Tuple2<>(7, "g"), + new Tuple2<>(7, "e"), + new Tuple2<>(7, "r"), + new Tuple2<>(7, "s")), Arrays.asList( - new Tuple2<Integer, String>(9, "a"), - new Tuple2<Integer, String>(9, "t"), - new Tuple2<Integer, String>(9, "h"), - new Tuple2<Integer, String>(9, "l"), - new Tuple2<Integer, String>(9, "e"), - new Tuple2<Integer, String>(9, "t"), - new Tuple2<Integer, String>(9, "i"), - new Tuple2<Integer, String>(9, "c"), - new Tuple2<Integer, String>(9, "s"))); + new Tuple2<>(9, "a"), + new Tuple2<>(9, "t"), + new Tuple2<>(9, "h"), + new Tuple2<>(9, "l"), + new Tuple2<>(9, "e"), + new Tuple2<>(9, "t"), + new Tuple2<>(9, "i"), + new Tuple2<>(9, "c"), + new Tuple2<>(9, "s"))); JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<Integer, String> flatMapped = stream.flatMapToPair(s -> { List<Tuple2<Integer, String>> out = Lists.newArrayList(); for (String letter : s.split("(?!^)")) { - out.add(new Tuple2<Integer, String>(s.length(), letter)); + out.add(new Tuple2<>(s.length(), letter)); } return out; }); @@ -411,12 +412,8 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ */ public static <T extends Comparable<T>> void assertOrderInvariantEquals( List<List<T>> expected, List<List<T>> actual) { - for (List<T> list : expected) { - Collections.sort(list); - } - for (List<T> list : actual) { - Collections.sort(list); - } + expected.forEach((List<T> list) -> Collections.sort(list)); + actual.forEach((List<T> list) -> Collections.sort(list)); Assert.assertEquals(expected, actual); } @@ -424,11 +421,11 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ public void testPairFilter() { List<List<String>> inputData = Arrays.asList( Arrays.asList("giants", "dodgers"), - Arrays.asList("yankees", "red socks")); + Arrays.asList("yankees", "red sox")); List<List<Tuple2<String, Integer>>> expected = Arrays.asList( - Arrays.asList(new Tuple2<String, Integer>("giants", 6)), - Arrays.asList(new Tuple2<String, Integer>("yankees", 7))); + Arrays.asList(new Tuple2<>("giants", 6)), + Arrays.asList(new Tuple2<>("yankees", 7))); JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<String, Integer> pairStream = @@ -441,26 +438,26 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ } List<List<Tuple2<String, String>>> stringStringKVStream = Arrays.asList( - Arrays.asList(new Tuple2<String, String>("california", "dodgers"), - new Tuple2<String, String>("california", "giants"), - new Tuple2<String, String>("new york", "yankees"), - new Tuple2<String, String>("new york", "mets")), - Arrays.asList(new Tuple2<String, String>("california", "sharks"), - new Tuple2<String, String>("california", "ducks"), - new Tuple2<String, String>("new york", "rangers"), - new Tuple2<String, String>("new york", "islanders"))); + Arrays.asList(new Tuple2<>("california", "dodgers"), + new Tuple2<>("california", "giants"), + new Tuple2<>("new york", "yankees"), + new Tuple2<>("new york", "mets")), + Arrays.asList(new Tuple2<>("california", "sharks"), + new Tuple2<>("california", "ducks"), + new Tuple2<>("new york", "rangers"), + new Tuple2<>("new york", "islanders"))); List<List<Tuple2<String, Integer>>> stringIntKVStream = Arrays.asList( Arrays.asList( - new Tuple2<String, Integer>("california", 1), - new Tuple2<String, Integer>("california", 3), - new Tuple2<String, Integer>("new york", 4), - new Tuple2<String, Integer>("new york", 1)), + new Tuple2<>("california", 1), + new Tuple2<>("california", 3), + new Tuple2<>("new york", 4), + new Tuple2<>("new york", 1)), Arrays.asList( - new Tuple2<String, Integer>("california", 5), - new Tuple2<String, Integer>("california", 5), - new Tuple2<String, Integer>("new york", 3), - new Tuple2<String, Integer>("new york", 1))); + new Tuple2<>("california", 5), + new Tuple2<>("california", 5), + new Tuple2<>("new york", 3), + new Tuple2<>("new york", 1))); @Test public void testPairMap() { // Maps pair -> pair of different type @@ -468,15 +465,15 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ List<List<Tuple2<Integer, String>>> expected = Arrays.asList( Arrays.asList( - new Tuple2<Integer, String>(1, "california"), - new Tuple2<Integer, String>(3, "california"), - new Tuple2<Integer, String>(4, "new york"), - new Tuple2<Integer, String>(1, "new york")), + new Tuple2<>(1, "california"), + new Tuple2<>(3, "california"), + new Tuple2<>(4, "new york"), + new Tuple2<>(1, "new york")), Arrays.asList( - new Tuple2<Integer, String>(5, "california"), - new Tuple2<Integer, String>(5, "california"), - new Tuple2<Integer, String>(3, "new york"), - new Tuple2<Integer, String>(1, "new york"))); + new Tuple2<>(5, "california"), + new Tuple2<>(5, "california"), + new Tuple2<>(3, "new york"), + new Tuple2<>(1, "new york"))); JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); @@ -494,21 +491,21 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ List<List<Tuple2<Integer, String>>> expected = Arrays.asList( Arrays.asList( - new Tuple2<Integer, String>(1, "california"), - new Tuple2<Integer, String>(3, "california"), - new Tuple2<Integer, String>(4, "new york"), - new Tuple2<Integer, String>(1, "new york")), + new Tuple2<>(1, "california"), + new Tuple2<>(3, "california"), + new Tuple2<>(4, "new york"), + new Tuple2<>(1, "new york")), Arrays.asList( - new Tuple2<Integer, String>(5, "california"), - new Tuple2<Integer, String>(5, "california"), - new Tuple2<Integer, String>(3, "new york"), - new Tuple2<Integer, String>(1, "new york"))); + new Tuple2<>(5, "california"), + new Tuple2<>(5, "california"), + new Tuple2<>(3, "new york"), + new Tuple2<>(1, "new york"))); JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); JavaPairDStream<Integer, String> reversed = pairStream.mapPartitionsToPair(in -> { - LinkedList<Tuple2<Integer, String>> out = new LinkedList<Tuple2<Integer, String>>(); + LinkedList<Tuple2<Integer, String>> out = new LinkedList<>(); while (in.hasNext()) { Tuple2<String, Integer> next = in.next(); out.add(next.swap()); @@ -530,7 +527,8 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ Arrays.asList(1, 3, 4, 1), Arrays.asList(5, 5, 3, 1)); - JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream<Tuple2<String, Integer>> stream = + JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); JavaDStream<Integer> reversed = pairStream.map(in -> in._2()); JavaTestUtils.attachTestOutputStream(reversed); @@ -543,31 +541,31 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ public void testPairToPairFlatMapWithChangingTypes() { // Maps pair -> pair List<List<Tuple2<String, Integer>>> inputData = Arrays.asList( Arrays.asList( - new Tuple2<String, Integer>("hi", 1), - new Tuple2<String, Integer>("ho", 2)), + new Tuple2<>("hi", 1), + new Tuple2<>("ho", 2)), Arrays.asList( - new Tuple2<String, Integer>("hi", 1), - new Tuple2<String, Integer>("ho", 2))); + new Tuple2<>("hi", 1), + new Tuple2<>("ho", 2))); List<List<Tuple2<Integer, String>>> expected = Arrays.asList( Arrays.asList( - new Tuple2<Integer, String>(1, "h"), - new Tuple2<Integer, String>(1, "i"), - new Tuple2<Integer, String>(2, "h"), - new Tuple2<Integer, String>(2, "o")), + new Tuple2<>(1, "h"), + new Tuple2<>(1, "i"), + new Tuple2<>(2, "h"), + new Tuple2<>(2, "o")), Arrays.asList( - new Tuple2<Integer, String>(1, "h"), - new Tuple2<Integer, String>(1, "i"), - new Tuple2<Integer, String>(2, "h"), - new Tuple2<Integer, String>(2, "o"))); + new Tuple2<>(1, "h"), + new Tuple2<>(1, "i"), + new Tuple2<>(2, "h"), + new Tuple2<>(2, "o"))); JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); JavaPairDStream<Integer, String> flatMapped = pairStream.flatMapToPair(in -> { - List<Tuple2<Integer, String>> out = new LinkedList<Tuple2<Integer, String>>(); + List<Tuple2<Integer, String>> out = new LinkedList<>(); for (Character s : in._1().toCharArray()) { - out.add(new Tuple2<Integer, String>(in._2(), s.toString())); + out.add(new Tuple2<>(in._2(), s.toString())); } return out; }); @@ -584,11 +582,11 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ List<List<Tuple2<String, Integer>>> expected = Arrays.asList( Arrays.asList( - new Tuple2<String, Integer>("california", 4), - new Tuple2<String, Integer>("new york", 5)), + new Tuple2<>("california", 4), + new Tuple2<>("new york", 5)), Arrays.asList( - new Tuple2<String, Integer>("california", 10), - new Tuple2<String, Integer>("new york", 4))); + new Tuple2<>("california", 10), + new Tuple2<>("new york", 4))); JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream( ssc, inputData, 1); @@ -608,11 +606,11 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ List<List<Tuple2<String, Integer>>> expected = Arrays.asList( Arrays.asList( - new Tuple2<String, Integer>("california", 4), - new Tuple2<String, Integer>("new york", 5)), + new Tuple2<>("california", 4), + new Tuple2<>("new york", 5)), Arrays.asList( - new Tuple2<String, Integer>("california", 10), - new Tuple2<String, Integer>("new york", 4))); + new Tuple2<>("california", 10), + new Tuple2<>("new york", 4))); JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream( ssc, inputData, 1); @@ -632,12 +630,12 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; List<List<Tuple2<String, Integer>>> expected = Arrays.asList( - Arrays.asList(new Tuple2<String, Integer>("california", 4), - new Tuple2<String, Integer>("new york", 5)), - Arrays.asList(new Tuple2<String, Integer>("california", 14), - new Tuple2<String, Integer>("new york", 9)), - Arrays.asList(new Tuple2<String, Integer>("california", 10), - new Tuple2<String, Integer>("new york", 4))); + Arrays.asList(new Tuple2<>("california", 4), + new Tuple2<>("new york", 5)), + Arrays.asList(new Tuple2<>("california", 14), + new Tuple2<>("new york", 9)), + Arrays.asList(new Tuple2<>("california", 10), + new Tuple2<>("new york", 4))); JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); @@ -656,12 +654,12 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; List<List<Tuple2<String, Integer>>> expected = Arrays.asList( - Arrays.asList(new Tuple2<String, Integer>("california", 4), - new Tuple2<String, Integer>("new york", 5)), - Arrays.asList(new Tuple2<String, Integer>("california", 14), - new Tuple2<String, Integer>("new york", 9)), - Arrays.asList(new Tuple2<String, Integer>("california", 14), - new Tuple2<String, Integer>("new york", 9))); + Arrays.asList(new Tuple2<>("california", 4), + new Tuple2<>("new york", 5)), + Arrays.asList(new Tuple2<>("california", 14), + new Tuple2<>("new york", 9)), + Arrays.asList(new Tuple2<>("california", 14), + new Tuple2<>("new york", 9))); JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); @@ -689,12 +687,12 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; List<List<Tuple2<String, Integer>>> expected = Arrays.asList( - Arrays.asList(new Tuple2<String, Integer>("california", 4), - new Tuple2<String, Integer>("new york", 5)), - Arrays.asList(new Tuple2<String, Integer>("california", 14), - new Tuple2<String, Integer>("new york", 9)), - Arrays.asList(new Tuple2<String, Integer>("california", 10), - new Tuple2<String, Integer>("new york", 4))); + Arrays.asList(new Tuple2<>("california", 4), + new Tuple2<>("new york", 5)), + Arrays.asList(new Tuple2<>("california", 14), + new Tuple2<>("new york", 9)), + Arrays.asList(new Tuple2<>("california", 10), + new Tuple2<>("new york", 4))); JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); @@ -713,27 +711,27 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ public void testPairTransform() { List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList( Arrays.asList( - new Tuple2<Integer, Integer>(3, 5), - new Tuple2<Integer, Integer>(1, 5), - new Tuple2<Integer, Integer>(4, 5), - new Tuple2<Integer, Integer>(2, 5)), + new Tuple2<>(3, 5), + new Tuple2<>(1, 5), + new Tuple2<>(4, 5), + new Tuple2<>(2, 5)), Arrays.asList( - new Tuple2<Integer, Integer>(2, 5), - new Tuple2<Integer, Integer>(3, 5), - new Tuple2<Integer, Integer>(4, 5), - new Tuple2<Integer, Integer>(1, 5))); + new Tuple2<>(2, 5), + new Tuple2<>(3, 5), + new Tuple2<>(4, 5), + new Tuple2<>(1, 5))); List<List<Tuple2<Integer, Integer>>> expected = Arrays.asList( Arrays.asList( - new Tuple2<Integer, Integer>(1, 5), - new Tuple2<Integer, Integer>(2, 5), - new Tuple2<Integer, Integer>(3, 5), - new Tuple2<Integer, Integer>(4, 5)), + new Tuple2<>(1, 5), + new Tuple2<>(2, 5), + new Tuple2<>(3, 5), + new Tuple2<>(4, 5)), Arrays.asList( - new Tuple2<Integer, Integer>(1, 5), - new Tuple2<Integer, Integer>(2, 5), - new Tuple2<Integer, Integer>(3, 5), - new Tuple2<Integer, Integer>(4, 5))); + new Tuple2<>(1, 5), + new Tuple2<>(2, 5), + new Tuple2<>(3, 5), + new Tuple2<>(4, 5))); JavaDStream<Tuple2<Integer, Integer>> stream = JavaTestUtils.attachTestInputStream( ssc, inputData, 1); @@ -751,15 +749,15 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ public void testPairToNormalRDDTransform() { List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList( Arrays.asList( - new Tuple2<Integer, Integer>(3, 5), - new Tuple2<Integer, Integer>(1, 5), - new Tuple2<Integer, Integer>(4, 5), - new Tuple2<Integer, Integer>(2, 5)), + new Tuple2<>(3, 5), + new Tuple2<>(1, 5), + new Tuple2<>(4, 5), + new Tuple2<>(2, 5)), Arrays.asList( - new Tuple2<Integer, Integer>(2, 5), - new Tuple2<Integer, Integer>(3, 5), - new Tuple2<Integer, Integer>(4, 5), - new Tuple2<Integer, Integer>(1, 5))); + new Tuple2<>(2, 5), + new Tuple2<>(3, 5), + new Tuple2<>(4, 5), + new Tuple2<>(1, 5))); List<List<Integer>> expected = Arrays.asList( Arrays.asList(3, 1, 4, 2), @@ -780,20 +778,20 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ List<List<Tuple2<String, String>>> inputData = stringStringKVStream; List<List<Tuple2<String, String>>> expected = Arrays.asList( - Arrays.asList(new Tuple2<String, String>("california", "DODGERS"), - new Tuple2<String, String>("california", "GIANTS"), - new Tuple2<String, String>("new york", "YANKEES"), - new Tuple2<String, String>("new york", "METS")), - Arrays.asList(new Tuple2<String, String>("california", "SHARKS"), - new Tuple2<String, String>("california", "DUCKS"), - new Tuple2<String, String>("new york", "RANGERS"), - new Tuple2<String, String>("new york", "ISLANDERS"))); + Arrays.asList(new Tuple2<>("california", "DODGERS"), + new Tuple2<>("california", "GIANTS"), + new Tuple2<>("new york", "YANKEES"), + new Tuple2<>("new york", "METS")), + Arrays.asList(new Tuple2<>("california", "SHARKS"), + new Tuple2<>("california", "DUCKS"), + new Tuple2<>("new york", "RANGERS"), + new Tuple2<>("new york", "ISLANDERS"))); JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream( ssc, inputData, 1); JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream<String, String> mapped = pairStream.mapValues(s -> s.toUpperCase()); + JavaPairDStream<String, String> mapped = pairStream.mapValues(String::toUpperCase); JavaTestUtils.attachTestOutputStream(mapped); List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); @@ -805,34 +803,29 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ List<List<Tuple2<String, String>>> inputData = stringStringKVStream; List<List<Tuple2<String, String>>> expected = Arrays.asList( - Arrays.asList(new Tuple2<String, String>("california", "dodgers1"), - new Tuple2<String, String>("california", "dodgers2"), - new Tuple2<String, String>("california", "giants1"), - new Tuple2<String, String>("california", "giants2"), - new Tuple2<String, String>("new york", "yankees1"), - new Tuple2<String, String>("new york", "yankees2"), - new Tuple2<String, String>("new york", "mets1"), - new Tuple2<String, String>("new york", "mets2")), - Arrays.asList(new Tuple2<String, String>("california", "sharks1"), - new Tuple2<String, String>("california", "sharks2"), - new Tuple2<String, String>("california", "ducks1"), - new Tuple2<String, String>("california", "ducks2"), - new Tuple2<String, String>("new york", "rangers1"), - new Tuple2<String, String>("new york", "rangers2"), - new Tuple2<String, String>("new york", "islanders1"), - new Tuple2<String, String>("new york", "islanders2"))); + Arrays.asList(new Tuple2<>("california", "dodgers1"), + new Tuple2<>("california", "dodgers2"), + new Tuple2<>("california", "giants1"), + new Tuple2<>("california", "giants2"), + new Tuple2<>("new york", "yankees1"), + new Tuple2<>("new york", "yankees2"), + new Tuple2<>("new york", "mets1"), + new Tuple2<>("new york", "mets2")), + Arrays.asList(new Tuple2<>("california", "sharks1"), + new Tuple2<>("california", "sharks2"), + new Tuple2<>("california", "ducks1"), + new Tuple2<>("california", "ducks2"), + new Tuple2<>("new york", "rangers1"), + new Tuple2<>("new york", "rangers2"), + new Tuple2<>("new york", "islanders1"), + new Tuple2<>("new york", "islanders2"))); JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream( ssc, inputData, 1); JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream<String, String> flatMapped = pairStream.flatMapValues(in -> { - List<String> out = new ArrayList<String>(); - out.add(in + "1"); - out.add(in + "2"); - return out; - }); + JavaPairDStream<String, String> flatMapped = + pairStream.flatMapValues(in -> Arrays.asList(in + "1", in + "2")); JavaTestUtils.attachTestOutputStream(flatMapped); List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); Assert.assertEquals(expected, result); |