diff options
author | Sean Owen <sowen@cloudera.com> | 2015-09-12 10:40:10 +0100 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2015-09-12 10:40:10 +0100 |
commit | 22730ad54d681ad30e63fe910e8d89360853177d (patch) | |
tree | 81194034499a6d391a0949e865fc0aa6dd5fc4ec | |
parent | 8285e3b0d3dc0eff669eba993742dfe0401116f9 (diff) | |
download | spark-22730ad54d681ad30e63fe910e8d89360853177d.tar.gz spark-22730ad54d681ad30e63fe910e8d89360853177d.tar.bz2 spark-22730ad54d681ad30e63fe910e8d89360853177d.zip |
[SPARK-10547] [TEST] Streamline / improve style of Java API tests
Fix a few Java API test style issues: unused generic types, exceptions, wrong assert argument order
Author: Sean Owen <sowen@cloudera.com>
Closes #8706 from srowen/SPARK-10547.
15 files changed, 755 insertions, 761 deletions
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java index ebd3d61ae7..fd8f7f39b7 100644 --- a/core/src/test/java/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java @@ -90,7 +90,7 @@ public class JavaAPISuite implements Serializable { JavaRDD<String> sUnion = sc.union(s1, s2); Assert.assertEquals(4, sUnion.count()); // List - List<JavaRDD<String>> list = new ArrayList<JavaRDD<String>>(); + List<JavaRDD<String>> list = new ArrayList<>(); list.add(s2); sUnion = sc.union(s1, list); Assert.assertEquals(4, sUnion.count()); @@ -103,9 +103,9 @@ public class JavaAPISuite implements Serializable { Assert.assertEquals(4, dUnion.count()); // Union of JavaPairRDDs - List<Tuple2<Integer, Integer>> pairs = new ArrayList<Tuple2<Integer, Integer>>(); - pairs.add(new Tuple2<Integer, Integer>(1, 2)); - pairs.add(new Tuple2<Integer, Integer>(3, 4)); + List<Tuple2<Integer, Integer>> pairs = new ArrayList<>(); + pairs.add(new Tuple2<>(1, 2)); + pairs.add(new Tuple2<>(3, 4)); JavaPairRDD<Integer, Integer> p1 = sc.parallelizePairs(pairs); JavaPairRDD<Integer, Integer> p2 = sc.parallelizePairs(pairs); JavaPairRDD<Integer, Integer> pUnion = sc.union(p1, p2); @@ -133,9 +133,9 @@ public class JavaAPISuite implements Serializable { JavaDoubleRDD dIntersection = d1.intersection(d2); Assert.assertEquals(2, dIntersection.count()); - List<Tuple2<Integer, Integer>> pairs = new ArrayList<Tuple2<Integer, Integer>>(); - pairs.add(new Tuple2<Integer, Integer>(1, 2)); - pairs.add(new Tuple2<Integer, Integer>(3, 4)); + List<Tuple2<Integer, Integer>> pairs = new ArrayList<>(); + pairs.add(new Tuple2<>(1, 2)); + pairs.add(new Tuple2<>(3, 4)); JavaPairRDD<Integer, Integer> p1 = sc.parallelizePairs(pairs); JavaPairRDD<Integer, Integer> p2 = sc.parallelizePairs(pairs); JavaPairRDD<Integer, Integer> pIntersection = p1.intersection(p2); @@ -165,47 +165,49 @@ public class JavaAPISuite implements Serializable { @Test public void sortByKey() { - List<Tuple2<Integer, Integer>> pairs = new ArrayList<Tuple2<Integer, Integer>>(); - pairs.add(new Tuple2<Integer, Integer>(0, 4)); - pairs.add(new Tuple2<Integer, Integer>(3, 2)); - pairs.add(new Tuple2<Integer, Integer>(-1, 1)); + List<Tuple2<Integer, Integer>> pairs = new ArrayList<>(); + pairs.add(new Tuple2<>(0, 4)); + pairs.add(new Tuple2<>(3, 2)); + pairs.add(new Tuple2<>(-1, 1)); JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs); // Default comparator JavaPairRDD<Integer, Integer> sortedRDD = rdd.sortByKey(); - Assert.assertEquals(new Tuple2<Integer, Integer>(-1, 1), sortedRDD.first()); + Assert.assertEquals(new Tuple2<>(-1, 1), sortedRDD.first()); List<Tuple2<Integer, Integer>> sortedPairs = sortedRDD.collect(); - Assert.assertEquals(new Tuple2<Integer, Integer>(0, 4), sortedPairs.get(1)); - Assert.assertEquals(new Tuple2<Integer, Integer>(3, 2), sortedPairs.get(2)); + Assert.assertEquals(new Tuple2<>(0, 4), sortedPairs.get(1)); + Assert.assertEquals(new Tuple2<>(3, 2), sortedPairs.get(2)); // Custom comparator sortedRDD = rdd.sortByKey(Collections.<Integer>reverseOrder(), false); - Assert.assertEquals(new Tuple2<Integer, Integer>(-1, 1), sortedRDD.first()); + Assert.assertEquals(new Tuple2<>(-1, 1), sortedRDD.first()); sortedPairs = sortedRDD.collect(); - Assert.assertEquals(new Tuple2<Integer, Integer>(0, 4), sortedPairs.get(1)); - Assert.assertEquals(new Tuple2<Integer, Integer>(3, 2), sortedPairs.get(2)); + Assert.assertEquals(new Tuple2<>(0, 4), sortedPairs.get(1)); + Assert.assertEquals(new Tuple2<>(3, 2), sortedPairs.get(2)); } @SuppressWarnings("unchecked") @Test public void repartitionAndSortWithinPartitions() { - List<Tuple2<Integer, Integer>> pairs = new ArrayList<Tuple2<Integer, Integer>>(); - pairs.add(new Tuple2<Integer, Integer>(0, 5)); - pairs.add(new Tuple2<Integer, Integer>(3, 8)); - pairs.add(new Tuple2<Integer, Integer>(2, 6)); - pairs.add(new Tuple2<Integer, Integer>(0, 8)); - pairs.add(new Tuple2<Integer, Integer>(3, 8)); - pairs.add(new Tuple2<Integer, Integer>(1, 3)); + List<Tuple2<Integer, Integer>> pairs = new ArrayList<>(); + pairs.add(new Tuple2<>(0, 5)); + pairs.add(new Tuple2<>(3, 8)); + pairs.add(new Tuple2<>(2, 6)); + pairs.add(new Tuple2<>(0, 8)); + pairs.add(new Tuple2<>(3, 8)); + pairs.add(new Tuple2<>(1, 3)); JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs); Partitioner partitioner = new Partitioner() { + @Override public int numPartitions() { return 2; } + @Override public int getPartition(Object key) { - return ((Integer)key).intValue() % 2; + return (Integer) key % 2; } }; @@ -214,10 +216,10 @@ public class JavaAPISuite implements Serializable { Assert.assertTrue(repartitioned.partitioner().isPresent()); Assert.assertEquals(repartitioned.partitioner().get(), partitioner); List<List<Tuple2<Integer, Integer>>> partitions = repartitioned.glom().collect(); - Assert.assertEquals(partitions.get(0), Arrays.asList(new Tuple2<Integer, Integer>(0, 5), - new Tuple2<Integer, Integer>(0, 8), new Tuple2<Integer, Integer>(2, 6))); - Assert.assertEquals(partitions.get(1), Arrays.asList(new Tuple2<Integer, Integer>(1, 3), - new Tuple2<Integer, Integer>(3, 8), new Tuple2<Integer, Integer>(3, 8))); + Assert.assertEquals(partitions.get(0), + Arrays.asList(new Tuple2<>(0, 5), new Tuple2<>(0, 8), new Tuple2<>(2, 6))); + Assert.assertEquals(partitions.get(1), + Arrays.asList(new Tuple2<>(1, 3), new Tuple2<>(3, 8), new Tuple2<>(3, 8))); } @Test @@ -228,35 +230,37 @@ public class JavaAPISuite implements Serializable { @Test public void sortBy() { - List<Tuple2<Integer, Integer>> pairs = new ArrayList<Tuple2<Integer, Integer>>(); - pairs.add(new Tuple2<Integer, Integer>(0, 4)); - pairs.add(new Tuple2<Integer, Integer>(3, 2)); - pairs.add(new Tuple2<Integer, Integer>(-1, 1)); + List<Tuple2<Integer, Integer>> pairs = new ArrayList<>(); + pairs.add(new Tuple2<>(0, 4)); + pairs.add(new Tuple2<>(3, 2)); + pairs.add(new Tuple2<>(-1, 1)); JavaRDD<Tuple2<Integer, Integer>> rdd = sc.parallelize(pairs); // compare on first value JavaRDD<Tuple2<Integer, Integer>> sortedRDD = rdd.sortBy(new Function<Tuple2<Integer, Integer>, Integer>() { - public Integer call(Tuple2<Integer, Integer> t) throws Exception { + @Override + public Integer call(Tuple2<Integer, Integer> t) { return t._1(); } }, true, 2); - Assert.assertEquals(new Tuple2<Integer, Integer>(-1, 1), sortedRDD.first()); + Assert.assertEquals(new Tuple2<>(-1, 1), sortedRDD.first()); List<Tuple2<Integer, Integer>> sortedPairs = sortedRDD.collect(); - Assert.assertEquals(new Tuple2<Integer, Integer>(0, 4), sortedPairs.get(1)); - Assert.assertEquals(new Tuple2<Integer, Integer>(3, 2), sortedPairs.get(2)); + Assert.assertEquals(new Tuple2<>(0, 4), sortedPairs.get(1)); + Assert.assertEquals(new Tuple2<>(3, 2), sortedPairs.get(2)); // compare on second value sortedRDD = rdd.sortBy(new Function<Tuple2<Integer, Integer>, Integer>() { - public Integer call(Tuple2<Integer, Integer> t) throws Exception { + @Override + public Integer call(Tuple2<Integer, Integer> t) { return t._2(); } }, true, 2); - Assert.assertEquals(new Tuple2<Integer, Integer>(-1, 1), sortedRDD.first()); + Assert.assertEquals(new Tuple2<>(-1, 1), sortedRDD.first()); sortedPairs = sortedRDD.collect(); - Assert.assertEquals(new Tuple2<Integer, Integer>(3, 2), sortedPairs.get(1)); - Assert.assertEquals(new Tuple2<Integer, Integer>(0, 4), sortedPairs.get(2)); + Assert.assertEquals(new Tuple2<>(3, 2), sortedPairs.get(1)); + Assert.assertEquals(new Tuple2<>(0, 4), sortedPairs.get(2)); } @Test @@ -265,7 +269,7 @@ public class JavaAPISuite implements Serializable { JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World")); rdd.foreach(new VoidFunction<String>() { @Override - public void call(String s) throws IOException { + public void call(String s) { accum.add(1); } }); @@ -278,7 +282,7 @@ public class JavaAPISuite implements Serializable { JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World")); rdd.foreachPartition(new VoidFunction<Iterator<String>>() { @Override - public void call(Iterator<String> iter) throws IOException { + public void call(Iterator<String> iter) { while (iter.hasNext()) { iter.next(); accum.add(1); @@ -301,7 +305,7 @@ public class JavaAPISuite implements Serializable { List<Integer> dataArray = Arrays.asList(1, 2, 3, 4); JavaPairRDD<Integer, Long> zip = sc.parallelize(dataArray).zipWithUniqueId(); JavaRDD<Long> indexes = zip.values(); - Assert.assertEquals(4, new HashSet<Long>(indexes.collect()).size()); + Assert.assertEquals(4, new HashSet<>(indexes.collect()).size()); } @Test @@ -317,10 +321,10 @@ public class JavaAPISuite implements Serializable { @Test public void lookup() { JavaPairRDD<String, String> categories = sc.parallelizePairs(Arrays.asList( - new Tuple2<String, String>("Apples", "Fruit"), - new Tuple2<String, String>("Oranges", "Fruit"), - new Tuple2<String, String>("Oranges", "Citrus") - )); + new Tuple2<>("Apples", "Fruit"), + new Tuple2<>("Oranges", "Fruit"), + new Tuple2<>("Oranges", "Citrus") + )); Assert.assertEquals(2, categories.lookup("Oranges").size()); Assert.assertEquals(2, Iterables.size(categories.groupByKey().lookup("Oranges").get(0))); } @@ -390,18 +394,17 @@ public class JavaAPISuite implements Serializable { @Test public void cogroup() { JavaPairRDD<String, String> categories = sc.parallelizePairs(Arrays.asList( - new Tuple2<String, String>("Apples", "Fruit"), - new Tuple2<String, String>("Oranges", "Fruit"), - new Tuple2<String, String>("Oranges", "Citrus") + new Tuple2<>("Apples", "Fruit"), + new Tuple2<>("Oranges", "Fruit"), + new Tuple2<>("Oranges", "Citrus") )); JavaPairRDD<String, Integer> prices = sc.parallelizePairs(Arrays.asList( - new Tuple2<String, Integer>("Oranges", 2), - new Tuple2<String, Integer>("Apples", 3) + new Tuple2<>("Oranges", 2), + new Tuple2<>("Apples", 3) )); JavaPairRDD<String, Tuple2<Iterable<String>, Iterable<Integer>>> cogrouped = categories.cogroup(prices); - Assert.assertEquals("[Fruit, Citrus]", - Iterables.toString(cogrouped.lookup("Oranges").get(0)._1())); + Assert.assertEquals("[Fruit, Citrus]", Iterables.toString(cogrouped.lookup("Oranges").get(0)._1())); Assert.assertEquals("[2]", Iterables.toString(cogrouped.lookup("Oranges").get(0)._2())); cogrouped.collect(); @@ -411,23 +414,22 @@ public class JavaAPISuite implements Serializable { @Test public void cogroup3() { JavaPairRDD<String, String> categories = sc.parallelizePairs(Arrays.asList( - new Tuple2<String, String>("Apples", "Fruit"), - new Tuple2<String, String>("Oranges", "Fruit"), - new Tuple2<String, String>("Oranges", "Citrus") + new Tuple2<>("Apples", "Fruit"), + new Tuple2<>("Oranges", "Fruit"), + new Tuple2<>("Oranges", "Citrus") )); JavaPairRDD<String, Integer> prices = sc.parallelizePairs(Arrays.asList( - new Tuple2<String, Integer>("Oranges", 2), - new Tuple2<String, Integer>("Apples", 3) + new Tuple2<>("Oranges", 2), + new Tuple2<>("Apples", 3) )); JavaPairRDD<String, Integer> quantities = sc.parallelizePairs(Arrays.asList( - new Tuple2<String, Integer>("Oranges", 21), - new Tuple2<String, Integer>("Apples", 42) + new Tuple2<>("Oranges", 21), + new Tuple2<>("Apples", 42) )); JavaPairRDD<String, Tuple3<Iterable<String>, Iterable<Integer>, Iterable<Integer>>> cogrouped = categories.cogroup(prices, quantities); - Assert.assertEquals("[Fruit, Citrus]", - Iterables.toString(cogrouped.lookup("Oranges").get(0)._1())); + Assert.assertEquals("[Fruit, Citrus]", Iterables.toString(cogrouped.lookup("Oranges").get(0)._1())); Assert.assertEquals("[2]", Iterables.toString(cogrouped.lookup("Oranges").get(0)._2())); Assert.assertEquals("[42]", Iterables.toString(cogrouped.lookup("Apples").get(0)._3())); @@ -439,27 +441,26 @@ public class JavaAPISuite implements Serializable { @Test public void cogroup4() { JavaPairRDD<String, String> categories = sc.parallelizePairs(Arrays.asList( - new Tuple2<String, String>("Apples", "Fruit"), - new Tuple2<String, String>("Oranges", "Fruit"), - new Tuple2<String, String>("Oranges", "Citrus") + new Tuple2<>("Apples", "Fruit"), + new Tuple2<>("Oranges", "Fruit"), + new Tuple2<>("Oranges", "Citrus") )); JavaPairRDD<String, Integer> prices = sc.parallelizePairs(Arrays.asList( - new Tuple2<String, Integer>("Oranges", 2), - new Tuple2<String, Integer>("Apples", 3) + new Tuple2<>("Oranges", 2), + new Tuple2<>("Apples", 3) )); JavaPairRDD<String, Integer> quantities = sc.parallelizePairs(Arrays.asList( - new Tuple2<String, Integer>("Oranges", 21), - new Tuple2<String, Integer>("Apples", 42) + new Tuple2<>("Oranges", 21), + new Tuple2<>("Apples", 42) )); JavaPairRDD<String, String> countries = sc.parallelizePairs(Arrays.asList( - new Tuple2<String, String>("Oranges", "BR"), - new Tuple2<String, String>("Apples", "US") + new Tuple2<>("Oranges", "BR"), + new Tuple2<>("Apples", "US") )); JavaPairRDD<String, Tuple4<Iterable<String>, Iterable<Integer>, Iterable<Integer>, Iterable<String>>> cogrouped = categories.cogroup(prices, quantities, countries); - Assert.assertEquals("[Fruit, Citrus]", - Iterables.toString(cogrouped.lookup("Oranges").get(0)._1())); + Assert.assertEquals("[Fruit, Citrus]", Iterables.toString(cogrouped.lookup("Oranges").get(0)._1())); Assert.assertEquals("[2]", Iterables.toString(cogrouped.lookup("Oranges").get(0)._2())); Assert.assertEquals("[42]", Iterables.toString(cogrouped.lookup("Apples").get(0)._3())); Assert.assertEquals("[BR]", Iterables.toString(cogrouped.lookup("Oranges").get(0)._4())); @@ -471,16 +472,16 @@ public class JavaAPISuite implements Serializable { @Test public void leftOuterJoin() { JavaPairRDD<Integer, Integer> rdd1 = sc.parallelizePairs(Arrays.asList( - new Tuple2<Integer, Integer>(1, 1), - new Tuple2<Integer, Integer>(1, 2), - new Tuple2<Integer, Integer>(2, 1), - new Tuple2<Integer, Integer>(3, 1) + new Tuple2<>(1, 1), + new Tuple2<>(1, 2), + new Tuple2<>(2, 1), + new Tuple2<>(3, 1) )); JavaPairRDD<Integer, Character> rdd2 = sc.parallelizePairs(Arrays.asList( - new Tuple2<Integer, Character>(1, 'x'), - new Tuple2<Integer, Character>(2, 'y'), - new Tuple2<Integer, Character>(2, 'z'), - new Tuple2<Integer, Character>(4, 'w') + new Tuple2<>(1, 'x'), + new Tuple2<>(2, 'y'), + new Tuple2<>(2, 'z'), + new Tuple2<>(4, 'w') )); List<Tuple2<Integer,Tuple2<Integer,Optional<Character>>>> joined = rdd1.leftOuterJoin(rdd2).collect(); @@ -548,11 +549,11 @@ public class JavaAPISuite implements Serializable { public void aggregateByKey() { JavaPairRDD<Integer, Integer> pairs = sc.parallelizePairs( Arrays.asList( - new Tuple2<Integer, Integer>(1, 1), - new Tuple2<Integer, Integer>(1, 1), - new Tuple2<Integer, Integer>(3, 2), - new Tuple2<Integer, Integer>(5, 1), - new Tuple2<Integer, Integer>(5, 3)), 2); + new Tuple2<>(1, 1), + new Tuple2<>(1, 1), + new Tuple2<>(3, 2), + new Tuple2<>(5, 1), + new Tuple2<>(5, 3)), 2); Map<Integer, Set<Integer>> sets = pairs.aggregateByKey(new HashSet<Integer>(), new Function2<Set<Integer>, Integer, Set<Integer>>() { @@ -570,20 +571,20 @@ public class JavaAPISuite implements Serializable { } }).collectAsMap(); Assert.assertEquals(3, sets.size()); - Assert.assertEquals(new HashSet<Integer>(Arrays.asList(1)), sets.get(1)); - Assert.assertEquals(new HashSet<Integer>(Arrays.asList(2)), sets.get(3)); - Assert.assertEquals(new HashSet<Integer>(Arrays.asList(1, 3)), sets.get(5)); + Assert.assertEquals(new HashSet<>(Arrays.asList(1)), sets.get(1)); + Assert.assertEquals(new HashSet<>(Arrays.asList(2)), sets.get(3)); + Assert.assertEquals(new HashSet<>(Arrays.asList(1, 3)), sets.get(5)); } @SuppressWarnings("unchecked") @Test public void foldByKey() { List<Tuple2<Integer, Integer>> pairs = Arrays.asList( - new Tuple2<Integer, Integer>(2, 1), - new Tuple2<Integer, Integer>(2, 1), - new Tuple2<Integer, Integer>(1, 1), - new Tuple2<Integer, Integer>(3, 2), - new Tuple2<Integer, Integer>(3, 1) + new Tuple2<>(2, 1), + new Tuple2<>(2, 1), + new Tuple2<>(1, 1), + new Tuple2<>(3, 2), + new Tuple2<>(3, 1) ); JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs); JavaPairRDD<Integer, Integer> sums = rdd.foldByKey(0, @@ -602,11 +603,11 @@ public class JavaAPISuite implements Serializable { @Test public void reduceByKey() { List<Tuple2<Integer, Integer>> pairs = Arrays.asList( - new Tuple2<Integer, Integer>(2, 1), - new Tuple2<Integer, Integer>(2, 1), - new Tuple2<Integer, Integer>(1, 1), - new Tuple2<Integer, Integer>(3, 2), - new Tuple2<Integer, Integer>(3, 1) + new Tuple2<>(2, 1), + new Tuple2<>(2, 1), + new Tuple2<>(1, 1), + new Tuple2<>(3, 2), + new Tuple2<>(3, 1) ); JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs); JavaPairRDD<Integer, Integer> counts = rdd.reduceByKey( @@ -690,7 +691,7 @@ public class JavaAPISuite implements Serializable { JavaDoubleRDD doubleRDD = sc.parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0)); JavaRDD<String> stringRDD = sc.parallelize(Arrays.asList("Hello", "World")); JavaPairRDD<String, Double> cartesian = stringRDD.cartesian(doubleRDD); - Assert.assertEquals(new Tuple2<String, Double>("Hello", 1.0), cartesian.first()); + Assert.assertEquals(new Tuple2<>("Hello", 1.0), cartesian.first()); } @Test @@ -743,6 +744,7 @@ public class JavaAPISuite implements Serializable { } private static class DoubleComparator implements Comparator<Double>, Serializable { + @Override public int compare(Double o1, Double o2) { return o1.compareTo(o2); } @@ -766,14 +768,14 @@ public class JavaAPISuite implements Serializable { public void naturalMax() { JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0)); double max = rdd.max(); - Assert.assertTrue(4.0 == max); + Assert.assertEquals(4.0, max, 0.0); } @Test public void naturalMin() { JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0)); double max = rdd.min(); - Assert.assertTrue(1.0 == max); + Assert.assertEquals(1.0, max, 0.0); } @Test @@ -809,7 +811,7 @@ public class JavaAPISuite implements Serializable { JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0)); double sum = rdd.reduce(new Function2<Double, Double, Double>() { @Override - public Double call(Double v1, Double v2) throws Exception { + public Double call(Double v1, Double v2) { return v1 + v2; } }); @@ -844,7 +846,7 @@ public class JavaAPISuite implements Serializable { new PairFunction<Integer, Integer, Integer>() { @Override public Tuple2<Integer, Integer> call(Integer x) { - return new Tuple2<Integer, Integer>(x, x); + return new Tuple2<>(x, x); } }).cache(); pairs.collect(); @@ -870,26 +872,25 @@ public class JavaAPISuite implements Serializable { Assert.assertEquals("Hello", words.first()); Assert.assertEquals(11, words.count()); - JavaPairRDD<String, String> pairs = rdd.flatMapToPair( + JavaPairRDD<String, String> pairsRDD = rdd.flatMapToPair( new PairFlatMapFunction<String, String, String>() { - @Override public Iterable<Tuple2<String, String>> call(String s) { - List<Tuple2<String, String>> pairs = new LinkedList<Tuple2<String, String>>(); + List<Tuple2<String, String>> pairs = new LinkedList<>(); for (String word : s.split(" ")) { - pairs.add(new Tuple2<String, String>(word, word)); + pairs.add(new Tuple2<>(word, word)); } return pairs; } } ); - Assert.assertEquals(new Tuple2<String, String>("Hello", "Hello"), pairs.first()); - Assert.assertEquals(11, pairs.count()); + Assert.assertEquals(new Tuple2<>("Hello", "Hello"), pairsRDD.first()); + Assert.assertEquals(11, pairsRDD.count()); JavaDoubleRDD doubles = rdd.flatMapToDouble(new DoubleFlatMapFunction<String>() { @Override public Iterable<Double> call(String s) { - List<Double> lengths = new LinkedList<Double>(); + List<Double> lengths = new LinkedList<>(); for (String word : s.split(" ")) { lengths.add((double) word.length()); } @@ -897,36 +898,36 @@ public class JavaAPISuite implements Serializable { } }); Assert.assertEquals(5.0, doubles.first(), 0.01); - Assert.assertEquals(11, pairs.count()); + Assert.assertEquals(11, pairsRDD.count()); } @SuppressWarnings("unchecked") @Test public void mapsFromPairsToPairs() { - List<Tuple2<Integer, String>> pairs = Arrays.asList( - new Tuple2<Integer, String>(1, "a"), - new Tuple2<Integer, String>(2, "aa"), - new Tuple2<Integer, String>(3, "aaa") - ); - JavaPairRDD<Integer, String> pairRDD = sc.parallelizePairs(pairs); - - // Regression test for SPARK-668: - JavaPairRDD<String, Integer> swapped = pairRDD.flatMapToPair( - new PairFlatMapFunction<Tuple2<Integer, String>, String, Integer>() { - @Override - public Iterable<Tuple2<String, Integer>> call(Tuple2<Integer, String> item) { - return Collections.singletonList(item.swap()); - } + List<Tuple2<Integer, String>> pairs = Arrays.asList( + new Tuple2<>(1, "a"), + new Tuple2<>(2, "aa"), + new Tuple2<>(3, "aaa") + ); + JavaPairRDD<Integer, String> pairRDD = sc.parallelizePairs(pairs); + + // Regression test for SPARK-668: + JavaPairRDD<String, Integer> swapped = pairRDD.flatMapToPair( + new PairFlatMapFunction<Tuple2<Integer, String>, String, Integer>() { + @Override + public Iterable<Tuple2<String, Integer>> call(Tuple2<Integer, String> item) { + return Collections.singletonList(item.swap()); + } }); - swapped.collect(); + swapped.collect(); - // There was never a bug here, but it's worth testing: - pairRDD.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() { - @Override - public Tuple2<String, Integer> call(Tuple2<Integer, String> item) { - return item.swap(); - } - }).collect(); + // There was never a bug here, but it's worth testing: + pairRDD.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() { + @Override + public Tuple2<String, Integer> call(Tuple2<Integer, String> item) { + return item.swap(); + } + }).collect(); } @Test @@ -953,7 +954,7 @@ public class JavaAPISuite implements Serializable { JavaRDD<Integer> partitionSums = rdd.mapPartitionsWithIndex( new Function2<Integer, Iterator<Integer>, Iterator<Integer>>() { @Override - public Iterator<Integer> call(Integer index, Iterator<Integer> iter) throws Exception { + public Iterator<Integer> call(Integer index, Iterator<Integer> iter) { int sum = 0; while (iter.hasNext()) { sum += iter.next(); @@ -972,8 +973,8 @@ public class JavaAPISuite implements Serializable { JavaRDD<Integer> repartitioned1 = in1.repartition(4); List<List<Integer>> result1 = repartitioned1.glom().collect(); Assert.assertEquals(4, result1.size()); - for (List<Integer> l: result1) { - Assert.assertTrue(l.size() > 0); + for (List<Integer> l : result1) { + Assert.assertFalse(l.isEmpty()); } // Growing number of partitions @@ -982,7 +983,7 @@ public class JavaAPISuite implements Serializable { List<List<Integer>> result2 = repartitioned2.glom().collect(); Assert.assertEquals(2, result2.size()); for (List<Integer> l: result2) { - Assert.assertTrue(l.size() > 0); + Assert.assertFalse(l.isEmpty()); } } @@ -994,9 +995,9 @@ public class JavaAPISuite implements Serializable { Assert.assertEquals(20, doubleRDD.sum(), 0.1); List<Tuple2<Integer, String>> pairs = Arrays.asList( - new Tuple2<Integer, String>(1, "a"), - new Tuple2<Integer, String>(2, "aa"), - new Tuple2<Integer, String>(3, "aaa") + new Tuple2<>(1, "a"), + new Tuple2<>(2, "aa"), + new Tuple2<>(3, "aaa") ); JavaPairRDD<Integer, String> pairRDD = sc.parallelizePairs(pairs); pairRDD = pairRDD.persist(StorageLevel.DISK_ONLY()); @@ -1046,7 +1047,7 @@ public class JavaAPISuite implements Serializable { Files.write(content1, new File(tempDirName + "/part-00000")); Files.write(content2, new File(tempDirName + "/part-00001")); - Map<String, String> container = new HashMap<String, String>(); + Map<String, String> container = new HashMap<>(); container.put(tempDirName+"/part-00000", new Text(content1).toString()); container.put(tempDirName+"/part-00001", new Text(content2).toString()); @@ -1075,16 +1076,16 @@ public class JavaAPISuite implements Serializable { public void sequenceFile() { String outputDir = new File(tempDir, "output").getAbsolutePath(); List<Tuple2<Integer, String>> pairs = Arrays.asList( - new Tuple2<Integer, String>(1, "a"), - new Tuple2<Integer, String>(2, "aa"), - new Tuple2<Integer, String>(3, "aaa") + new Tuple2<>(1, "a"), + new Tuple2<>(2, "aa"), + new Tuple2<>(3, "aaa") ); JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs); rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() { @Override public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) { - return new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2())); + return new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())); } }).saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class); @@ -1093,7 +1094,7 @@ public class JavaAPISuite implements Serializable { Text.class).mapToPair(new PairFunction<Tuple2<IntWritable, Text>, Integer, String>() { @Override public Tuple2<Integer, String> call(Tuple2<IntWritable, Text> pair) { - return new Tuple2<Integer, String>(pair._1().get(), pair._2().toString()); + return new Tuple2<>(pair._1().get(), pair._2().toString()); } }); Assert.assertEquals(pairs, readRDD.collect()); @@ -1110,7 +1111,7 @@ public class JavaAPISuite implements Serializable { FileOutputStream fos1 = new FileOutputStream(file1); FileChannel channel1 = fos1.getChannel(); - ByteBuffer bbuf = java.nio.ByteBuffer.wrap(content1); + ByteBuffer bbuf = ByteBuffer.wrap(content1); channel1.write(bbuf); channel1.close(); JavaPairRDD<String, PortableDataStream> readRDD = sc.binaryFiles(tempDirName, 3); @@ -1131,14 +1132,14 @@ public class JavaAPISuite implements Serializable { FileOutputStream fos1 = new FileOutputStream(file1); FileChannel channel1 = fos1.getChannel(); - ByteBuffer bbuf = java.nio.ByteBuffer.wrap(content1); + ByteBuffer bbuf = ByteBuffer.wrap(content1); channel1.write(bbuf); channel1.close(); JavaPairRDD<String, PortableDataStream> readRDD = sc.binaryFiles(tempDirName).cache(); readRDD.foreach(new VoidFunction<Tuple2<String,PortableDataStream>>() { @Override - public void call(Tuple2<String, PortableDataStream> pair) throws Exception { + public void call(Tuple2<String, PortableDataStream> pair) { pair._2().toArray(); // force the file to read } }); @@ -1162,7 +1163,7 @@ public class JavaAPISuite implements Serializable { FileChannel channel1 = fos1.getChannel(); for (int i = 0; i < numOfCopies; i++) { - ByteBuffer bbuf = java.nio.ByteBuffer.wrap(content1); + ByteBuffer bbuf = ByteBuffer.wrap(content1); channel1.write(bbuf); } channel1.close(); @@ -1180,24 +1181,23 @@ public class JavaAPISuite implements Serializable { public void writeWithNewAPIHadoopFile() { String outputDir = new File(tempDir, "output").getAbsolutePath(); List<Tuple2<Integer, String>> pairs = Arrays.asList( - new Tuple2<Integer, String>(1, "a"), - new Tuple2<Integer, String>(2, "aa"), - new Tuple2<Integer, String>(3, "aaa") + new Tuple2<>(1, "a"), + new Tuple2<>(2, "aa"), + new Tuple2<>(3, "aaa") ); JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs); rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() { @Override public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) { - return new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2())); + return new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())); } - }).saveAsNewAPIHadoopFile(outputDir, IntWritable.class, Text.class, - org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class); + }).saveAsNewAPIHadoopFile( + outputDir, IntWritable.class, Text.class, + org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class); - JavaPairRDD<IntWritable, Text> output = sc.sequenceFile(outputDir, IntWritable.class, - Text.class); - Assert.assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>, - String>() { + JavaPairRDD<IntWritable, Text> output = sc.sequenceFile(outputDir, IntWritable.class, Text.class); + Assert.assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>, String>() { @Override public String call(Tuple2<IntWritable, Text> x) { return x.toString(); @@ -1210,24 +1210,23 @@ public class JavaAPISuite implements Serializable { public void readWithNewAPIHadoopFile() throws IOException { String outputDir = new File(tempDir, "output").getAbsolutePath(); List<Tuple2<Integer, String>> pairs = Arrays.asList( - new Tuple2<Integer, String>(1, "a"), - new Tuple2<Integer, String>(2, "aa"), - new Tuple2<Integer, String>(3, "aaa") + new Tuple2<>(1, "a"), + new Tuple2<>(2, "aa"), + new Tuple2<>(3, "aaa") ); JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs); rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() { @Override public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) { - return new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2())); + return new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())); } }).saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class); JavaPairRDD<IntWritable, Text> output = sc.newAPIHadoopFile(outputDir, - org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class, IntWritable.class, - Text.class, new Job().getConfiguration()); - Assert.assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>, - String>() { + org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class, + IntWritable.class, Text.class, new Job().getConfiguration()); + Assert.assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>, String>() { @Override public String call(Tuple2<IntWritable, Text> x) { return x.toString(); @@ -1251,9 +1250,9 @@ public class JavaAPISuite implements Serializable { public void objectFilesOfComplexTypes() { String outputDir = new File(tempDir, "output").getAbsolutePath(); List<Tuple2<Integer, String>> pairs = Arrays.asList( - new Tuple2<Integer, String>(1, "a"), - new Tuple2<Integer, String>(2, "aa"), - new Tuple2<Integer, String>(3, "aaa") + new Tuple2<>(1, "a"), + new Tuple2<>(2, "aa"), + new Tuple2<>(3, "aaa") ); JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs); rdd.saveAsObjectFile(outputDir); @@ -1267,23 +1266,22 @@ public class JavaAPISuite implements Serializable { public void hadoopFile() { String outputDir = new File(tempDir, "output").getAbsolutePath(); List<Tuple2<Integer, String>> pairs = Arrays.asList( - new Tuple2<Integer, String>(1, "a"), - new Tuple2<Integer, String>(2, "aa"), - new Tuple2<Integer, String>(3, "aaa") + new Tuple2<>(1, "a"), + new Tuple2<>(2, "aa"), + new Tuple2<>(3, "aaa") ); JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs); rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() { @Override public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) { - return new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2())); + return new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())); } }).saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class); JavaPairRDD<IntWritable, Text> output = sc.hadoopFile(outputDir, - SequenceFileInputFormat.class, IntWritable.class, Text.class); - Assert.assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>, - String>() { + SequenceFileInputFormat.class, IntWritable.class, Text.class); + Assert.assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>, String>() { @Override public String call(Tuple2<IntWritable, Text> x) { return x.toString(); @@ -1296,16 +1294,16 @@ public class JavaAPISuite implements Serializable { public void hadoopFileCompressed() { String outputDir = new File(tempDir, "output_compressed").getAbsolutePath(); List<Tuple2<Integer, String>> pairs = Arrays.asList( - new Tuple2<Integer, String>(1, "a"), - new Tuple2<Integer, String>(2, "aa"), - new Tuple2<Integer, String>(3, "aaa") + new Tuple2<>(1, "a"), + new Tuple2<>(2, "aa"), + new Tuple2<>(3, "aaa") ); JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs); rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() { @Override public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) { - return new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2())); + return new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())); } }).saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class, DefaultCodec.class); @@ -1313,8 +1311,7 @@ public class JavaAPISuite implements Serializable { JavaPairRDD<IntWritable, Text> output = sc.hadoopFile(outputDir, SequenceFileInputFormat.class, IntWritable.class, Text.class); - Assert.assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>, - String>() { + Assert.assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>, String>() { @Override public String call(Tuple2<IntWritable, Text> x) { return x.toString(); @@ -1414,8 +1411,8 @@ public class JavaAPISuite implements Serializable { return t.toString(); } }).collect(); - Assert.assertEquals(new Tuple2<String, Integer>("1", 1), s.get(0)); - Assert.assertEquals(new Tuple2<String, Integer>("2", 2), s.get(1)); + Assert.assertEquals(new Tuple2<>("1", 1), s.get(0)); + Assert.assertEquals(new Tuple2<>("2", 2), s.get(1)); } @Test @@ -1448,20 +1445,20 @@ public class JavaAPISuite implements Serializable { JavaRDD<Integer> originalRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6)); Function<Integer, Integer> keyFunction = new Function<Integer, Integer>() { @Override - public Integer call(Integer v1) throws Exception { + public Integer call(Integer v1) { return v1 % 3; } }; Function<Integer, Integer> createCombinerFunction = new Function<Integer, Integer>() { @Override - public Integer call(Integer v1) throws Exception { + public Integer call(Integer v1) { return v1; } }; Function2<Integer, Integer, Integer> mergeValueFunction = new Function2<Integer, Integer, Integer>() { @Override - public Integer call(Integer v1, Integer v2) throws Exception { + public Integer call(Integer v1, Integer v2) { return v1 + v2; } }; @@ -1496,21 +1493,21 @@ public class JavaAPISuite implements Serializable { new PairFunction<Integer, Integer, Integer>() { @Override public Tuple2<Integer, Integer> call(Integer i) { - return new Tuple2<Integer, Integer>(i, i % 2); + return new Tuple2<>(i, i % 2); } }); JavaPairRDD<Integer, Integer> rdd3 = rdd2.mapToPair( new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() { - @Override - public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> in) { - return new Tuple2<Integer, Integer>(in._2(), in._1()); - } - }); + @Override + public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> in) { + return new Tuple2<>(in._2(), in._1()); + } + }); Assert.assertEquals(Arrays.asList( - new Tuple2<Integer, Integer>(1, 1), - new Tuple2<Integer, Integer>(0, 2), - new Tuple2<Integer, Integer>(1, 3), - new Tuple2<Integer, Integer>(0, 4)), rdd3.collect()); + new Tuple2<>(1, 1), + new Tuple2<>(0, 2), + new Tuple2<>(1, 3), + new Tuple2<>(0, 4)), rdd3.collect()); } @@ -1523,7 +1520,7 @@ public class JavaAPISuite implements Serializable { new PairFunction<Integer, Integer, Integer>() { @Override public Tuple2<Integer, Integer> call(Integer i) { - return new Tuple2<Integer, Integer>(i, i % 2); + return new Tuple2<>(i, i % 2); } }); @@ -1534,23 +1531,23 @@ public class JavaAPISuite implements Serializable { Assert.assertEquals(Arrays.asList(3, 4), parts[0]); Assert.assertEquals(Arrays.asList(5, 6, 7), parts[1]); - Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(1, 1), - new Tuple2<Integer, Integer>(2, 0)), + Assert.assertEquals(Arrays.asList(new Tuple2<>(1, 1), + new Tuple2<>(2, 0)), rdd2.collectPartitions(new int[] {0})[0]); List<Tuple2<Integer,Integer>>[] parts2 = rdd2.collectPartitions(new int[] {1, 2}); - Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(3, 1), - new Tuple2<Integer, Integer>(4, 0)), + Assert.assertEquals(Arrays.asList(new Tuple2<>(3, 1), + new Tuple2<>(4, 0)), parts2[0]); - Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(5, 1), - new Tuple2<Integer, Integer>(6, 0), - new Tuple2<Integer, Integer>(7, 1)), + Assert.assertEquals(Arrays.asList(new Tuple2<>(5, 1), + new Tuple2<>(6, 0), + new Tuple2<>(7, 1)), parts2[1]); } @Test public void countApproxDistinct() { - List<Integer> arrayData = new ArrayList<Integer>(); + List<Integer> arrayData = new ArrayList<>(); int size = 100; for (int i = 0; i < 100000; i++) { arrayData.add(i % size); @@ -1561,15 +1558,15 @@ public class JavaAPISuite implements Serializable { @Test public void countApproxDistinctByKey() { - List<Tuple2<Integer, Integer>> arrayData = new ArrayList<Tuple2<Integer, Integer>>(); + List<Tuple2<Integer, Integer>> arrayData = new ArrayList<>(); for (int i = 10; i < 100; i++) { for (int j = 0; j < i; j++) { - arrayData.add(new Tuple2<Integer, Integer>(i, j)); + arrayData.add(new Tuple2<>(i, j)); } } double relativeSD = 0.001; JavaPairRDD<Integer, Integer> pairRdd = sc.parallelizePairs(arrayData); - List<Tuple2<Integer, Object>> res = pairRdd.countApproxDistinctByKey(8, 0).collect(); + List<Tuple2<Integer, Object>> res = pairRdd.countApproxDistinctByKey(relativeSD, 8).collect(); for (Tuple2<Integer, Object> resItem : res) { double count = (double)resItem._1(); Long resCount = (Long)resItem._2(); @@ -1587,7 +1584,7 @@ public class JavaAPISuite implements Serializable { new PairFunction<Integer, Integer, int[]>() { @Override public Tuple2<Integer, int[]> call(Integer x) { - return new Tuple2<Integer, int[]>(x, new int[] { x }); + return new Tuple2<>(x, new int[]{x}); } }); pairRDD.collect(); // Works fine @@ -1598,7 +1595,7 @@ public class JavaAPISuite implements Serializable { @Test public void collectAsMapAndSerialize() throws Exception { JavaPairRDD<String,Integer> rdd = - sc.parallelizePairs(Arrays.asList(new Tuple2<String,Integer>("foo", 1))); + sc.parallelizePairs(Arrays.asList(new Tuple2<>("foo", 1))); Map<String,Integer> map = rdd.collectAsMap(); ByteArrayOutputStream bytes = new ByteArrayOutputStream(); new ObjectOutputStream(bytes).writeObject(map); @@ -1615,7 +1612,7 @@ public class JavaAPISuite implements Serializable { new PairFunction<Integer, Integer, Integer>() { @Override public Tuple2<Integer, Integer> call(Integer i) { - return new Tuple2<Integer, Integer>(i % 2, 1); + return new Tuple2<>(i % 2, 1); } }); Map<Integer, Object> fractions = Maps.newHashMap(); @@ -1623,12 +1620,12 @@ public class JavaAPISuite implements Serializable { fractions.put(1, 1.0); JavaPairRDD<Integer, Integer> wr = rdd2.sampleByKey(true, fractions, 1L); Map<Integer, Long> wrCounts = (Map<Integer, Long>) (Object) wr.countByKey(); - Assert.assertTrue(wrCounts.size() == 2); + Assert.assertEquals(2, wrCounts.size()); Assert.assertTrue(wrCounts.get(0) > 0); Assert.assertTrue(wrCounts.get(1) > 0); JavaPairRDD<Integer, Integer> wor = rdd2.sampleByKey(false, fractions, 1L); Map<Integer, Long> worCounts = (Map<Integer, Long>) (Object) wor.countByKey(); - Assert.assertTrue(worCounts.size() == 2); + Assert.assertEquals(2, worCounts.size()); Assert.assertTrue(worCounts.get(0) > 0); Assert.assertTrue(worCounts.get(1) > 0); } @@ -1641,7 +1638,7 @@ public class JavaAPISuite implements Serializable { new PairFunction<Integer, Integer, Integer>() { @Override public Tuple2<Integer, Integer> call(Integer i) { - return new Tuple2<Integer, Integer>(i % 2, 1); + return new Tuple2<>(i % 2, 1); } }); Map<Integer, Object> fractions = Maps.newHashMap(); @@ -1649,25 +1646,25 @@ public class JavaAPISuite implements Serializable { fractions.put(1, 1.0); JavaPairRDD<Integer, Integer> wrExact = rdd2.sampleByKeyExact(true, fractions, 1L); Map<Integer, Long> wrExactCounts = (Map<Integer, Long>) (Object) wrExact.countByKey(); - Assert.assertTrue(wrExactCounts.size() == 2); + Assert.assertEquals(2, wrExactCounts.size()); Assert.assertTrue(wrExactCounts.get(0) == 2); Assert.assertTrue(wrExactCounts.get(1) == 4); JavaPairRDD<Integer, Integer> worExact = rdd2.sampleByKeyExact(false, fractions, 1L); Map<Integer, Long> worExactCounts = (Map<Integer, Long>) (Object) worExact.countByKey(); - Assert.assertTrue(worExactCounts.size() == 2); + Assert.assertEquals(2, worExactCounts.size()); Assert.assertTrue(worExactCounts.get(0) == 2); Assert.assertTrue(worExactCounts.get(1) == 4); } private static class SomeCustomClass implements Serializable { - public SomeCustomClass() { + SomeCustomClass() { // Intentionally left blank } } @Test public void collectUnderlyingScalaRDD() { - List<SomeCustomClass> data = new ArrayList<SomeCustomClass>(); + List<SomeCustomClass> data = new ArrayList<>(); for (int i = 0; i < 100; i++) { data.add(new SomeCustomClass()); } @@ -1679,7 +1676,7 @@ public class JavaAPISuite implements Serializable { private static final class BuggyMapFunction<T> implements Function<T, T> { @Override - public T call(T x) throws Exception { + public T call(T x) { throw new IllegalStateException("Custom exception!"); } } @@ -1716,7 +1713,7 @@ public class JavaAPISuite implements Serializable { JavaFutureAction<Void> future = rdd.foreachAsync( new VoidFunction<Integer>() { @Override - public void call(Integer integer) throws Exception { + public void call(Integer integer) { // intentionally left blank. } } @@ -1745,7 +1742,7 @@ public class JavaAPISuite implements Serializable { JavaRDD<Integer> rdd = sc.parallelize(data, 1); JavaFutureAction<Void> future = rdd.foreachAsync(new VoidFunction<Integer>() { @Override - public void call(Integer integer) throws Exception { + public void call(Integer integer) throws InterruptedException { Thread.sleep(10000); // To ensure that the job won't finish before it's cancelled. } }); diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java index 9db07d0507..fbdfbf7e50 100644 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java @@ -75,11 +75,11 @@ public class JavaDirectKafkaStreamSuite implements Serializable { String[] topic1data = createTopicAndSendData(topic1); String[] topic2data = createTopicAndSendData(topic2); - HashSet<String> sent = new HashSet<String>(); + Set<String> sent = new HashSet<>(); sent.addAll(Arrays.asList(topic1data)); sent.addAll(Arrays.asList(topic2data)); - HashMap<String, String> kafkaParams = new HashMap<String, String>(); + Map<String, String> kafkaParams = new HashMap<>(); kafkaParams.put("metadata.broker.list", kafkaTestUtils.brokerAddress()); kafkaParams.put("auto.offset.reset", "smallest"); @@ -95,17 +95,17 @@ public class JavaDirectKafkaStreamSuite implements Serializable { // Make sure you can get offset ranges from the rdd new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() { @Override - public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) throws Exception { + public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) { OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); offsetRanges.set(offsets); - Assert.assertEquals(offsets[0].topic(), topic1); + Assert.assertEquals(topic1, offsets[0].topic()); return rdd; } } ).map( new Function<Tuple2<String, String>, String>() { @Override - public String call(Tuple2<String, String> kv) throws Exception { + public String call(Tuple2<String, String> kv) { return kv._2(); } } @@ -119,10 +119,10 @@ public class JavaDirectKafkaStreamSuite implements Serializable { StringDecoder.class, String.class, kafkaParams, - topicOffsetToMap(topic2, (long) 0), + topicOffsetToMap(topic2, 0L), new Function<MessageAndMetadata<String, String>, String>() { @Override - public String call(MessageAndMetadata<String, String> msgAndMd) throws Exception { + public String call(MessageAndMetadata<String, String> msgAndMd) { return msgAndMd.message(); } } @@ -133,7 +133,7 @@ public class JavaDirectKafkaStreamSuite implements Serializable { unifiedStream.foreachRDD( new Function<JavaRDD<String>, Void>() { @Override - public Void call(JavaRDD<String> rdd) throws Exception { + public Void call(JavaRDD<String> rdd) { result.addAll(rdd.collect()); for (OffsetRange o : offsetRanges.get()) { System.out.println( @@ -155,14 +155,14 @@ public class JavaDirectKafkaStreamSuite implements Serializable { ssc.stop(); } - private HashSet<String> topicToSet(String topic) { - HashSet<String> topicSet = new HashSet<String>(); + private static Set<String> topicToSet(String topic) { + Set<String> topicSet = new HashSet<>(); topicSet.add(topic); return topicSet; } - private HashMap<TopicAndPartition, Long> topicOffsetToMap(String topic, Long offsetToStart) { - HashMap<TopicAndPartition, Long> topicMap = new HashMap<TopicAndPartition, Long>(); + private static Map<TopicAndPartition, Long> topicOffsetToMap(String topic, Long offsetToStart) { + Map<TopicAndPartition, Long> topicMap = new HashMap<>(); topicMap.put(new TopicAndPartition(topic, 0), offsetToStart); return topicMap; } diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java index a9dc6e5061..afcc6cfccd 100644 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java @@ -19,6 +19,7 @@ package org.apache.spark.streaming.kafka; import java.io.Serializable; import java.util.HashMap; +import java.util.Map; import scala.Tuple2; @@ -66,10 +67,10 @@ public class JavaKafkaRDDSuite implements Serializable { String topic1 = "topic1"; String topic2 = "topic2"; - String[] topic1data = createTopicAndSendData(topic1); - String[] topic2data = createTopicAndSendData(topic2); + createTopicAndSendData(topic1); + createTopicAndSendData(topic2); - HashMap<String, String> kafkaParams = new HashMap<String, String>(); + Map<String, String> kafkaParams = new HashMap<>(); kafkaParams.put("metadata.broker.list", kafkaTestUtils.brokerAddress()); OffsetRange[] offsetRanges = { @@ -77,8 +78,8 @@ public class JavaKafkaRDDSuite implements Serializable { OffsetRange.create(topic2, 0, 0, 1) }; - HashMap<TopicAndPartition, Broker> emptyLeaders = new HashMap<TopicAndPartition, Broker>(); - HashMap<TopicAndPartition, Broker> leaders = new HashMap<TopicAndPartition, Broker>(); + Map<TopicAndPartition, Broker> emptyLeaders = new HashMap<>(); + Map<TopicAndPartition, Broker> leaders = new HashMap<>(); String[] hostAndPort = kafkaTestUtils.brokerAddress().split(":"); Broker broker = Broker.create(hostAndPort[0], Integer.parseInt(hostAndPort[1])); leaders.put(new TopicAndPartition(topic1, 0), broker); @@ -95,7 +96,7 @@ public class JavaKafkaRDDSuite implements Serializable { ).map( new Function<Tuple2<String, String>, String>() { @Override - public String call(Tuple2<String, String> kv) throws Exception { + public String call(Tuple2<String, String> kv) { return kv._2(); } } @@ -113,7 +114,7 @@ public class JavaKafkaRDDSuite implements Serializable { emptyLeaders, new Function<MessageAndMetadata<String, String>, String>() { @Override - public String call(MessageAndMetadata<String, String> msgAndMd) throws Exception { + public String call(MessageAndMetadata<String, String> msgAndMd) { return msgAndMd.message(); } } @@ -131,7 +132,7 @@ public class JavaKafkaRDDSuite implements Serializable { leaders, new Function<MessageAndMetadata<String, String>, String>() { @Override - public String call(MessageAndMetadata<String, String> msgAndMd) throws Exception { + public String call(MessageAndMetadata<String, String> msgAndMd) { return msgAndMd.message(); } } diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java index e4c659215b..1e69de46cd 100644 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java @@ -67,10 +67,10 @@ public class JavaKafkaStreamSuite implements Serializable { @Test public void testKafkaStream() throws InterruptedException { String topic = "topic1"; - HashMap<String, Integer> topics = new HashMap<String, Integer>(); + Map<String, Integer> topics = new HashMap<>(); topics.put(topic, 1); - HashMap<String, Integer> sent = new HashMap<String, Integer>(); + Map<String, Integer> sent = new HashMap<>(); sent.put("a", 5); sent.put("b", 3); sent.put("c", 10); @@ -78,7 +78,7 @@ public class JavaKafkaStreamSuite implements Serializable { kafkaTestUtils.createTopic(topic); kafkaTestUtils.sendMessages(topic, sent); - HashMap<String, String> kafkaParams = new HashMap<String, String>(); + Map<String, String> kafkaParams = new HashMap<>(); kafkaParams.put("zookeeper.connect", kafkaTestUtils.zkAddress()); kafkaParams.put("group.id", "test-consumer-" + random.nextInt(10000)); kafkaParams.put("auto.offset.reset", "smallest"); @@ -97,7 +97,7 @@ public class JavaKafkaStreamSuite implements Serializable { JavaDStream<String> words = stream.map( new Function<Tuple2<String, String>, String>() { @Override - public String call(Tuple2<String, String> tuple2) throws Exception { + public String call(Tuple2<String, String> tuple2) { return tuple2._2(); } } @@ -106,7 +106,7 @@ public class JavaKafkaStreamSuite implements Serializable { words.countByValue().foreachRDD( new Function<JavaPairRDD<String, Long>, Void>() { @Override - public Void call(JavaPairRDD<String, Long> rdd) throws Exception { + public Void call(JavaPairRDD<String, Long> rdd) { List<Tuple2<String, Long>> ret = rdd.collect(); for (Tuple2<String, Long> r : ret) { if (result.containsKey(r._1())) { @@ -130,8 +130,8 @@ public class JavaKafkaStreamSuite implements Serializable { Thread.sleep(200); } Assert.assertEquals(sent.size(), result.size()); - for (String k : sent.keySet()) { - Assert.assertEquals(sent.get(k).intValue(), result.get(k).intValue()); + for (Map.Entry<String, Integer> e : sent.entrySet()) { + Assert.assertEquals(e.getValue().intValue(), result.get(e.getKey()).intValue()); } } } diff --git a/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java b/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java index e46b4e5c75..26ec8af455 100644 --- a/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java +++ b/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java @@ -17,8 +17,6 @@ package org.apache.spark.streaming.twitter; -import java.util.Arrays; - import org.junit.Test; import twitter4j.Status; import twitter4j.auth.Authorization; @@ -30,7 +28,7 @@ import org.apache.spark.streaming.api.java.JavaDStream; public class JavaTwitterStreamSuite extends LocalJavaStreamingContext { @Test public void testTwitterStream() { - String[] filters = (String[])Arrays.<String>asList("filter1", "filter2").toArray(); + String[] filters = { "filter1", "filter2" }; Authorization auth = NullAuthorization.getInstance(); // tests the API, does not actually test data receiving diff --git a/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java b/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java index 729bc0459c..14975265ab 100644 --- a/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java +++ b/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java @@ -77,7 +77,7 @@ public class Java8APISuite implements Serializable { public void foreach() { foreachCalls = 0; JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World")); - rdd.foreach((x) -> foreachCalls++); + rdd.foreach(x -> foreachCalls++); Assert.assertEquals(2, foreachCalls); } @@ -180,7 +180,7 @@ public class Java8APISuite implements Serializable { JavaPairRDD<Integer, Integer> pairs = rdd.mapToPair(x -> new Tuple2<>(x, x)) .cache(); pairs.collect(); - JavaRDD<String> strings = rdd.map(x -> x.toString()).cache(); + JavaRDD<String> strings = rdd.map(Object::toString).cache(); strings.collect(); } @@ -195,7 +195,9 @@ public class Java8APISuite implements Serializable { JavaPairRDD<String, String> pairs = rdd.flatMapToPair(s -> { List<Tuple2<String, String>> pairs2 = new LinkedList<>(); - for (String word : s.split(" ")) pairs2.add(new Tuple2<>(word, word)); + for (String word : s.split(" ")) { + pairs2.add(new Tuple2<>(word, word)); + } return pairs2; }); @@ -204,11 +206,12 @@ public class Java8APISuite implements Serializable { JavaDoubleRDD doubles = rdd.flatMapToDouble(s -> { List<Double> lengths = new LinkedList<>(); - for (String word : s.split(" ")) lengths.add(word.length() * 1.0); + for (String word : s.split(" ")) { + lengths.add((double) word.length()); + } return lengths; }); - Double x = doubles.first(); Assert.assertEquals(5.0, doubles.first(), 0.01); Assert.assertEquals(11, pairs.count()); } @@ -228,7 +231,7 @@ public class Java8APISuite implements Serializable { swapped.collect(); // There was never a bug here, but it's worth testing: - pairRDD.map(item -> item.swap()).collect(); + pairRDD.map(Tuple2::swap).collect(); } @Test @@ -282,11 +285,11 @@ public class Java8APISuite implements Serializable { FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer> sizesFn = (Iterator<Integer> i, Iterator<String> s) -> { int sizeI = 0; - int sizeS = 0; while (i.hasNext()) { sizeI += 1; i.next(); } + int sizeS = 0; while (s.hasNext()) { sizeS += 1; s.next(); @@ -301,30 +304,31 @@ public class Java8APISuite implements Serializable { public void accumulators() { JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)); - final Accumulator<Integer> intAccum = sc.intAccumulator(10); - rdd.foreach(x -> intAccum.add(x)); + Accumulator<Integer> intAccum = sc.intAccumulator(10); + rdd.foreach(intAccum::add); Assert.assertEquals((Integer) 25, intAccum.value()); - final Accumulator<Double> doubleAccum = sc.doubleAccumulator(10.0); + Accumulator<Double> doubleAccum = sc.doubleAccumulator(10.0); rdd.foreach(x -> doubleAccum.add((double) x)); Assert.assertEquals((Double) 25.0, doubleAccum.value()); // Try a custom accumulator type AccumulatorParam<Float> floatAccumulatorParam = new AccumulatorParam<Float>() { + @Override public Float addInPlace(Float r, Float t) { return r + t; } - + @Override public Float addAccumulator(Float r, Float t) { return r + t; } - + @Override public Float zero(Float initialValue) { return 0.0f; } }; - final Accumulator<Float> floatAccum = sc.accumulator(10.0f, floatAccumulatorParam); + Accumulator<Float> floatAccum = sc.accumulator(10.0f, floatAccumulatorParam); rdd.foreach(x -> floatAccum.add((float) x)); Assert.assertEquals((Float) 25.0f, floatAccum.value()); @@ -336,7 +340,7 @@ public class Java8APISuite implements Serializable { @Test public void keyBy() { JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2)); - List<Tuple2<String, Integer>> s = rdd.keyBy(x -> x.toString()).collect(); + List<Tuple2<String, Integer>> s = rdd.keyBy(Object::toString).collect(); Assert.assertEquals(new Tuple2<>("1", 1), s.get(0)); Assert.assertEquals(new Tuple2<>("2", 2), s.get(1)); } @@ -349,7 +353,7 @@ public class Java8APISuite implements Serializable { JavaPairRDD<Integer, Integer> rdd3 = rdd2.mapToPair(in -> new Tuple2<>(in._2(), in._1())); Assert.assertEquals(Arrays.asList( - new Tuple2<Integer, Integer>(1, 1), + new Tuple2<>(1, 1), new Tuple2<>(0, 2), new Tuple2<>(1, 3), new Tuple2<>(0, 4)), rdd3.collect()); @@ -361,7 +365,7 @@ public class Java8APISuite implements Serializable { JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(i -> new Tuple2<>(i, i % 2)); - List[] parts = rdd1.collectPartitions(new int[]{0}); + List<Integer>[] parts = rdd1.collectPartitions(new int[]{0}); Assert.assertEquals(Arrays.asList(1, 2), parts[0]); parts = rdd1.collectPartitions(new int[]{1, 2}); @@ -371,19 +375,19 @@ public class Java8APISuite implements Serializable { Assert.assertEquals(Arrays.asList(new Tuple2<>(1, 1), new Tuple2<>(2, 0)), rdd2.collectPartitions(new int[]{0})[0]); - parts = rdd2.collectPartitions(new int[]{1, 2}); - Assert.assertEquals(Arrays.asList(new Tuple2<>(3, 1), new Tuple2<>(4, 0)), parts[0]); + List<Tuple2<Integer, Integer>>[] parts2 = rdd2.collectPartitions(new int[]{1, 2}); + Assert.assertEquals(Arrays.asList(new Tuple2<>(3, 1), new Tuple2<>(4, 0)), parts2[0]); Assert.assertEquals(Arrays.asList(new Tuple2<>(5, 1), new Tuple2<>(6, 0), new Tuple2<>(7, 1)), - parts[1]); + parts2[1]); } @Test public void collectAsMapWithIntArrayValues() { // Regression test for SPARK-1040 - JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(new Integer[]{1})); + JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1)); JavaPairRDD<Integer, int[]> pairRDD = rdd.mapToPair(x -> new Tuple2<>(x, new int[]{x})); pairRDD.collect(); // Works fine - Map<Integer, int[]> map = pairRDD.collectAsMap(); // Used to crash with ClassCastException + pairRDD.collectAsMap(); // Used to crash with ClassCastException } } diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaApplySchemaSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaApplySchemaSuite.java index bf693c7c39..7b50aad4ad 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaApplySchemaSuite.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaApplySchemaSuite.java @@ -18,6 +18,7 @@ package test.org.apache.spark.sql; import java.io.Serializable; +import java.math.BigDecimal; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -83,7 +84,7 @@ public class JavaApplySchemaSuite implements Serializable { @Test public void applySchema() { - List<Person> personList = new ArrayList<Person>(2); + List<Person> personList = new ArrayList<>(2); Person person1 = new Person(); person1.setName("Michael"); person1.setAge(29); @@ -95,12 +96,13 @@ public class JavaApplySchemaSuite implements Serializable { JavaRDD<Row> rowRDD = javaCtx.parallelize(personList).map( new Function<Person, Row>() { + @Override public Row call(Person person) throws Exception { return RowFactory.create(person.getName(), person.getAge()); } }); - List<StructField> fields = new ArrayList<StructField>(2); + List<StructField> fields = new ArrayList<>(2); fields.add(DataTypes.createStructField("name", DataTypes.StringType, false)); fields.add(DataTypes.createStructField("age", DataTypes.IntegerType, false)); StructType schema = DataTypes.createStructType(fields); @@ -118,7 +120,7 @@ public class JavaApplySchemaSuite implements Serializable { @Test public void dataFrameRDDOperations() { - List<Person> personList = new ArrayList<Person>(2); + List<Person> personList = new ArrayList<>(2); Person person1 = new Person(); person1.setName("Michael"); person1.setAge(29); @@ -129,27 +131,28 @@ public class JavaApplySchemaSuite implements Serializable { personList.add(person2); JavaRDD<Row> rowRDD = javaCtx.parallelize(personList).map( - new Function<Person, Row>() { - public Row call(Person person) throws Exception { - return RowFactory.create(person.getName(), person.getAge()); - } - }); - - List<StructField> fields = new ArrayList<StructField>(2); - fields.add(DataTypes.createStructField("name", DataTypes.StringType, false)); + new Function<Person, Row>() { + @Override + public Row call(Person person) { + return RowFactory.create(person.getName(), person.getAge()); + } + }); + + List<StructField> fields = new ArrayList<>(2); + fields.add(DataTypes.createStructField("", DataTypes.StringType, false)); fields.add(DataTypes.createStructField("age", DataTypes.IntegerType, false)); StructType schema = DataTypes.createStructType(fields); DataFrame df = sqlContext.applySchema(rowRDD, schema); df.registerTempTable("people"); List<String> actual = sqlContext.sql("SELECT * FROM people").toJavaRDD().map(new Function<Row, String>() { - + @Override public String call(Row row) { - return row.getString(0) + "_" + row.get(1).toString(); + return row.getString(0) + "_" + row.get(1); } }).collect(); - List<String> expected = new ArrayList<String>(2); + List<String> expected = new ArrayList<>(2); expected.add("Michael_29"); expected.add("Yin_28"); @@ -165,7 +168,7 @@ public class JavaApplySchemaSuite implements Serializable { "{\"string\":\"this is another simple string.\", \"integer\":11, \"long\":21474836469, " + "\"bigInteger\":92233720368547758069, \"double\":1.7976931348623157E305, " + "\"boolean\":false, \"null\":null}")); - List<StructField> fields = new ArrayList<StructField>(7); + List<StructField> fields = new ArrayList<>(7); fields.add(DataTypes.createStructField("bigInteger", DataTypes.createDecimalType(20, 0), true)); fields.add(DataTypes.createStructField("boolean", DataTypes.BooleanType, true)); @@ -175,10 +178,10 @@ public class JavaApplySchemaSuite implements Serializable { fields.add(DataTypes.createStructField("null", DataTypes.StringType, true)); fields.add(DataTypes.createStructField("string", DataTypes.StringType, true)); StructType expectedSchema = DataTypes.createStructType(fields); - List<Row> expectedResult = new ArrayList<Row>(2); + List<Row> expectedResult = new ArrayList<>(2); expectedResult.add( RowFactory.create( - new java.math.BigDecimal("92233720368547758070"), + new BigDecimal("92233720368547758070"), true, 1.7976931348623157E308, 10, @@ -187,7 +190,7 @@ public class JavaApplySchemaSuite implements Serializable { "this is a simple string.")); expectedResult.add( RowFactory.create( - new java.math.BigDecimal("92233720368547758069"), + new BigDecimal("92233720368547758069"), false, 1.7976931348623157E305, 11, diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java index 4867cebf53..d981ce947f 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java @@ -61,7 +61,7 @@ public class JavaDataFrameSuite { @Test public void testExecution() { DataFrame df = context.table("testData").filter("key = 1"); - Assert.assertEquals(df.select("key").collect()[0].get(0), 1); + Assert.assertEquals(1, df.select("key").collect()[0].get(0)); } /** @@ -119,7 +119,7 @@ public class JavaDataFrameSuite { public static class Bean implements Serializable { private double a = 0.0; - private Integer[] b = new Integer[]{0, 1}; + private Integer[] b = { 0, 1 }; private Map<String, int[]> c = ImmutableMap.of("hello", new int[] { 1, 2 }); private List<String> d = Arrays.asList("floppy", "disk"); @@ -161,7 +161,7 @@ public class JavaDataFrameSuite { schema.apply("d")); Row first = df.select("a", "b", "c", "d").first(); Assert.assertEquals(bean.getA(), first.getDouble(0), 0.0); - // Now Java lists and maps are converetd to Scala Seq's and Map's. Once we get a Seq below, + // Now Java lists and maps are converted to Scala Seq's and Map's. Once we get a Seq below, // verify that it has the expected length, and contains expected elements. Seq<Integer> result = first.getAs(1); Assert.assertEquals(bean.getB().length, result.length()); @@ -180,7 +180,8 @@ public class JavaDataFrameSuite { } } - private static Comparator<Row> CrosstabRowComparator = new Comparator<Row>() { + private static final Comparator<Row> crosstabRowComparator = new Comparator<Row>() { + @Override public int compare(Row row1, Row row2) { String item1 = row1.getString(0); String item2 = row2.getString(0); @@ -193,16 +194,16 @@ public class JavaDataFrameSuite { DataFrame df = context.table("testData2"); DataFrame crosstab = df.stat().crosstab("a", "b"); String[] columnNames = crosstab.schema().fieldNames(); - Assert.assertEquals(columnNames[0], "a_b"); - Assert.assertEquals(columnNames[1], "1"); - Assert.assertEquals(columnNames[2], "2"); + Assert.assertEquals("a_b", columnNames[0]); + Assert.assertEquals("1", columnNames[1]); + Assert.assertEquals("2", columnNames[2]); Row[] rows = crosstab.collect(); - Arrays.sort(rows, CrosstabRowComparator); + Arrays.sort(rows, crosstabRowComparator); Integer count = 1; for (Row row : rows) { Assert.assertEquals(row.get(0).toString(), count.toString()); - Assert.assertEquals(row.getLong(1), 1L); - Assert.assertEquals(row.getLong(2), 1L); + Assert.assertEquals(1L, row.getLong(1)); + Assert.assertEquals(1L, row.getLong(2)); count++; } } @@ -210,7 +211,7 @@ public class JavaDataFrameSuite { @Test public void testFrequentItems() { DataFrame df = context.table("testData2"); - String[] cols = new String[]{"a"}; + String[] cols = {"a"}; DataFrame results = df.stat().freqItems(cols, 0.2); Assert.assertTrue(results.collect()[0].getSeq(0).contains(1)); } @@ -219,14 +220,14 @@ public class JavaDataFrameSuite { public void testCorrelation() { DataFrame df = context.table("testData2"); Double pearsonCorr = df.stat().corr("a", "b", "pearson"); - Assert.assertTrue(Math.abs(pearsonCorr) < 1e-6); + Assert.assertTrue(Math.abs(pearsonCorr) < 1.0e-6); } @Test public void testCovariance() { DataFrame df = context.table("testData2"); Double result = df.stat().cov("a", "b"); - Assert.assertTrue(Math.abs(result) < 1e-6); + Assert.assertTrue(Math.abs(result) < 1.0e-6); } @Test @@ -234,7 +235,7 @@ public class JavaDataFrameSuite { DataFrame df = context.range(0, 100, 1, 2).select(col("id").mod(3).as("key")); DataFrame sampled = df.stat().<Integer>sampleBy("key", ImmutableMap.of(0, 0.1, 1, 0.2), 0L); Row[] actual = sampled.groupBy("key").count().orderBy("key").collect(); - Row[] expected = new Row[] {RowFactory.create(0, 5), RowFactory.create(1, 8)}; + Row[] expected = {RowFactory.create(0, 5), RowFactory.create(1, 8)}; Assert.assertArrayEquals(expected, actual); } } diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaRowSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaRowSuite.java index 4ce1d1dddb..3ab4db2a03 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaRowSuite.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaRowSuite.java @@ -18,6 +18,7 @@ package test.org.apache.spark.sql; import java.math.BigDecimal; +import java.nio.charset.StandardCharsets; import java.sql.Date; import java.sql.Timestamp; import java.util.Arrays; @@ -52,12 +53,12 @@ public class JavaRowSuite { shortValue = (short)32767; intValue = 2147483647; longValue = 9223372036854775807L; - floatValue = (float)3.4028235E38; + floatValue = 3.4028235E38f; doubleValue = 1.7976931348623157E308; decimalValue = new BigDecimal("1.7976931348623157E328"); booleanValue = true; stringValue = "this is a string"; - binaryValue = stringValue.getBytes(); + binaryValue = stringValue.getBytes(StandardCharsets.UTF_8); dateValue = Date.valueOf("2014-06-30"); timestampValue = Timestamp.valueOf("2014-06-30 09:20:00.0"); } @@ -123,8 +124,8 @@ public class JavaRowSuite { Assert.assertEquals(binaryValue, simpleRow.get(16)); Assert.assertEquals(dateValue, simpleRow.get(17)); Assert.assertEquals(timestampValue, simpleRow.get(18)); - Assert.assertEquals(true, simpleRow.isNullAt(19)); - Assert.assertEquals(null, simpleRow.get(19)); + Assert.assertTrue(simpleRow.isNullAt(19)); + Assert.assertNull(simpleRow.get(19)); } @Test @@ -134,7 +135,7 @@ public class JavaRowSuite { stringValue + " (1)", stringValue + " (2)", stringValue + "(3)"); // Simple map - Map<String, Long> simpleMap = new HashMap<String, Long>(); + Map<String, Long> simpleMap = new HashMap<>(); simpleMap.put(stringValue + " (1)", longValue); simpleMap.put(stringValue + " (2)", longValue - 1); simpleMap.put(stringValue + " (3)", longValue - 2); @@ -149,7 +150,7 @@ public class JavaRowSuite { List<Row> arrayOfRows = Arrays.asList(simpleStruct); // Complex map - Map<List<Row>, Row> complexMap = new HashMap<List<Row>, Row>(); + Map<List<Row>, Row> complexMap = new HashMap<>(); complexMap.put(arrayOfRows, simpleStruct); // Complex struct @@ -167,7 +168,7 @@ public class JavaRowSuite { Assert.assertEquals(arrayOfMaps, complexStruct.get(3)); Assert.assertEquals(arrayOfRows, complexStruct.get(4)); Assert.assertEquals(complexMap, complexStruct.get(5)); - Assert.assertEquals(null, complexStruct.get(6)); + Assert.assertNull(complexStruct.get(6)); // A very complex row Row complexRow = RowFactory.create(arrayOfMaps, arrayOfRows, complexMap, complexStruct); diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaUDFSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaUDFSuite.java index bb02b58cca..4a78dca7fe 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaUDFSuite.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaUDFSuite.java @@ -20,6 +20,7 @@ package test.org.apache.spark.sql; import java.io.Serializable; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -61,13 +62,13 @@ public class JavaUDFSuite implements Serializable { sqlContext.udf().register("stringLengthTest", new UDF1<String, Integer>() { @Override - public Integer call(String str) throws Exception { + public Integer call(String str) { return str.length(); } }, DataTypes.IntegerType); Row result = sqlContext.sql("SELECT stringLengthTest('test')").head(); - assert(result.getInt(0) == 4); + Assert.assertEquals(4, result.getInt(0)); } @SuppressWarnings("unchecked") @@ -81,12 +82,12 @@ public class JavaUDFSuite implements Serializable { sqlContext.udf().register("stringLengthTest", new UDF2<String, String, Integer>() { @Override - public Integer call(String str1, String str2) throws Exception { + public Integer call(String str1, String str2) { return str1.length() + str2.length(); } }, DataTypes.IntegerType); Row result = sqlContext.sql("SELECT stringLengthTest('test', 'test2')").head(); - assert(result.getInt(0) == 9); + Assert.assertEquals(9, result.getInt(0)); } } diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaSaveLoadSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaSaveLoadSuite.java index 6f9e7f68dc..9e241f2098 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaSaveLoadSuite.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaSaveLoadSuite.java @@ -44,7 +44,7 @@ public class JavaSaveLoadSuite { File path; DataFrame df; - private void checkAnswer(DataFrame actual, List<Row> expected) { + private static void checkAnswer(DataFrame actual, List<Row> expected) { String errorMessage = QueryTest$.MODULE$.checkAnswer(actual, expected); if (errorMessage != null) { Assert.fail(errorMessage); @@ -64,7 +64,7 @@ public class JavaSaveLoadSuite { path.delete(); } - List<String> jsonObjects = new ArrayList<String>(10); + List<String> jsonObjects = new ArrayList<>(10); for (int i = 0; i < 10; i++) { jsonObjects.add("{\"a\":" + i + ", \"b\":\"str" + i + "\"}"); } @@ -82,7 +82,7 @@ public class JavaSaveLoadSuite { @Test public void saveAndLoad() { - Map<String, String> options = new HashMap<String, String>(); + Map<String, String> options = new HashMap<>(); options.put("path", path.toString()); df.write().mode(SaveMode.ErrorIfExists).format("json").options(options).save(); DataFrame loadedDF = sqlContext.read().format("json").options(options).load(); @@ -91,11 +91,11 @@ public class JavaSaveLoadSuite { @Test public void saveAndLoadWithSchema() { - Map<String, String> options = new HashMap<String, String>(); + Map<String, String> options = new HashMap<>(); options.put("path", path.toString()); df.write().format("json").mode(SaveMode.ErrorIfExists).options(options).save(); - List<StructField> fields = new ArrayList<StructField>(); + List<StructField> fields = new ArrayList<>(); fields.add(DataTypes.createStructField("b", DataTypes.StringType, true)); StructType schema = DataTypes.createStructType(fields); DataFrame loadedDF = sqlContext.read().format("json").schema(schema).options(options).load(); diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java index 019d8a3026..b4bf9eef8f 100644 --- a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java +++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java @@ -40,7 +40,7 @@ public class JavaDataFrameSuite { DataFrame df; - private void checkAnswer(DataFrame actual, List<Row> expected) { + private static void checkAnswer(DataFrame actual, List<Row> expected) { String errorMessage = QueryTest$.MODULE$.checkAnswer(actual, expected); if (errorMessage != null) { Assert.fail(errorMessage); @@ -52,7 +52,7 @@ public class JavaDataFrameSuite { hc = TestHive$.MODULE$; sc = new JavaSparkContext(hc.sparkContext()); - List<String> jsonObjects = new ArrayList<String>(10); + List<String> jsonObjects = new ArrayList<>(10); for (int i = 0; i < 10; i++) { jsonObjects.add("{\"key\":" + i + ", \"value\":\"str" + i + "\"}"); } @@ -71,7 +71,7 @@ public class JavaDataFrameSuite { @Test public void saveTableAndQueryIt() { checkAnswer( - df.select(functions.avg("key").over( + df.select(avg("key").over( Window.partitionBy("value").orderBy("key").rowsBetween(-1, 1))), hc.sql("SELECT avg(key) " + "OVER (PARTITION BY value " + @@ -95,7 +95,7 @@ public class JavaDataFrameSuite { registeredUDAF.apply(col("value")), callUDF("mydoublesum", col("value"))); - List<Row> expectedResult = new ArrayList<Row>(); + List<Row> expectedResult = new ArrayList<>(); expectedResult.add(RowFactory.create(4950.0, 9900.0, 9900.0, 9900.0)); checkAnswer( aggregatedDF, diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java index 4192155975..c8d272794d 100644 --- a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java +++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java @@ -53,7 +53,7 @@ public class JavaMetastoreDataSourcesSuite { FileSystem fs; DataFrame df; - private void checkAnswer(DataFrame actual, List<Row> expected) { + private static void checkAnswer(DataFrame actual, List<Row> expected) { String errorMessage = QueryTest$.MODULE$.checkAnswer(actual, expected); if (errorMessage != null) { Assert.fail(errorMessage); @@ -77,7 +77,7 @@ public class JavaMetastoreDataSourcesSuite { fs.delete(hiveManagedPath, true); } - List<String> jsonObjects = new ArrayList<String>(10); + List<String> jsonObjects = new ArrayList<>(10); for (int i = 0; i < 10; i++) { jsonObjects.add("{\"a\":" + i + ", \"b\":\"str" + i + "\"}"); } @@ -97,7 +97,7 @@ public class JavaMetastoreDataSourcesSuite { @Test public void saveExternalTableAndQueryIt() { - Map<String, String> options = new HashMap<String, String>(); + Map<String, String> options = new HashMap<>(); options.put("path", path.toString()); df.write() .format("org.apache.spark.sql.json") @@ -120,7 +120,7 @@ public class JavaMetastoreDataSourcesSuite { @Test public void saveExternalTableWithSchemaAndQueryIt() { - Map<String, String> options = new HashMap<String, String>(); + Map<String, String> options = new HashMap<>(); options.put("path", path.toString()); df.write() .format("org.apache.spark.sql.json") @@ -132,7 +132,7 @@ public class JavaMetastoreDataSourcesSuite { sqlContext.sql("SELECT * FROM javaSavedTable"), df.collectAsList()); - List<StructField> fields = new ArrayList<StructField>(); + List<StructField> fields = new ArrayList<>(); fields.add(DataTypes.createStructField("b", DataTypes.StringType, true)); StructType schema = DataTypes.createStructType(fields); DataFrame loadedDF = @@ -148,7 +148,7 @@ public class JavaMetastoreDataSourcesSuite { @Test public void saveTableAndQueryIt() { - Map<String, String> options = new HashMap<String, String>(); + Map<String, String> options = new HashMap<>(); df.write() .format("org.apache.spark.sql.json") .mode(SaveMode.Append) diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index e0718f73aa..c521714922 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -18,24 +18,22 @@ package org.apache.spark.streaming; import java.io.*; -import java.lang.Iterable; import java.nio.charset.Charset; import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; +import scala.Tuple2; + +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; -import scala.Tuple2; - import org.junit.Assert; -import static org.junit.Assert.*; import org.junit.Test; import com.google.common.base.Optional; -import com.google.common.collect.Lists; import com.google.common.io.Files; import com.google.common.collect.Sets; @@ -54,14 +52,14 @@ import org.apache.spark.SparkConf; // see http://stackoverflow.com/questions/758570/. public class JavaAPISuite extends LocalJavaStreamingContext implements Serializable { - public void equalIterator(Iterator<?> a, Iterator<?> b) { + public static void equalIterator(Iterator<?> a, Iterator<?> b) { while (a.hasNext() && b.hasNext()) { Assert.assertEquals(a.next(), b.next()); } Assert.assertEquals(a.hasNext(), b.hasNext()); } - public void equalIterable(Iterable<?> a, Iterable<?> b) { + public static void equalIterable(Iterable<?> a, Iterable<?> b) { equalIterator(a.iterator(), b.iterator()); } @@ -74,14 +72,14 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa @Test public void testContextState() { List<List<Integer>> inputData = Arrays.asList(Arrays.asList(1, 2, 3, 4)); - Assert.assertTrue(ssc.getState() == StreamingContextState.INITIALIZED); + Assert.assertEquals(StreamingContextState.INITIALIZED, ssc.getState()); JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaTestUtils.attachTestOutputStream(stream); - Assert.assertTrue(ssc.getState() == StreamingContextState.INITIALIZED); + Assert.assertEquals(StreamingContextState.INITIALIZED, ssc.getState()); ssc.start(); - Assert.assertTrue(ssc.getState() == StreamingContextState.ACTIVE); + Assert.assertEquals(StreamingContextState.ACTIVE, ssc.getState()); ssc.stop(); - Assert.assertTrue(ssc.getState() == StreamingContextState.STOPPED); + Assert.assertEquals(StreamingContextState.STOPPED, ssc.getState()); } @SuppressWarnings("unchecked") @@ -118,7 +116,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaDStream<Integer> letterCount = stream.map(new Function<String, Integer>() { @Override - public Integer call(String s) throws Exception { + public Integer call(String s) { return s.length(); } }); @@ -180,7 +178,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa public void testFilter() { List<List<String>> inputData = Arrays.asList( Arrays.asList("giants", "dodgers"), - Arrays.asList("yankees", "red socks")); + Arrays.asList("yankees", "red sox")); List<List<String>> expected = Arrays.asList( Arrays.asList("giants"), @@ -189,7 +187,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaDStream<String> filtered = stream.filter(new Function<String, Boolean>() { @Override - public Boolean call(String s) throws Exception { + public Boolean call(String s) { return s.contains("a"); } }); @@ -243,11 +241,11 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa public void testGlom() { List<List<String>> inputData = Arrays.asList( Arrays.asList("giants", "dodgers"), - Arrays.asList("yankees", "red socks")); + Arrays.asList("yankees", "red sox")); List<List<List<String>>> expected = Arrays.asList( Arrays.asList(Arrays.asList("giants", "dodgers")), - Arrays.asList(Arrays.asList("yankees", "red socks"))); + Arrays.asList(Arrays.asList("yankees", "red sox"))); JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaDStream<List<String>> glommed = stream.glom(); @@ -262,22 +260,22 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa public void testMapPartitions() { List<List<String>> inputData = Arrays.asList( Arrays.asList("giants", "dodgers"), - Arrays.asList("yankees", "red socks")); + Arrays.asList("yankees", "red sox")); List<List<String>> expected = Arrays.asList( Arrays.asList("GIANTSDODGERS"), - Arrays.asList("YANKEESRED SOCKS")); + Arrays.asList("YANKEESRED SOX")); JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaDStream<String> mapped = stream.mapPartitions( new FlatMapFunction<Iterator<String>, String>() { @Override public Iterable<String> call(Iterator<String> in) { - String out = ""; + StringBuilder out = new StringBuilder(); while (in.hasNext()) { - out = out + in.next().toUpperCase(); + out.append(in.next().toUpperCase(Locale.ENGLISH)); } - return Lists.newArrayList(out); + return Arrays.asList(out.toString()); } }); JavaTestUtils.attachTestOutputStream(mapped); @@ -286,16 +284,16 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Assert.assertEquals(expected, result); } - private class IntegerSum implements Function2<Integer, Integer, Integer> { + private static class IntegerSum implements Function2<Integer, Integer, Integer> { @Override - public Integer call(Integer i1, Integer i2) throws Exception { + public Integer call(Integer i1, Integer i2) { return i1 + i2; } } - private class IntegerDifference implements Function2<Integer, Integer, Integer> { + private static class IntegerDifference implements Function2<Integer, Integer, Integer> { @Override - public Integer call(Integer i1, Integer i2) throws Exception { + public Integer call(Integer i1, Integer i2) { return i1 - i2; } } @@ -347,13 +345,13 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Arrays.asList(24)); JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream<Integer> reducedWindowed = null; + JavaDStream<Integer> reducedWindowed; if (withInverse) { reducedWindowed = stream.reduceByWindow(new IntegerSum(), - new IntegerDifference(), new Duration(2000), new Duration(1000)); + new IntegerDifference(), new Duration(2000), new Duration(1000)); } else { reducedWindowed = stream.reduceByWindow(new IntegerSum(), - new Duration(2000), new Duration(1000)); + new Duration(2000), new Duration(1000)); } JavaTestUtils.attachTestOutputStream(reducedWindowed); List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4); @@ -378,11 +376,11 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Arrays.asList(7,8,9)); JavaSparkContext jsc = new JavaSparkContext(ssc.ssc().sc()); - JavaRDD<Integer> rdd1 = ssc.sparkContext().parallelize(Arrays.asList(1, 2, 3)); - JavaRDD<Integer> rdd2 = ssc.sparkContext().parallelize(Arrays.asList(4, 5, 6)); - JavaRDD<Integer> rdd3 = ssc.sparkContext().parallelize(Arrays.asList(7,8,9)); + JavaRDD<Integer> rdd1 = jsc.parallelize(Arrays.asList(1, 2, 3)); + JavaRDD<Integer> rdd2 = jsc.parallelize(Arrays.asList(4, 5, 6)); + JavaRDD<Integer> rdd3 = jsc.parallelize(Arrays.asList(7,8,9)); - LinkedList<JavaRDD<Integer>> rdds = Lists.newLinkedList(); + Queue<JavaRDD<Integer>> rdds = new LinkedList<>(); rdds.add(rdd1); rdds.add(rdd2); rdds.add(rdd3); @@ -410,10 +408,10 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaDStream<Integer> transformed = stream.transform( new Function<JavaRDD<Integer>, JavaRDD<Integer>>() { @Override - public JavaRDD<Integer> call(JavaRDD<Integer> in) throws Exception { + public JavaRDD<Integer> call(JavaRDD<Integer> in) { return in.map(new Function<Integer, Integer>() { @Override - public Integer call(Integer i) throws Exception { + public Integer call(Integer i) { return i + 2; } }); @@ -435,70 +433,70 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); List<List<Tuple2<String, Integer>>> pairInputData = - Arrays.asList(Arrays.asList(new Tuple2<String, Integer>("x", 1))); + Arrays.asList(Arrays.asList(new Tuple2<>("x", 1))); JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream( JavaTestUtils.attachTestInputStream(ssc, pairInputData, 1)); - JavaDStream<Integer> transformed1 = stream.transform( + stream.transform( new Function<JavaRDD<Integer>, JavaRDD<Integer>>() { @Override - public JavaRDD<Integer> call(JavaRDD<Integer> in) throws Exception { + public JavaRDD<Integer> call(JavaRDD<Integer> in) { return null; } } ); - JavaDStream<Integer> transformed2 = stream.transform( + stream.transform( new Function2<JavaRDD<Integer>, Time, JavaRDD<Integer>>() { - @Override public JavaRDD<Integer> call(JavaRDD<Integer> in, Time time) throws Exception { + @Override public JavaRDD<Integer> call(JavaRDD<Integer> in, Time time) { return null; } } ); - JavaPairDStream<String, Integer> transformed3 = stream.transformToPair( + stream.transformToPair( new Function<JavaRDD<Integer>, JavaPairRDD<String, Integer>>() { - @Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> in) throws Exception { + @Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> in) { return null; } } ); - JavaPairDStream<String, Integer> transformed4 = stream.transformToPair( + stream.transformToPair( new Function2<JavaRDD<Integer>, Time, JavaPairRDD<String, Integer>>() { - @Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> in, Time time) throws Exception { + @Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> in, Time time) { return null; } } ); - JavaDStream<Integer> pairTransformed1 = pairStream.transform( + pairStream.transform( new Function<JavaPairRDD<String, Integer>, JavaRDD<Integer>>() { - @Override public JavaRDD<Integer> call(JavaPairRDD<String, Integer> in) throws Exception { + @Override public JavaRDD<Integer> call(JavaPairRDD<String, Integer> in) { return null; } } ); - JavaDStream<Integer> pairTransformed2 = pairStream.transform( + pairStream.transform( new Function2<JavaPairRDD<String, Integer>, Time, JavaRDD<Integer>>() { - @Override public JavaRDD<Integer> call(JavaPairRDD<String, Integer> in, Time time) throws Exception { + @Override public JavaRDD<Integer> call(JavaPairRDD<String, Integer> in, Time time) { return null; } } ); - JavaPairDStream<String, String> pairTransformed3 = pairStream.transformToPair( + pairStream.transformToPair( new Function<JavaPairRDD<String, Integer>, JavaPairRDD<String, String>>() { - @Override public JavaPairRDD<String, String> call(JavaPairRDD<String, Integer> in) throws Exception { + @Override public JavaPairRDD<String, String> call(JavaPairRDD<String, Integer> in) { return null; } } ); - JavaPairDStream<String, String> pairTransformed4 = pairStream.transformToPair( + pairStream.transformToPair( new Function2<JavaPairRDD<String, Integer>, Time, JavaPairRDD<String, String>>() { - @Override public JavaPairRDD<String, String> call(JavaPairRDD<String, Integer> in, Time time) throws Exception { + @Override public JavaPairRDD<String, String> call(JavaPairRDD<String, Integer> in, Time time) { return null; } } @@ -511,32 +509,32 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa public void testTransformWith() { List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList( Arrays.asList( - new Tuple2<String, String>("california", "dodgers"), - new Tuple2<String, String>("new york", "yankees")), + new Tuple2<>("california", "dodgers"), + new Tuple2<>("new york", "yankees")), Arrays.asList( - new Tuple2<String, String>("california", "sharks"), - new Tuple2<String, String>("new york", "rangers"))); + new Tuple2<>("california", "sharks"), + new Tuple2<>("new york", "rangers"))); List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList( Arrays.asList( - new Tuple2<String, String>("california", "giants"), - new Tuple2<String, String>("new york", "mets")), + new Tuple2<>("california", "giants"), + new Tuple2<>("new york", "mets")), Arrays.asList( - new Tuple2<String, String>("california", "ducks"), - new Tuple2<String, String>("new york", "islanders"))); + new Tuple2<>("california", "ducks"), + new Tuple2<>("new york", "islanders"))); List<HashSet<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList( Sets.newHashSet( - new Tuple2<String, Tuple2<String, String>>("california", - new Tuple2<String, String>("dodgers", "giants")), - new Tuple2<String, Tuple2<String, String>>("new york", - new Tuple2<String, String>("yankees", "mets"))), + new Tuple2<>("california", + new Tuple2<>("dodgers", "giants")), + new Tuple2<>("new york", + new Tuple2<>("yankees", "mets"))), Sets.newHashSet( - new Tuple2<String, Tuple2<String, String>>("california", - new Tuple2<String, String>("sharks", "ducks")), - new Tuple2<String, Tuple2<String, String>>("new york", - new Tuple2<String, String>("rangers", "islanders")))); + new Tuple2<>("california", + new Tuple2<>("sharks", "ducks")), + new Tuple2<>("new york", + new Tuple2<>("rangers", "islanders")))); JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream( ssc, stringStringKVStream1, 1); @@ -552,14 +550,12 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaPairRDD<String, String>, JavaPairRDD<String, String>, Time, - JavaPairRDD<String, Tuple2<String, String>> - >() { + JavaPairRDD<String, Tuple2<String, String>>>() { @Override public JavaPairRDD<String, Tuple2<String, String>> call( JavaPairRDD<String, String> rdd1, JavaPairRDD<String, String> rdd2, - Time time - ) throws Exception { + Time time) { return rdd1.join(rdd2); } } @@ -567,9 +563,9 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaTestUtils.attachTestOutputStream(joined); List<List<Tuple2<String, Tuple2<String, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2); - List<HashSet<Tuple2<String, Tuple2<String, String>>>> unorderedResult = Lists.newArrayList(); + List<HashSet<Tuple2<String, Tuple2<String, String>>>> unorderedResult = new ArrayList<>(); for (List<Tuple2<String, Tuple2<String, String>>> res: result) { - unorderedResult.add(Sets.newHashSet(res)); + unorderedResult.add(Sets.newHashSet(res)); } Assert.assertEquals(expected, unorderedResult); @@ -587,89 +583,89 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaDStream<String> stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 1); List<List<Tuple2<String, Integer>>> pairInputData1 = - Arrays.asList(Arrays.asList(new Tuple2<String, Integer>("x", 1))); + Arrays.asList(Arrays.asList(new Tuple2<>("x", 1))); List<List<Tuple2<Double, Character>>> pairInputData2 = - Arrays.asList(Arrays.asList(new Tuple2<Double, Character>(1.0, 'x'))); + Arrays.asList(Arrays.asList(new Tuple2<>(1.0, 'x'))); JavaPairDStream<String, Integer> pairStream1 = JavaPairDStream.fromJavaDStream( JavaTestUtils.attachTestInputStream(ssc, pairInputData1, 1)); JavaPairDStream<Double, Character> pairStream2 = JavaPairDStream.fromJavaDStream( JavaTestUtils.attachTestInputStream(ssc, pairInputData2, 1)); - JavaDStream<Double> transformed1 = stream1.transformWith( + stream1.transformWith( stream2, new Function3<JavaRDD<Integer>, JavaRDD<String>, Time, JavaRDD<Double>>() { @Override - public JavaRDD<Double> call(JavaRDD<Integer> rdd1, JavaRDD<String> rdd2, Time time) throws Exception { + public JavaRDD<Double> call(JavaRDD<Integer> rdd1, JavaRDD<String> rdd2, Time time) { return null; } } ); - JavaDStream<Double> transformed2 = stream1.transformWith( + stream1.transformWith( pairStream1, new Function3<JavaRDD<Integer>, JavaPairRDD<String, Integer>, Time, JavaRDD<Double>>() { @Override - public JavaRDD<Double> call(JavaRDD<Integer> rdd1, JavaPairRDD<String, Integer> rdd2, Time time) throws Exception { + public JavaRDD<Double> call(JavaRDD<Integer> rdd1, JavaPairRDD<String, Integer> rdd2, Time time) { return null; } } ); - JavaPairDStream<Double, Double> transformed3 = stream1.transformWithToPair( + stream1.transformWithToPair( stream2, new Function3<JavaRDD<Integer>, JavaRDD<String>, Time, JavaPairRDD<Double, Double>>() { @Override - public JavaPairRDD<Double, Double> call(JavaRDD<Integer> rdd1, JavaRDD<String> rdd2, Time time) throws Exception { + public JavaPairRDD<Double, Double> call(JavaRDD<Integer> rdd1, JavaRDD<String> rdd2, Time time) { return null; } } ); - JavaPairDStream<Double, Double> transformed4 = stream1.transformWithToPair( + stream1.transformWithToPair( pairStream1, new Function3<JavaRDD<Integer>, JavaPairRDD<String, Integer>, Time, JavaPairRDD<Double, Double>>() { @Override - public JavaPairRDD<Double, Double> call(JavaRDD<Integer> rdd1, JavaPairRDD<String, Integer> rdd2, Time time) throws Exception { + public JavaPairRDD<Double, Double> call(JavaRDD<Integer> rdd1, JavaPairRDD<String, Integer> rdd2, Time time) { return null; } } ); - JavaDStream<Double> pairTransformed1 = pairStream1.transformWith( + pairStream1.transformWith( stream2, new Function3<JavaPairRDD<String, Integer>, JavaRDD<String>, Time, JavaRDD<Double>>() { @Override - public JavaRDD<Double> call(JavaPairRDD<String, Integer> rdd1, JavaRDD<String> rdd2, Time time) throws Exception { + public JavaRDD<Double> call(JavaPairRDD<String, Integer> rdd1, JavaRDD<String> rdd2, Time time) { return null; } } ); - JavaDStream<Double> pairTransformed2_ = pairStream1.transformWith( + pairStream1.transformWith( pairStream1, new Function3<JavaPairRDD<String, Integer>, JavaPairRDD<String, Integer>, Time, JavaRDD<Double>>() { @Override - public JavaRDD<Double> call(JavaPairRDD<String, Integer> rdd1, JavaPairRDD<String, Integer> rdd2, Time time) throws Exception { + public JavaRDD<Double> call(JavaPairRDD<String, Integer> rdd1, JavaPairRDD<String, Integer> rdd2, Time time) { return null; } } ); - JavaPairDStream<Double, Double> pairTransformed3 = pairStream1.transformWithToPair( + pairStream1.transformWithToPair( stream2, new Function3<JavaPairRDD<String, Integer>, JavaRDD<String>, Time, JavaPairRDD<Double, Double>>() { @Override - public JavaPairRDD<Double, Double> call(JavaPairRDD<String, Integer> rdd1, JavaRDD<String> rdd2, Time time) throws Exception { + public JavaPairRDD<Double, Double> call(JavaPairRDD<String, Integer> rdd1, JavaRDD<String> rdd2, Time time) { return null; } } ); - JavaPairDStream<Double, Double> pairTransformed4 = pairStream1.transformWithToPair( + pairStream1.transformWithToPair( pairStream2, new Function3<JavaPairRDD<String, Integer>, JavaPairRDD<Double, Character>, Time, JavaPairRDD<Double, Double>>() { @Override - public JavaPairRDD<Double, Double> call(JavaPairRDD<String, Integer> rdd1, JavaPairRDD<Double, Character> rdd2, Time time) throws Exception { + public JavaPairRDD<Double, Double> call(JavaPairRDD<String, Integer> rdd1, JavaPairRDD<Double, Character> rdd2, Time time) { return null; } } @@ -690,13 +686,13 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa ); List<List<Tuple2<Integer, String>>> pairStream1input = Arrays.asList( - Arrays.asList(new Tuple2<Integer, String>(1, "x")), - Arrays.asList(new Tuple2<Integer, String>(2, "y")) + Arrays.asList(new Tuple2<>(1, "x")), + Arrays.asList(new Tuple2<>(2, "y")) ); List<List<Tuple2<Integer, Tuple2<Integer, String>>>> expected = Arrays.asList( - Arrays.asList(new Tuple2<Integer, Tuple2<Integer, String>>(1, new Tuple2<Integer, String>(1, "x"))), - Arrays.asList(new Tuple2<Integer, Tuple2<Integer, String>>(2, new Tuple2<Integer, String>(2, "y"))) + Arrays.asList(new Tuple2<>(1, new Tuple2<>(1, "x"))), + Arrays.asList(new Tuple2<>(2, new Tuple2<>(2, "y"))) ); JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, stream1input, 1); @@ -707,7 +703,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa List<JavaDStream<?>> listOfDStreams1 = Arrays.<JavaDStream<?>>asList(stream1, stream2); // This is just to test whether this transform to JavaStream compiles - JavaDStream<Long> transformed1 = ssc.transform( + ssc.transform( listOfDStreams1, new Function2<List<JavaRDD<?>>, Time, JavaRDD<Long>>() { @Override @@ -733,8 +729,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaPairRDD<Integer, String> prdd3 = JavaPairRDD.fromJavaRDD(rdd3); PairFunction<Integer, Integer, Integer> mapToTuple = new PairFunction<Integer, Integer, Integer>() { @Override - public Tuple2<Integer, Integer> call(Integer i) throws Exception { - return new Tuple2<Integer, Integer>(i, i); + public Tuple2<Integer, Integer> call(Integer i) { + return new Tuple2<>(i, i); } }; return rdd1.union(rdd2).mapToPair(mapToTuple).join(prdd3); @@ -763,7 +759,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaDStream<String> flatMapped = stream.flatMap(new FlatMapFunction<String, String>() { @Override public Iterable<String> call(String x) { - return Lists.newArrayList(x.split("(?!^)")); + return Arrays.asList(x.split("(?!^)")); } }); JavaTestUtils.attachTestOutputStream(flatMapped); @@ -782,39 +778,39 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa List<List<Tuple2<Integer, String>>> expected = Arrays.asList( Arrays.asList( - new Tuple2<Integer, String>(6, "g"), - new Tuple2<Integer, String>(6, "i"), - new Tuple2<Integer, String>(6, "a"), - new Tuple2<Integer, String>(6, "n"), - new Tuple2<Integer, String>(6, "t"), - new Tuple2<Integer, String>(6, "s")), + new Tuple2<>(6, "g"), + new Tuple2<>(6, "i"), + new Tuple2<>(6, "a"), + new Tuple2<>(6, "n"), + new Tuple2<>(6, "t"), + new Tuple2<>(6, "s")), Arrays.asList( - new Tuple2<Integer, String>(7, "d"), - new Tuple2<Integer, String>(7, "o"), - new Tuple2<Integer, String>(7, "d"), - new Tuple2<Integer, String>(7, "g"), - new Tuple2<Integer, String>(7, "e"), - new Tuple2<Integer, String>(7, "r"), - new Tuple2<Integer, String>(7, "s")), + new Tuple2<>(7, "d"), + new Tuple2<>(7, "o"), + new Tuple2<>(7, "d"), + new Tuple2<>(7, "g"), + new Tuple2<>(7, "e"), + new Tuple2<>(7, "r"), + new Tuple2<>(7, "s")), Arrays.asList( - new Tuple2<Integer, String>(9, "a"), - new Tuple2<Integer, String>(9, "t"), - new Tuple2<Integer, String>(9, "h"), - new Tuple2<Integer, String>(9, "l"), - new Tuple2<Integer, String>(9, "e"), - new Tuple2<Integer, String>(9, "t"), - new Tuple2<Integer, String>(9, "i"), - new Tuple2<Integer, String>(9, "c"), - new Tuple2<Integer, String>(9, "s"))); + new Tuple2<>(9, "a"), + new Tuple2<>(9, "t"), + new Tuple2<>(9, "h"), + new Tuple2<>(9, "l"), + new Tuple2<>(9, "e"), + new Tuple2<>(9, "t"), + new Tuple2<>(9, "i"), + new Tuple2<>(9, "c"), + new Tuple2<>(9, "s"))); JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<Integer, String> flatMapped = stream.flatMapToPair( new PairFlatMapFunction<String, Integer, String>() { @Override - public Iterable<Tuple2<Integer, String>> call(String in) throws Exception { - List<Tuple2<Integer, String>> out = Lists.newArrayList(); + public Iterable<Tuple2<Integer, String>> call(String in) { + List<Tuple2<Integer, String>> out = new ArrayList<>(); for (String letter: in.split("(?!^)")) { - out.add(new Tuple2<Integer, String>(in.length(), letter)); + out.add(new Tuple2<>(in.length(), letter)); } return out; } @@ -859,13 +855,13 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa */ public static <T> void assertOrderInvariantEquals( List<List<T>> expected, List<List<T>> actual) { - List<Set<T>> expectedSets = new ArrayList<Set<T>>(); + List<Set<T>> expectedSets = new ArrayList<>(); for (List<T> list: expected) { - expectedSets.add(Collections.unmodifiableSet(new HashSet<T>(list))); + expectedSets.add(Collections.unmodifiableSet(new HashSet<>(list))); } - List<Set<T>> actualSets = new ArrayList<Set<T>>(); + List<Set<T>> actualSets = new ArrayList<>(); for (List<T> list: actual) { - actualSets.add(Collections.unmodifiableSet(new HashSet<T>(list))); + actualSets.add(Collections.unmodifiableSet(new HashSet<>(list))); } Assert.assertEquals(expectedSets, actualSets); } @@ -877,25 +873,25 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa public void testPairFilter() { List<List<String>> inputData = Arrays.asList( Arrays.asList("giants", "dodgers"), - Arrays.asList("yankees", "red socks")); + Arrays.asList("yankees", "red sox")); List<List<Tuple2<String, Integer>>> expected = Arrays.asList( - Arrays.asList(new Tuple2<String, Integer>("giants", 6)), - Arrays.asList(new Tuple2<String, Integer>("yankees", 7))); + Arrays.asList(new Tuple2<>("giants", 6)), + Arrays.asList(new Tuple2<>("yankees", 7))); JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<String, Integer> pairStream = stream.mapToPair( new PairFunction<String, String, Integer>() { @Override - public Tuple2<String, Integer> call(String in) throws Exception { - return new Tuple2<String, Integer>(in, in.length()); + public Tuple2<String, Integer> call(String in) { + return new Tuple2<>(in, in.length()); } }); JavaPairDStream<String, Integer> filtered = pairStream.filter( new Function<Tuple2<String, Integer>, Boolean>() { @Override - public Boolean call(Tuple2<String, Integer> in) throws Exception { + public Boolean call(Tuple2<String, Integer> in) { return in._1().contains("a"); } }); @@ -906,28 +902,28 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa } @SuppressWarnings("unchecked") - private List<List<Tuple2<String, String>>> stringStringKVStream = Arrays.asList( - Arrays.asList(new Tuple2<String, String>("california", "dodgers"), - new Tuple2<String, String>("california", "giants"), - new Tuple2<String, String>("new york", "yankees"), - new Tuple2<String, String>("new york", "mets")), - Arrays.asList(new Tuple2<String, String>("california", "sharks"), - new Tuple2<String, String>("california", "ducks"), - new Tuple2<String, String>("new york", "rangers"), - new Tuple2<String, String>("new york", "islanders"))); + private final List<List<Tuple2<String, String>>> stringStringKVStream = Arrays.asList( + Arrays.asList(new Tuple2<>("california", "dodgers"), + new Tuple2<>("california", "giants"), + new Tuple2<>("new york", "yankees"), + new Tuple2<>("new york", "mets")), + Arrays.asList(new Tuple2<>("california", "sharks"), + new Tuple2<>("california", "ducks"), + new Tuple2<>("new york", "rangers"), + new Tuple2<>("new york", "islanders"))); @SuppressWarnings("unchecked") - private List<List<Tuple2<String, Integer>>> stringIntKVStream = Arrays.asList( + private final List<List<Tuple2<String, Integer>>> stringIntKVStream = Arrays.asList( Arrays.asList( - new Tuple2<String, Integer>("california", 1), - new Tuple2<String, Integer>("california", 3), - new Tuple2<String, Integer>("new york", 4), - new Tuple2<String, Integer>("new york", 1)), + new Tuple2<>("california", 1), + new Tuple2<>("california", 3), + new Tuple2<>("new york", 4), + new Tuple2<>("new york", 1)), Arrays.asList( - new Tuple2<String, Integer>("california", 5), - new Tuple2<String, Integer>("california", 5), - new Tuple2<String, Integer>("new york", 3), - new Tuple2<String, Integer>("new york", 1))); + new Tuple2<>("california", 5), + new Tuple2<>("california", 5), + new Tuple2<>("new york", 3), + new Tuple2<>("new york", 1))); @SuppressWarnings("unchecked") @Test @@ -936,22 +932,22 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa List<List<Tuple2<Integer, String>>> expected = Arrays.asList( Arrays.asList( - new Tuple2<Integer, String>(1, "california"), - new Tuple2<Integer, String>(3, "california"), - new Tuple2<Integer, String>(4, "new york"), - new Tuple2<Integer, String>(1, "new york")), + new Tuple2<>(1, "california"), + new Tuple2<>(3, "california"), + new Tuple2<>(4, "new york"), + new Tuple2<>(1, "new york")), Arrays.asList( - new Tuple2<Integer, String>(5, "california"), - new Tuple2<Integer, String>(5, "california"), - new Tuple2<Integer, String>(3, "new york"), - new Tuple2<Integer, String>(1, "new york"))); + new Tuple2<>(5, "california"), + new Tuple2<>(5, "california"), + new Tuple2<>(3, "new york"), + new Tuple2<>(1, "new york"))); JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); JavaPairDStream<Integer, String> reversed = pairStream.mapToPair( new PairFunction<Tuple2<String, Integer>, Integer, String>() { @Override - public Tuple2<Integer, String> call(Tuple2<String, Integer> in) throws Exception { + public Tuple2<Integer, String> call(Tuple2<String, Integer> in) { return in.swap(); } }); @@ -969,23 +965,23 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa List<List<Tuple2<Integer, String>>> expected = Arrays.asList( Arrays.asList( - new Tuple2<Integer, String>(1, "california"), - new Tuple2<Integer, String>(3, "california"), - new Tuple2<Integer, String>(4, "new york"), - new Tuple2<Integer, String>(1, "new york")), + new Tuple2<>(1, "california"), + new Tuple2<>(3, "california"), + new Tuple2<>(4, "new york"), + new Tuple2<>(1, "new york")), Arrays.asList( - new Tuple2<Integer, String>(5, "california"), - new Tuple2<Integer, String>(5, "california"), - new Tuple2<Integer, String>(3, "new york"), - new Tuple2<Integer, String>(1, "new york"))); + new Tuple2<>(5, "california"), + new Tuple2<>(5, "california"), + new Tuple2<>(3, "new york"), + new Tuple2<>(1, "new york"))); JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); JavaPairDStream<Integer, String> reversed = pairStream.mapPartitionsToPair( new PairFlatMapFunction<Iterator<Tuple2<String, Integer>>, Integer, String>() { @Override - public Iterable<Tuple2<Integer, String>> call(Iterator<Tuple2<String, Integer>> in) throws Exception { - LinkedList<Tuple2<Integer, String>> out = new LinkedList<Tuple2<Integer, String>>(); + public Iterable<Tuple2<Integer, String>> call(Iterator<Tuple2<String, Integer>> in) { + List<Tuple2<Integer, String>> out = new LinkedList<>(); while (in.hasNext()) { Tuple2<String, Integer> next = in.next(); out.add(next.swap()); @@ -1014,7 +1010,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaDStream<Integer> reversed = pairStream.map( new Function<Tuple2<String, Integer>, Integer>() { @Override - public Integer call(Tuple2<String, Integer> in) throws Exception { + public Integer call(Tuple2<String, Integer> in) { return in._2(); } }); @@ -1030,23 +1026,23 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa public void testPairToPairFlatMapWithChangingTypes() { // Maps pair -> pair List<List<Tuple2<String, Integer>>> inputData = Arrays.asList( Arrays.asList( - new Tuple2<String, Integer>("hi", 1), - new Tuple2<String, Integer>("ho", 2)), + new Tuple2<>("hi", 1), + new Tuple2<>("ho", 2)), Arrays.asList( - new Tuple2<String, Integer>("hi", 1), - new Tuple2<String, Integer>("ho", 2))); + new Tuple2<>("hi", 1), + new Tuple2<>("ho", 2))); List<List<Tuple2<Integer, String>>> expected = Arrays.asList( Arrays.asList( - new Tuple2<Integer, String>(1, "h"), - new Tuple2<Integer, String>(1, "i"), - new Tuple2<Integer, String>(2, "h"), - new Tuple2<Integer, String>(2, "o")), + new Tuple2<>(1, "h"), + new Tuple2<>(1, "i"), + new Tuple2<>(2, "h"), + new Tuple2<>(2, "o")), Arrays.asList( - new Tuple2<Integer, String>(1, "h"), - new Tuple2<Integer, String>(1, "i"), - new Tuple2<Integer, String>(2, "h"), - new Tuple2<Integer, String>(2, "o"))); + new Tuple2<>(1, "h"), + new Tuple2<>(1, "i"), + new Tuple2<>(2, "h"), + new Tuple2<>(2, "o"))); JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); @@ -1054,10 +1050,10 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaPairDStream<Integer, String> flatMapped = pairStream.flatMapToPair( new PairFlatMapFunction<Tuple2<String, Integer>, Integer, String>() { @Override - public Iterable<Tuple2<Integer, String>> call(Tuple2<String, Integer> in) throws Exception { - List<Tuple2<Integer, String>> out = new LinkedList<Tuple2<Integer, String>>(); + public Iterable<Tuple2<Integer, String>> call(Tuple2<String, Integer> in) { + List<Tuple2<Integer, String>> out = new LinkedList<>(); for (Character s : in._1().toCharArray()) { - out.add(new Tuple2<Integer, String>(in._2(), s.toString())); + out.add(new Tuple2<>(in._2(), s.toString())); } return out; } @@ -1075,11 +1071,11 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa List<List<Tuple2<String, List<String>>>> expected = Arrays.asList( Arrays.asList( - new Tuple2<String, List<String>>("california", Arrays.asList("dodgers", "giants")), - new Tuple2<String, List<String>>("new york", Arrays.asList("yankees", "mets"))), + new Tuple2<>("california", Arrays.asList("dodgers", "giants")), + new Tuple2<>("new york", Arrays.asList("yankees", "mets"))), Arrays.asList( - new Tuple2<String, List<String>>("california", Arrays.asList("sharks", "ducks")), - new Tuple2<String, List<String>>("new york", Arrays.asList("rangers", "islanders")))); + new Tuple2<>("california", Arrays.asList("sharks", "ducks")), + new Tuple2<>("new york", Arrays.asList("rangers", "islanders")))); JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream); @@ -1111,11 +1107,11 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa List<List<Tuple2<String, Integer>>> expected = Arrays.asList( Arrays.asList( - new Tuple2<String, Integer>("california", 4), - new Tuple2<String, Integer>("new york", 5)), + new Tuple2<>("california", 4), + new Tuple2<>("new york", 5)), Arrays.asList( - new Tuple2<String, Integer>("california", 10), - new Tuple2<String, Integer>("new york", 4))); + new Tuple2<>("california", 10), + new Tuple2<>("new york", 4))); JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream( ssc, inputData, 1); @@ -1136,20 +1132,20 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa List<List<Tuple2<String, Integer>>> expected = Arrays.asList( Arrays.asList( - new Tuple2<String, Integer>("california", 4), - new Tuple2<String, Integer>("new york", 5)), + new Tuple2<>("california", 4), + new Tuple2<>("new york", 5)), Arrays.asList( - new Tuple2<String, Integer>("california", 10), - new Tuple2<String, Integer>("new york", 4))); + new Tuple2<>("california", 10), + new Tuple2<>("new york", 4))); JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream( ssc, inputData, 1); JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream<String, Integer> combined = pairStream.<Integer>combineByKey( + JavaPairDStream<String, Integer> combined = pairStream.combineByKey( new Function<Integer, Integer>() { @Override - public Integer call(Integer i) throws Exception { + public Integer call(Integer i) { return i; } }, new IntegerSum(), new IntegerSum(), new HashPartitioner(2)); @@ -1170,13 +1166,13 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa List<List<Tuple2<String, Long>>> expected = Arrays.asList( Arrays.asList( - new Tuple2<String, Long>("hello", 1L), - new Tuple2<String, Long>("world", 1L)), + new Tuple2<>("hello", 1L), + new Tuple2<>("world", 1L)), Arrays.asList( - new Tuple2<String, Long>("hello", 1L), - new Tuple2<String, Long>("moon", 1L)), + new Tuple2<>("hello", 1L), + new Tuple2<>("moon", 1L)), Arrays.asList( - new Tuple2<String, Long>("hello", 1L))); + new Tuple2<>("hello", 1L))); JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<String, Long> counted = stream.countByValue(); @@ -1193,16 +1189,16 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa List<List<Tuple2<String, List<Integer>>>> expected = Arrays.asList( Arrays.asList( - new Tuple2<String, List<Integer>>("california", Arrays.asList(1, 3)), - new Tuple2<String, List<Integer>>("new york", Arrays.asList(1, 4)) + new Tuple2<>("california", Arrays.asList(1, 3)), + new Tuple2<>("new york", Arrays.asList(1, 4)) ), Arrays.asList( - new Tuple2<String, List<Integer>>("california", Arrays.asList(1, 3, 5, 5)), - new Tuple2<String, List<Integer>>("new york", Arrays.asList(1, 1, 3, 4)) + new Tuple2<>("california", Arrays.asList(1, 3, 5, 5)), + new Tuple2<>("new york", Arrays.asList(1, 1, 3, 4)) ), Arrays.asList( - new Tuple2<String, List<Integer>>("california", Arrays.asList(5, 5)), - new Tuple2<String, List<Integer>>("new york", Arrays.asList(1, 3)) + new Tuple2<>("california", Arrays.asList(5, 5)), + new Tuple2<>("new york", Arrays.asList(1, 3)) ) ); @@ -1220,16 +1216,16 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa } } - private HashSet<Tuple2<String, HashSet<Integer>>> convert(List<Tuple2<String, List<Integer>>> listOfTuples) { - List<Tuple2<String, HashSet<Integer>>> newListOfTuples = new ArrayList<Tuple2<String, HashSet<Integer>>>(); + private static Set<Tuple2<String, HashSet<Integer>>> convert(List<Tuple2<String, List<Integer>>> listOfTuples) { + List<Tuple2<String, HashSet<Integer>>> newListOfTuples = new ArrayList<>(); for (Tuple2<String, List<Integer>> tuple: listOfTuples) { newListOfTuples.add(convert(tuple)); } - return new HashSet<Tuple2<String, HashSet<Integer>>>(newListOfTuples); + return new HashSet<>(newListOfTuples); } - private Tuple2<String, HashSet<Integer>> convert(Tuple2<String, List<Integer>> tuple) { - return new Tuple2<String, HashSet<Integer>>(tuple._1(), new HashSet<Integer>(tuple._2())); + private static Tuple2<String, HashSet<Integer>> convert(Tuple2<String, List<Integer>> tuple) { + return new Tuple2<>(tuple._1(), new HashSet<>(tuple._2())); } @SuppressWarnings("unchecked") @@ -1238,12 +1234,12 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; List<List<Tuple2<String, Integer>>> expected = Arrays.asList( - Arrays.asList(new Tuple2<String, Integer>("california", 4), - new Tuple2<String, Integer>("new york", 5)), - Arrays.asList(new Tuple2<String, Integer>("california", 14), - new Tuple2<String, Integer>("new york", 9)), - Arrays.asList(new Tuple2<String, Integer>("california", 10), - new Tuple2<String, Integer>("new york", 4))); + Arrays.asList(new Tuple2<>("california", 4), + new Tuple2<>("new york", 5)), + Arrays.asList(new Tuple2<>("california", 14), + new Tuple2<>("new york", 9)), + Arrays.asList(new Tuple2<>("california", 10), + new Tuple2<>("new york", 4))); JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); @@ -1262,12 +1258,12 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; List<List<Tuple2<String, Integer>>> expected = Arrays.asList( - Arrays.asList(new Tuple2<String, Integer>("california", 4), - new Tuple2<String, Integer>("new york", 5)), - Arrays.asList(new Tuple2<String, Integer>("california", 14), - new Tuple2<String, Integer>("new york", 9)), - Arrays.asList(new Tuple2<String, Integer>("california", 14), - new Tuple2<String, Integer>("new york", 9))); + Arrays.asList(new Tuple2<>("california", 4), + new Tuple2<>("new york", 5)), + Arrays.asList(new Tuple2<>("california", 14), + new Tuple2<>("new york", 9)), + Arrays.asList(new Tuple2<>("california", 14), + new Tuple2<>("new york", 9))); JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); @@ -1278,10 +1274,10 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa public Optional<Integer> call(List<Integer> values, Optional<Integer> state) { int out = 0; if (state.isPresent()) { - out = out + state.get(); + out += state.get(); } for (Integer v : values) { - out = out + v; + out += v; } return Optional.of(out); } @@ -1298,19 +1294,19 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; List<Tuple2<String, Integer>> initial = Arrays.asList ( - new Tuple2<String, Integer> ("california", 1), - new Tuple2<String, Integer> ("new york", 2)); + new Tuple2<>("california", 1), + new Tuple2<>("new york", 2)); JavaRDD<Tuple2<String, Integer>> tmpRDD = ssc.sparkContext().parallelize(initial); JavaPairRDD<String, Integer> initialRDD = JavaPairRDD.fromJavaRDD (tmpRDD); List<List<Tuple2<String, Integer>>> expected = Arrays.asList( - Arrays.asList(new Tuple2<String, Integer>("california", 5), - new Tuple2<String, Integer>("new york", 7)), - Arrays.asList(new Tuple2<String, Integer>("california", 15), - new Tuple2<String, Integer>("new york", 11)), - Arrays.asList(new Tuple2<String, Integer>("california", 15), - new Tuple2<String, Integer>("new york", 11))); + Arrays.asList(new Tuple2<>("california", 5), + new Tuple2<>("new york", 7)), + Arrays.asList(new Tuple2<>("california", 15), + new Tuple2<>("new york", 11)), + Arrays.asList(new Tuple2<>("california", 15), + new Tuple2<>("new york", 11))); JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); @@ -1321,10 +1317,10 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa public Optional<Integer> call(List<Integer> values, Optional<Integer> state) { int out = 0; if (state.isPresent()) { - out = out + state.get(); + out += state.get(); } for (Integer v : values) { - out = out + v; + out += v; } return Optional.of(out); } @@ -1341,19 +1337,19 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; List<List<Tuple2<String, Integer>>> expected = Arrays.asList( - Arrays.asList(new Tuple2<String, Integer>("california", 4), - new Tuple2<String, Integer>("new york", 5)), - Arrays.asList(new Tuple2<String, Integer>("california", 14), - new Tuple2<String, Integer>("new york", 9)), - Arrays.asList(new Tuple2<String, Integer>("california", 10), - new Tuple2<String, Integer>("new york", 4))); + Arrays.asList(new Tuple2<>("california", 4), + new Tuple2<>("new york", 5)), + Arrays.asList(new Tuple2<>("california", 14), + new Tuple2<>("new york", 9)), + Arrays.asList(new Tuple2<>("california", 10), + new Tuple2<>("new york", 4))); JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); JavaPairDStream<String, Integer> reduceWindowed = pairStream.reduceByKeyAndWindow(new IntegerSum(), new IntegerDifference(), - new Duration(2000), new Duration(1000)); + new Duration(2000), new Duration(1000)); JavaTestUtils.attachTestOutputStream(reduceWindowed); List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3); @@ -1370,15 +1366,15 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa List<HashSet<Tuple2<String, Long>>> expected = Arrays.asList( Sets.newHashSet( - new Tuple2<String, Long>("hello", 1L), - new Tuple2<String, Long>("world", 1L)), + new Tuple2<>("hello", 1L), + new Tuple2<>("world", 1L)), Sets.newHashSet( - new Tuple2<String, Long>("hello", 2L), - new Tuple2<String, Long>("world", 1L), - new Tuple2<String, Long>("moon", 1L)), + new Tuple2<>("hello", 2L), + new Tuple2<>("world", 1L), + new Tuple2<>("moon", 1L)), Sets.newHashSet( - new Tuple2<String, Long>("hello", 2L), - new Tuple2<String, Long>("moon", 1L))); + new Tuple2<>("hello", 2L), + new Tuple2<>("moon", 1L))); JavaDStream<String> stream = JavaTestUtils.attachTestInputStream( ssc, inputData, 1); @@ -1386,7 +1382,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa stream.countByValueAndWindow(new Duration(2000), new Duration(1000)); JavaTestUtils.attachTestOutputStream(counted); List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 3, 3); - List<HashSet<Tuple2<String, Long>>> unorderedResult = Lists.newArrayList(); + List<Set<Tuple2<String, Long>>> unorderedResult = new ArrayList<>(); for (List<Tuple2<String, Long>> res: result) { unorderedResult.add(Sets.newHashSet(res)); } @@ -1399,27 +1395,27 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa public void testPairTransform() { List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList( Arrays.asList( - new Tuple2<Integer, Integer>(3, 5), - new Tuple2<Integer, Integer>(1, 5), - new Tuple2<Integer, Integer>(4, 5), - new Tuple2<Integer, Integer>(2, 5)), + new Tuple2<>(3, 5), + new Tuple2<>(1, 5), + new Tuple2<>(4, 5), + new Tuple2<>(2, 5)), Arrays.asList( - new Tuple2<Integer, Integer>(2, 5), - new Tuple2<Integer, Integer>(3, 5), - new Tuple2<Integer, Integer>(4, 5), - new Tuple2<Integer, Integer>(1, 5))); + new Tuple2<>(2, 5), + new Tuple2<>(3, 5), + new Tuple2<>(4, 5), + new Tuple2<>(1, 5))); List<List<Tuple2<Integer, Integer>>> expected = Arrays.asList( Arrays.asList( - new Tuple2<Integer, Integer>(1, 5), - new Tuple2<Integer, Integer>(2, 5), - new Tuple2<Integer, Integer>(3, 5), - new Tuple2<Integer, Integer>(4, 5)), + new Tuple2<>(1, 5), + new Tuple2<>(2, 5), + new Tuple2<>(3, 5), + new Tuple2<>(4, 5)), Arrays.asList( - new Tuple2<Integer, Integer>(1, 5), - new Tuple2<Integer, Integer>(2, 5), - new Tuple2<Integer, Integer>(3, 5), - new Tuple2<Integer, Integer>(4, 5))); + new Tuple2<>(1, 5), + new Tuple2<>(2, 5), + new Tuple2<>(3, 5), + new Tuple2<>(4, 5))); JavaDStream<Tuple2<Integer, Integer>> stream = JavaTestUtils.attachTestInputStream( ssc, inputData, 1); @@ -1428,7 +1424,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaPairDStream<Integer, Integer> sorted = pairStream.transformToPair( new Function<JavaPairRDD<Integer, Integer>, JavaPairRDD<Integer, Integer>>() { @Override - public JavaPairRDD<Integer, Integer> call(JavaPairRDD<Integer, Integer> in) throws Exception { + public JavaPairRDD<Integer, Integer> call(JavaPairRDD<Integer, Integer> in) { return in.sortByKey(); } }); @@ -1444,15 +1440,15 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa public void testPairToNormalRDDTransform() { List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList( Arrays.asList( - new Tuple2<Integer, Integer>(3, 5), - new Tuple2<Integer, Integer>(1, 5), - new Tuple2<Integer, Integer>(4, 5), - new Tuple2<Integer, Integer>(2, 5)), + new Tuple2<>(3, 5), + new Tuple2<>(1, 5), + new Tuple2<>(4, 5), + new Tuple2<>(2, 5)), Arrays.asList( - new Tuple2<Integer, Integer>(2, 5), - new Tuple2<Integer, Integer>(3, 5), - new Tuple2<Integer, Integer>(4, 5), - new Tuple2<Integer, Integer>(1, 5))); + new Tuple2<>(2, 5), + new Tuple2<>(3, 5), + new Tuple2<>(4, 5), + new Tuple2<>(1, 5))); List<List<Integer>> expected = Arrays.asList( Arrays.asList(3,1,4,2), @@ -1465,11 +1461,11 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaDStream<Integer> firstParts = pairStream.transform( new Function<JavaPairRDD<Integer, Integer>, JavaRDD<Integer>>() { @Override - public JavaRDD<Integer> call(JavaPairRDD<Integer, Integer> in) throws Exception { + public JavaRDD<Integer> call(JavaPairRDD<Integer, Integer> in) { return in.map(new Function<Tuple2<Integer, Integer>, Integer>() { @Override - public Integer call(Tuple2<Integer, Integer> in) { - return in._1(); + public Integer call(Tuple2<Integer, Integer> in2) { + return in2._1(); } }); } @@ -1487,14 +1483,14 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa List<List<Tuple2<String, String>>> inputData = stringStringKVStream; List<List<Tuple2<String, String>>> expected = Arrays.asList( - Arrays.asList(new Tuple2<String, String>("california", "DODGERS"), - new Tuple2<String, String>("california", "GIANTS"), - new Tuple2<String, String>("new york", "YANKEES"), - new Tuple2<String, String>("new york", "METS")), - Arrays.asList(new Tuple2<String, String>("california", "SHARKS"), - new Tuple2<String, String>("california", "DUCKS"), - new Tuple2<String, String>("new york", "RANGERS"), - new Tuple2<String, String>("new york", "ISLANDERS"))); + Arrays.asList(new Tuple2<>("california", "DODGERS"), + new Tuple2<>("california", "GIANTS"), + new Tuple2<>("new york", "YANKEES"), + new Tuple2<>("new york", "METS")), + Arrays.asList(new Tuple2<>("california", "SHARKS"), + new Tuple2<>("california", "DUCKS"), + new Tuple2<>("new york", "RANGERS"), + new Tuple2<>("new york", "ISLANDERS"))); JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream( ssc, inputData, 1); @@ -1502,8 +1498,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaPairDStream<String, String> mapped = pairStream.mapValues(new Function<String, String>() { @Override - public String call(String s) throws Exception { - return s.toUpperCase(); + public String call(String s) { + return s.toUpperCase(Locale.ENGLISH); } }); @@ -1519,22 +1515,22 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa List<List<Tuple2<String, String>>> inputData = stringStringKVStream; List<List<Tuple2<String, String>>> expected = Arrays.asList( - Arrays.asList(new Tuple2<String, String>("california", "dodgers1"), - new Tuple2<String, String>("california", "dodgers2"), - new Tuple2<String, String>("california", "giants1"), - new Tuple2<String, String>("california", "giants2"), - new Tuple2<String, String>("new york", "yankees1"), - new Tuple2<String, String>("new york", "yankees2"), - new Tuple2<String, String>("new york", "mets1"), - new Tuple2<String, String>("new york", "mets2")), - Arrays.asList(new Tuple2<String, String>("california", "sharks1"), - new Tuple2<String, String>("california", "sharks2"), - new Tuple2<String, String>("california", "ducks1"), - new Tuple2<String, String>("california", "ducks2"), - new Tuple2<String, String>("new york", "rangers1"), - new Tuple2<String, String>("new york", "rangers2"), - new Tuple2<String, String>("new york", "islanders1"), - new Tuple2<String, String>("new york", "islanders2"))); + Arrays.asList(new Tuple2<>("california", "dodgers1"), + new Tuple2<>("california", "dodgers2"), + new Tuple2<>("california", "giants1"), + new Tuple2<>("california", "giants2"), + new Tuple2<>("new york", "yankees1"), + new Tuple2<>("new york", "yankees2"), + new Tuple2<>("new york", "mets1"), + new Tuple2<>("new york", "mets2")), + Arrays.asList(new Tuple2<>("california", "sharks1"), + new Tuple2<>("california", "sharks2"), + new Tuple2<>("california", "ducks1"), + new Tuple2<>("california", "ducks2"), + new Tuple2<>("new york", "rangers1"), + new Tuple2<>("new york", "rangers2"), + new Tuple2<>("new york", "islanders1"), + new Tuple2<>("new york", "islanders2"))); JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream( ssc, inputData, 1); @@ -1545,7 +1541,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa new Function<String, Iterable<String>>() { @Override public Iterable<String> call(String in) { - List<String> out = new ArrayList<String>(); + List<String> out = new ArrayList<>(); out.add(in + "1"); out.add(in + "2"); return out; @@ -1562,29 +1558,29 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa @Test public void testCoGroup() { List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList( - Arrays.asList(new Tuple2<String, String>("california", "dodgers"), - new Tuple2<String, String>("new york", "yankees")), - Arrays.asList(new Tuple2<String, String>("california", "sharks"), - new Tuple2<String, String>("new york", "rangers"))); + Arrays.asList(new Tuple2<>("california", "dodgers"), + new Tuple2<>("new york", "yankees")), + Arrays.asList(new Tuple2<>("california", "sharks"), + new Tuple2<>("new york", "rangers"))); List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList( - Arrays.asList(new Tuple2<String, String>("california", "giants"), - new Tuple2<String, String>("new york", "mets")), - Arrays.asList(new Tuple2<String, String>("california", "ducks"), - new Tuple2<String, String>("new york", "islanders"))); + Arrays.asList(new Tuple2<>("california", "giants"), + new Tuple2<>("new york", "mets")), + Arrays.asList(new Tuple2<>("california", "ducks"), + new Tuple2<>("new york", "islanders"))); List<List<Tuple2<String, Tuple2<List<String>, List<String>>>>> expected = Arrays.asList( Arrays.asList( - new Tuple2<String, Tuple2<List<String>, List<String>>>("california", - new Tuple2<List<String>, List<String>>(Arrays.asList("dodgers"), Arrays.asList("giants"))), - new Tuple2<String, Tuple2<List<String>, List<String>>>("new york", - new Tuple2<List<String>, List<String>>(Arrays.asList("yankees"), Arrays.asList("mets")))), + new Tuple2<>("california", + new Tuple2<>(Arrays.asList("dodgers"), Arrays.asList("giants"))), + new Tuple2<>("new york", + new Tuple2<>(Arrays.asList("yankees"), Arrays.asList("mets")))), Arrays.asList( - new Tuple2<String, Tuple2<List<String>, List<String>>>("california", - new Tuple2<List<String>, List<String>>(Arrays.asList("sharks"), Arrays.asList("ducks"))), - new Tuple2<String, Tuple2<List<String>, List<String>>>("new york", - new Tuple2<List<String>, List<String>>(Arrays.asList("rangers"), Arrays.asList("islanders"))))); + new Tuple2<>("california", + new Tuple2<>(Arrays.asList("sharks"), Arrays.asList("ducks"))), + new Tuple2<>("new york", + new Tuple2<>(Arrays.asList("rangers"), Arrays.asList("islanders"))))); JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream( @@ -1620,29 +1616,29 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa @Test public void testJoin() { List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList( - Arrays.asList(new Tuple2<String, String>("california", "dodgers"), - new Tuple2<String, String>("new york", "yankees")), - Arrays.asList(new Tuple2<String, String>("california", "sharks"), - new Tuple2<String, String>("new york", "rangers"))); + Arrays.asList(new Tuple2<>("california", "dodgers"), + new Tuple2<>("new york", "yankees")), + Arrays.asList(new Tuple2<>("california", "sharks"), + new Tuple2<>("new york", "rangers"))); List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList( - Arrays.asList(new Tuple2<String, String>("california", "giants"), - new Tuple2<String, String>("new york", "mets")), - Arrays.asList(new Tuple2<String, String>("california", "ducks"), - new Tuple2<String, String>("new york", "islanders"))); + Arrays.asList(new Tuple2<>("california", "giants"), + new Tuple2<>("new york", "mets")), + Arrays.asList(new Tuple2<>("california", "ducks"), + new Tuple2<>("new york", "islanders"))); List<List<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList( Arrays.asList( - new Tuple2<String, Tuple2<String, String>>("california", - new Tuple2<String, String>("dodgers", "giants")), - new Tuple2<String, Tuple2<String, String>>("new york", - new Tuple2<String, String>("yankees", "mets"))), + new Tuple2<>("california", + new Tuple2<>("dodgers", "giants")), + new Tuple2<>("new york", + new Tuple2<>("yankees", "mets"))), Arrays.asList( - new Tuple2<String, Tuple2<String, String>>("california", - new Tuple2<String, String>("sharks", "ducks")), - new Tuple2<String, Tuple2<String, String>>("new york", - new Tuple2<String, String>("rangers", "islanders")))); + new Tuple2<>("california", + new Tuple2<>("sharks", "ducks")), + new Tuple2<>("new york", + new Tuple2<>("rangers", "islanders")))); JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream( @@ -1664,13 +1660,13 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa @Test public void testLeftOuterJoin() { List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList( - Arrays.asList(new Tuple2<String, String>("california", "dodgers"), - new Tuple2<String, String>("new york", "yankees")), - Arrays.asList(new Tuple2<String, String>("california", "sharks") )); + Arrays.asList(new Tuple2<>("california", "dodgers"), + new Tuple2<>("new york", "yankees")), + Arrays.asList(new Tuple2<>("california", "sharks") )); List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList( - Arrays.asList(new Tuple2<String, String>("california", "giants") ), - Arrays.asList(new Tuple2<String, String>("new york", "islanders") ) + Arrays.asList(new Tuple2<>("california", "giants") ), + Arrays.asList(new Tuple2<>("new york", "islanders") ) ); @@ -1713,7 +1709,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaDStream<String> stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1); JavaDStream<Integer> letterCount = stream.map(new Function<String, Integer>() { @Override - public Integer call(String s) throws Exception { + public Integer call(String s) { return s.length(); } }); @@ -1752,6 +1748,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa // (used to detect the new context) final AtomicBoolean newContextCreated = new AtomicBoolean(false); Function0<JavaStreamingContext> creatingFunc = new Function0<JavaStreamingContext>() { + @Override public JavaStreamingContext call() { newContextCreated.set(true); return new JavaStreamingContext(conf, Seconds.apply(1)); @@ -1765,20 +1762,20 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa newContextCreated.set(false); ssc = JavaStreamingContext.getOrCreate(corruptedCheckpointDir, creatingFunc, - new org.apache.hadoop.conf.Configuration(), true); + new Configuration(), true); Assert.assertTrue("new context not created", newContextCreated.get()); ssc.stop(); newContextCreated.set(false); ssc = JavaStreamingContext.getOrCreate(checkpointDir, creatingFunc, - new org.apache.hadoop.conf.Configuration()); + new Configuration()); Assert.assertTrue("old context not recovered", !newContextCreated.get()); ssc.stop(); newContextCreated.set(false); JavaSparkContext sc = new JavaSparkContext(conf); ssc = JavaStreamingContext.getOrCreate(checkpointDir, creatingFunc, - new org.apache.hadoop.conf.Configuration()); + new Configuration()); Assert.assertTrue("old context not recovered", !newContextCreated.get()); ssc.stop(); } @@ -1800,7 +1797,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1); JavaDStream letterCount = stream.map(new Function<String, Integer>() { @Override - public Integer call(String s) throws Exception { + public Integer call(String s) { return s.length(); } }); @@ -1818,29 +1815,26 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa // InputStream functionality is deferred to the existing Scala tests. @Test public void testSocketTextStream() { - JavaReceiverInputDStream<String> test = ssc.socketTextStream("localhost", 12345); + ssc.socketTextStream("localhost", 12345); } @Test public void testSocketString() { - - class Converter implements Function<InputStream, Iterable<String>> { - public Iterable<String> call(InputStream in) throws IOException { - BufferedReader reader = new BufferedReader(new InputStreamReader(in)); - List<String> out = new ArrayList<String>(); - while (true) { - String line = reader.readLine(); - if (line == null) { break; } - out.add(line); - } - return out; - } - } - - JavaDStream<String> test = ssc.socketStream( + ssc.socketStream( "localhost", 12345, - new Converter(), + new Function<InputStream, Iterable<String>>() { + @Override + public Iterable<String> call(InputStream in) throws IOException { + List<String> out = new ArrayList<>(); + try (BufferedReader reader = new BufferedReader(new InputStreamReader(in))) { + for (String line; (line = reader.readLine()) != null;) { + out.add(line); + } + } + return out; + } + }, StorageLevel.MEMORY_ONLY()); } @@ -1870,7 +1864,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa TextInputFormat.class, new Function<Path, Boolean>() { @Override - public Boolean call(Path v1) throws Exception { + public Boolean call(Path v1) { return Boolean.TRUE; } }, @@ -1879,7 +1873,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaDStream<String> test = inputStream.map( new Function<Tuple2<LongWritable, Text>, String>() { @Override - public String call(Tuple2<LongWritable, Text> v1) throws Exception { + public String call(Tuple2<LongWritable, Text> v1) { return v1._2().toString(); } }); @@ -1892,19 +1886,15 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa @Test public void testRawSocketStream() { - JavaReceiverInputDStream<String> test = ssc.rawSocketStream("localhost", 12345); + ssc.rawSocketStream("localhost", 12345); } - private List<List<String>> fileTestPrepare(File testDir) throws IOException { + private static List<List<String>> fileTestPrepare(File testDir) throws IOException { File existingFile = new File(testDir, "0"); Files.write("0\n", existingFile, Charset.forName("UTF-8")); - assertTrue(existingFile.setLastModified(1000) && existingFile.lastModified() == 1000); - - List<List<String>> expected = Arrays.asList( - Arrays.asList("0") - ); - - return expected; + Assert.assertTrue(existingFile.setLastModified(1000)); + Assert.assertEquals(1000, existingFile.lastModified()); + return Arrays.asList(Arrays.asList("0")); } @SuppressWarnings("unchecked") diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java index 1b0787fe69..ec2bffd6a5 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java @@ -36,7 +36,6 @@ import java.io.InputStreamReader; import java.io.Serializable; import java.net.ConnectException; import java.net.Socket; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; public class JavaReceiverAPISuite implements Serializable { @@ -64,16 +63,16 @@ public class JavaReceiverAPISuite implements Serializable { ssc.receiverStream(new JavaSocketReceiver("localhost", server.port())); JavaDStream<String> mapped = input.map(new Function<String, String>() { @Override - public String call(String v1) throws Exception { + public String call(String v1) { return v1 + "."; } }); mapped.foreachRDD(new Function<JavaRDD<String>, Void>() { @Override - public Void call(JavaRDD<String> rdd) throws Exception { - long count = rdd.count(); - dataCounter.addAndGet(count); - return null; + public Void call(JavaRDD<String> rdd) { + long count = rdd.count(); + dataCounter.addAndGet(count); + return null; } }); @@ -83,7 +82,7 @@ public class JavaReceiverAPISuite implements Serializable { Thread.sleep(200); for (int i = 0; i < 6; i++) { - server.send("" + i + "\n"); // \n to make sure these are separate lines + server.send(i + "\n"); // \n to make sure these are separate lines Thread.sleep(100); } while (dataCounter.get() == 0 && System.currentTimeMillis() - startTime < timeout) { @@ -95,50 +94,49 @@ public class JavaReceiverAPISuite implements Serializable { server.stop(); } } -} -class JavaSocketReceiver extends Receiver<String> { + private static class JavaSocketReceiver extends Receiver<String> { - String host = null; - int port = -1; + String host = null; + int port = -1; - public JavaSocketReceiver(String host_ , int port_) { - super(StorageLevel.MEMORY_AND_DISK()); - host = host_; - port = port_; - } + JavaSocketReceiver(String host_ , int port_) { + super(StorageLevel.MEMORY_AND_DISK()); + host = host_; + port = port_; + } - @Override - public void onStart() { - new Thread() { - @Override public void run() { - receive(); - } - }.start(); - } + @Override + public void onStart() { + new Thread() { + @Override public void run() { + receive(); + } + }.start(); + } - @Override - public void onStop() { - } + @Override + public void onStop() { + } - private void receive() { - Socket socket = null; - try { - socket = new Socket(host, port); - BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream())); - String userInput; - while ((userInput = in.readLine()) != null) { - store(userInput); + private void receive() { + try { + Socket socket = new Socket(host, port); + BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream())); + String userInput; + while ((userInput = in.readLine()) != null) { + store(userInput); + } + in.close(); + socket.close(); + } catch(ConnectException ce) { + ce.printStackTrace(); + restart("Could not connect", ce); + } catch(Throwable t) { + t.printStackTrace(); + restart("Error receiving data", t); } - in.close(); - socket.close(); - } catch(ConnectException ce) { - ce.printStackTrace(); - restart("Could not connect", ce); - } catch(Throwable t) { - t.printStackTrace(); - restart("Error receiving data", t); } } -} +} |