aboutsummaryrefslogtreecommitdiff
path: root/extras
diff options
context:
space:
mode:
Diffstat (limited to 'extras')
-rw-r--r--extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java96
-rw-r--r--extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java381
2 files changed, 233 insertions, 244 deletions
diff --git a/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java b/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java
index c366c10b15..729bc0459c 100644
--- a/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java
+++ b/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java
@@ -99,16 +99,16 @@ public class Java8APISuite implements Serializable {
@Test
public void leftOuterJoin() {
JavaPairRDD<Integer, Integer> rdd1 = sc.parallelizePairs(Arrays.asList(
- new Tuple2<Integer, Integer>(1, 1),
- new Tuple2<Integer, Integer>(1, 2),
- new Tuple2<Integer, Integer>(2, 1),
- new Tuple2<Integer, Integer>(3, 1)
+ new Tuple2<>(1, 1),
+ new Tuple2<>(1, 2),
+ new Tuple2<>(2, 1),
+ new Tuple2<>(3, 1)
));
JavaPairRDD<Integer, Character> rdd2 = sc.parallelizePairs(Arrays.asList(
- new Tuple2<Integer, Character>(1, 'x'),
- new Tuple2<Integer, Character>(2, 'y'),
- new Tuple2<Integer, Character>(2, 'z'),
- new Tuple2<Integer, Character>(4, 'w')
+ new Tuple2<>(1, 'x'),
+ new Tuple2<>(2, 'y'),
+ new Tuple2<>(2, 'z'),
+ new Tuple2<>(4, 'w')
));
List<Tuple2<Integer, Tuple2<Integer, Optional<Character>>>> joined =
rdd1.leftOuterJoin(rdd2).collect();
@@ -133,11 +133,11 @@ public class Java8APISuite implements Serializable {
@Test
public void foldByKey() {
List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
- new Tuple2<Integer, Integer>(2, 1),
- new Tuple2<Integer, Integer>(2, 1),
- new Tuple2<Integer, Integer>(1, 1),
- new Tuple2<Integer, Integer>(3, 2),
- new Tuple2<Integer, Integer>(3, 1)
+ new Tuple2<>(2, 1),
+ new Tuple2<>(2, 1),
+ new Tuple2<>(1, 1),
+ new Tuple2<>(3, 2),
+ new Tuple2<>(3, 1)
);
JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
JavaPairRDD<Integer, Integer> sums = rdd.foldByKey(0, (a, b) -> a + b);
@@ -149,11 +149,11 @@ public class Java8APISuite implements Serializable {
@Test
public void reduceByKey() {
List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
- new Tuple2<Integer, Integer>(2, 1),
- new Tuple2<Integer, Integer>(2, 1),
- new Tuple2<Integer, Integer>(1, 1),
- new Tuple2<Integer, Integer>(3, 2),
- new Tuple2<Integer, Integer>(3, 1)
+ new Tuple2<>(2, 1),
+ new Tuple2<>(2, 1),
+ new Tuple2<>(1, 1),
+ new Tuple2<>(3, 2),
+ new Tuple2<>(3, 1)
);
JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
JavaPairRDD<Integer, Integer> counts = rdd.reduceByKey((a, b) -> a + b);
@@ -177,7 +177,7 @@ public class Java8APISuite implements Serializable {
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
JavaDoubleRDD doubles = rdd.mapToDouble(x -> 1.0 * x).cache();
doubles.collect();
- JavaPairRDD<Integer, Integer> pairs = rdd.mapToPair(x -> new Tuple2<Integer, Integer>(x, x))
+ JavaPairRDD<Integer, Integer> pairs = rdd.mapToPair(x -> new Tuple2<>(x, x))
.cache();
pairs.collect();
JavaRDD<String> strings = rdd.map(x -> x.toString()).cache();
@@ -194,31 +194,31 @@ public class Java8APISuite implements Serializable {
Assert.assertEquals(11, words.count());
JavaPairRDD<String, String> pairs = rdd.flatMapToPair(s -> {
- List<Tuple2<String, String>> pairs2 = new LinkedList<Tuple2<String, String>>();
- for (String word : s.split(" ")) pairs2.add(new Tuple2<String, String>(word, word));
+ List<Tuple2<String, String>> pairs2 = new LinkedList<>();
+ for (String word : s.split(" ")) pairs2.add(new Tuple2<>(word, word));
return pairs2;
});
- Assert.assertEquals(new Tuple2<String, String>("Hello", "Hello"), pairs.first());
+ Assert.assertEquals(new Tuple2<>("Hello", "Hello"), pairs.first());
Assert.assertEquals(11, pairs.count());
JavaDoubleRDD doubles = rdd.flatMapToDouble(s -> {
- List<Double> lengths = new LinkedList<Double>();
+ List<Double> lengths = new LinkedList<>();
for (String word : s.split(" ")) lengths.add(word.length() * 1.0);
return lengths;
});
Double x = doubles.first();
- Assert.assertEquals(5.0, doubles.first().doubleValue(), 0.01);
+ Assert.assertEquals(5.0, doubles.first(), 0.01);
Assert.assertEquals(11, pairs.count());
}
@Test
public void mapsFromPairsToPairs() {
List<Tuple2<Integer, String>> pairs = Arrays.asList(
- new Tuple2<Integer, String>(1, "a"),
- new Tuple2<Integer, String>(2, "aa"),
- new Tuple2<Integer, String>(3, "aaa")
+ new Tuple2<>(1, "a"),
+ new Tuple2<>(2, "aa"),
+ new Tuple2<>(3, "aaa")
);
JavaPairRDD<Integer, String> pairRDD = sc.parallelizePairs(pairs);
@@ -251,19 +251,18 @@ public class Java8APISuite implements Serializable {
tempDir.deleteOnExit();
String outputDir = new File(tempDir, "output").getAbsolutePath();
List<Tuple2<Integer, String>> pairs = Arrays.asList(
- new Tuple2<Integer, String>(1, "a"),
- new Tuple2<Integer, String>(2, "aa"),
- new Tuple2<Integer, String>(3, "aaa")
+ new Tuple2<>(1, "a"),
+ new Tuple2<>(2, "aa"),
+ new Tuple2<>(3, "aaa")
);
JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
- rdd.mapToPair(pair ->
- new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2())))
+ rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())))
.saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
// Try reading the output back as an object file
JavaPairRDD<Integer, String> readRDD = sc.sequenceFile(outputDir, IntWritable.class, Text.class)
- .mapToPair(pair -> new Tuple2<Integer, String>(pair._1().get(), pair._2().toString()));
+ .mapToPair(pair -> new Tuple2<>(pair._1().get(), pair._2().toString()));
Assert.assertEquals(pairs, readRDD.collect());
Utils.deleteRecursively(tempDir);
}
@@ -325,7 +324,7 @@ public class Java8APISuite implements Serializable {
}
};
- final Accumulator<Float> floatAccum = sc.accumulator((Float) 10.0f, floatAccumulatorParam);
+ final Accumulator<Float> floatAccum = sc.accumulator(10.0f, floatAccumulatorParam);
rdd.foreach(x -> floatAccum.add((float) x));
Assert.assertEquals((Float) 25.0f, floatAccum.value());
@@ -338,22 +337,22 @@ public class Java8APISuite implements Serializable {
public void keyBy() {
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2));
List<Tuple2<String, Integer>> s = rdd.keyBy(x -> x.toString()).collect();
- Assert.assertEquals(new Tuple2<String, Integer>("1", 1), s.get(0));
- Assert.assertEquals(new Tuple2<String, Integer>("2", 2), s.get(1));
+ Assert.assertEquals(new Tuple2<>("1", 1), s.get(0));
+ Assert.assertEquals(new Tuple2<>("2", 2), s.get(1));
}
@Test
public void mapOnPairRDD() {
JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4));
JavaPairRDD<Integer, Integer> rdd2 =
- rdd1.mapToPair(i -> new Tuple2<Integer, Integer>(i, i % 2));
+ rdd1.mapToPair(i -> new Tuple2<>(i, i % 2));
JavaPairRDD<Integer, Integer> rdd3 =
- rdd2.mapToPair(in -> new Tuple2<Integer, Integer>(in._2(), in._1()));
+ rdd2.mapToPair(in -> new Tuple2<>(in._2(), in._1()));
Assert.assertEquals(Arrays.asList(
new Tuple2<Integer, Integer>(1, 1),
- new Tuple2<Integer, Integer>(0, 2),
- new Tuple2<Integer, Integer>(1, 3),
- new Tuple2<Integer, Integer>(0, 4)), rdd3.collect());
+ new Tuple2<>(0, 2),
+ new Tuple2<>(1, 3),
+ new Tuple2<>(0, 4)), rdd3.collect());
}
@Test
@@ -361,7 +360,7 @@ public class Java8APISuite implements Serializable {
JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3);
JavaPairRDD<Integer, Integer> rdd2 =
- rdd1.mapToPair(i -> new Tuple2<Integer, Integer>(i, i % 2));
+ rdd1.mapToPair(i -> new Tuple2<>(i, i % 2));
List[] parts = rdd1.collectPartitions(new int[]{0});
Assert.assertEquals(Arrays.asList(1, 2), parts[0]);
@@ -369,16 +368,13 @@ public class Java8APISuite implements Serializable {
Assert.assertEquals(Arrays.asList(3, 4), parts[0]);
Assert.assertEquals(Arrays.asList(5, 6, 7), parts[1]);
- Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(1, 1),
- new Tuple2<Integer, Integer>(2, 0)),
+ Assert.assertEquals(Arrays.asList(new Tuple2<>(1, 1), new Tuple2<>(2, 0)),
rdd2.collectPartitions(new int[]{0})[0]);
parts = rdd2.collectPartitions(new int[]{1, 2});
- Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(3, 1),
- new Tuple2<Integer, Integer>(4, 0)), parts[0]);
- Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(5, 1),
- new Tuple2<Integer, Integer>(6, 0),
- new Tuple2<Integer, Integer>(7, 1)), parts[1]);
+ Assert.assertEquals(Arrays.asList(new Tuple2<>(3, 1), new Tuple2<>(4, 0)), parts[0]);
+ Assert.assertEquals(Arrays.asList(new Tuple2<>(5, 1), new Tuple2<>(6, 0), new Tuple2<>(7, 1)),
+ parts[1]);
}
@Test
@@ -386,7 +382,7 @@ public class Java8APISuite implements Serializable {
// Regression test for SPARK-1040
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(new Integer[]{1}));
JavaPairRDD<Integer, int[]> pairRDD =
- rdd.mapToPair(x -> new Tuple2<Integer, int[]>(x, new int[]{x}));
+ rdd.mapToPair(x -> new Tuple2<>(x, new int[]{x}));
pairRDD.collect(); // Works fine
Map<Integer, int[]> map = pairRDD.collectAsMap(); // Used to crash with ClassCastException
}
diff --git a/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java b/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
index 43df0dea61..73091cfe2c 100644
--- a/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
+++ b/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
@@ -39,6 +39,7 @@ import org.apache.spark.streaming.api.java.JavaPairDStream;
* Most of these tests replicate org.apache.spark.streaming.JavaAPISuite using java 8
* lambda syntax.
*/
+@SuppressWarnings("unchecked")
public class Java8APISuite extends LocalJavaStreamingContext implements Serializable {
@Test
@@ -52,7 +53,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
Arrays.asList(9, 4));
JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream<Integer> letterCount = stream.map(s -> s.length());
+ JavaDStream<Integer> letterCount = stream.map(String::length);
JavaTestUtils.attachTestOutputStream(letterCount);
List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
@@ -63,7 +64,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
public void testFilter() {
List<List<String>> inputData = Arrays.asList(
Arrays.asList("giants", "dodgers"),
- Arrays.asList("yankees", "red socks"));
+ Arrays.asList("yankees", "red sox"));
List<List<String>> expected = Arrays.asList(
Arrays.asList("giants"),
@@ -81,11 +82,11 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
public void testMapPartitions() {
List<List<String>> inputData = Arrays.asList(
Arrays.asList("giants", "dodgers"),
- Arrays.asList("yankees", "red socks"));
+ Arrays.asList("yankees", "red sox"));
List<List<String>> expected = Arrays.asList(
Arrays.asList("GIANTSDODGERS"),
- Arrays.asList("YANKEESRED SOCKS"));
+ Arrays.asList("YANKEESRED SOX"));
JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaDStream<String> mapped = stream.mapPartitions(in -> {
@@ -172,7 +173,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
List<List<Tuple2<String, Integer>>> pairInputData =
- Arrays.asList(Arrays.asList(new Tuple2<String, Integer>("x", 1)));
+ Arrays.asList(Arrays.asList(new Tuple2<>("x", 1)));
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(
JavaTestUtils.attachTestInputStream(ssc, pairInputData, 1));
@@ -192,32 +193,32 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
public void testTransformWith() {
List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
Arrays.asList(
- new Tuple2<String, String>("california", "dodgers"),
- new Tuple2<String, String>("new york", "yankees")),
+ new Tuple2<>("california", "dodgers"),
+ new Tuple2<>("new york", "yankees")),
Arrays.asList(
- new Tuple2<String, String>("california", "sharks"),
- new Tuple2<String, String>("new york", "rangers")));
+ new Tuple2<>("california", "sharks"),
+ new Tuple2<>("new york", "rangers")));
List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList(
Arrays.asList(
- new Tuple2<String, String>("california", "giants"),
- new Tuple2<String, String>("new york", "mets")),
+ new Tuple2<>("california", "giants"),
+ new Tuple2<>("new york", "mets")),
Arrays.asList(
- new Tuple2<String, String>("california", "ducks"),
- new Tuple2<String, String>("new york", "islanders")));
+ new Tuple2<>("california", "ducks"),
+ new Tuple2<>("new york", "islanders")));
- List<HashSet<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList(
+ List<Set<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList(
Sets.newHashSet(
- new Tuple2<String, Tuple2<String, String>>("california",
- new Tuple2<String, String>("dodgers", "giants")),
- new Tuple2<String, Tuple2<String, String>>("new york",
- new Tuple2<String, String>("yankees", "mets"))),
+ new Tuple2<>("california",
+ new Tuple2<>("dodgers", "giants")),
+ new Tuple2<>("new york",
+ new Tuple2<>("yankees", "mets"))),
Sets.newHashSet(
- new Tuple2<String, Tuple2<String, String>>("california",
- new Tuple2<String, String>("sharks", "ducks")),
- new Tuple2<String, Tuple2<String, String>>("new york",
- new Tuple2<String, String>("rangers", "islanders"))));
+ new Tuple2<>("california",
+ new Tuple2<>("sharks", "ducks")),
+ new Tuple2<>("new york",
+ new Tuple2<>("rangers", "islanders"))));
JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream(
ssc, stringStringKVStream1, 1);
@@ -232,7 +233,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
JavaTestUtils.attachTestOutputStream(joined);
List<List<Tuple2<String, Tuple2<String, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
- List<HashSet<Tuple2<String, Tuple2<String, String>>>> unorderedResult = Lists.newArrayList();
+ List<Set<Tuple2<String, Tuple2<String, String>>>> unorderedResult = Lists.newArrayList();
for (List<Tuple2<String, Tuple2<String, String>>> res : result) {
unorderedResult.add(Sets.newHashSet(res));
}
@@ -251,9 +252,9 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
JavaDStream<String> stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 1);
List<List<Tuple2<String, Integer>>> pairInputData1 =
- Arrays.asList(Arrays.asList(new Tuple2<String, Integer>("x", 1)));
+ Arrays.asList(Arrays.asList(new Tuple2<>("x", 1)));
List<List<Tuple2<Double, Character>>> pairInputData2 =
- Arrays.asList(Arrays.asList(new Tuple2<Double, Character>(1.0, 'x')));
+ Arrays.asList(Arrays.asList(new Tuple2<>(1.0, 'x')));
JavaPairDStream<String, Integer> pairStream1 = JavaPairDStream.fromJavaDStream(
JavaTestUtils.attachTestInputStream(ssc, pairInputData1, 1));
JavaPairDStream<Double, Character> pairStream2 = JavaPairDStream.fromJavaDStream(
@@ -293,13 +294,13 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
);
List<List<Tuple2<Integer, String>>> pairStream1input = Arrays.asList(
- Arrays.asList(new Tuple2<Integer, String>(1, "x")),
- Arrays.asList(new Tuple2<Integer, String>(2, "y"))
+ Arrays.asList(new Tuple2<>(1, "x")),
+ Arrays.asList(new Tuple2<>(2, "y"))
);
List<List<Tuple2<Integer, Tuple2<Integer, String>>>> expected = Arrays.asList(
- Arrays.asList(new Tuple2<Integer, Tuple2<Integer, String>>(1, new Tuple2<Integer, String>(1, "x"))),
- Arrays.asList(new Tuple2<Integer, Tuple2<Integer, String>>(2, new Tuple2<Integer, String>(2, "y")))
+ Arrays.asList(new Tuple2<>(1, new Tuple2<>(1, "x"))),
+ Arrays.asList(new Tuple2<>(2, new Tuple2<>(2, "y")))
);
JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, stream1input, 1);
@@ -312,7 +313,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
// This is just to test whether this transform to JavaStream compiles
JavaDStream<Long> transformed1 = ssc.transform(
listOfDStreams1, (List<JavaRDD<?>> listOfRDDs, Time time) -> {
- assert (listOfRDDs.size() == 2);
+ Assert.assertEquals(2, listOfRDDs.size());
return null;
});
@@ -321,13 +322,13 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
JavaPairDStream<Integer, Tuple2<Integer, String>> transformed2 = ssc.transformToPair(
listOfDStreams2, (List<JavaRDD<?>> listOfRDDs, Time time) -> {
- assert (listOfRDDs.size() == 3);
+ Assert.assertEquals(3, listOfRDDs.size());
JavaRDD<Integer> rdd1 = (JavaRDD<Integer>) listOfRDDs.get(0);
JavaRDD<Integer> rdd2 = (JavaRDD<Integer>) listOfRDDs.get(1);
JavaRDD<Tuple2<Integer, String>> rdd3 = (JavaRDD<Tuple2<Integer, String>>) listOfRDDs.get(2);
JavaPairRDD<Integer, String> prdd3 = JavaPairRDD.fromJavaRDD(rdd3);
PairFunction<Integer, Integer, Integer> mapToTuple =
- (Integer i) -> new Tuple2<Integer, Integer>(i, i);
+ (Integer i) -> new Tuple2<>(i, i);
return rdd1.union(rdd2).mapToPair(mapToTuple).join(prdd3);
});
JavaTestUtils.attachTestOutputStream(transformed2);
@@ -365,36 +366,36 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
Arrays.asList(
- new Tuple2<Integer, String>(6, "g"),
- new Tuple2<Integer, String>(6, "i"),
- new Tuple2<Integer, String>(6, "a"),
- new Tuple2<Integer, String>(6, "n"),
- new Tuple2<Integer, String>(6, "t"),
- new Tuple2<Integer, String>(6, "s")),
+ new Tuple2<>(6, "g"),
+ new Tuple2<>(6, "i"),
+ new Tuple2<>(6, "a"),
+ new Tuple2<>(6, "n"),
+ new Tuple2<>(6, "t"),
+ new Tuple2<>(6, "s")),
Arrays.asList(
- new Tuple2<Integer, String>(7, "d"),
- new Tuple2<Integer, String>(7, "o"),
- new Tuple2<Integer, String>(7, "d"),
- new Tuple2<Integer, String>(7, "g"),
- new Tuple2<Integer, String>(7, "e"),
- new Tuple2<Integer, String>(7, "r"),
- new Tuple2<Integer, String>(7, "s")),
+ new Tuple2<>(7, "d"),
+ new Tuple2<>(7, "o"),
+ new Tuple2<>(7, "d"),
+ new Tuple2<>(7, "g"),
+ new Tuple2<>(7, "e"),
+ new Tuple2<>(7, "r"),
+ new Tuple2<>(7, "s")),
Arrays.asList(
- new Tuple2<Integer, String>(9, "a"),
- new Tuple2<Integer, String>(9, "t"),
- new Tuple2<Integer, String>(9, "h"),
- new Tuple2<Integer, String>(9, "l"),
- new Tuple2<Integer, String>(9, "e"),
- new Tuple2<Integer, String>(9, "t"),
- new Tuple2<Integer, String>(9, "i"),
- new Tuple2<Integer, String>(9, "c"),
- new Tuple2<Integer, String>(9, "s")));
+ new Tuple2<>(9, "a"),
+ new Tuple2<>(9, "t"),
+ new Tuple2<>(9, "h"),
+ new Tuple2<>(9, "l"),
+ new Tuple2<>(9, "e"),
+ new Tuple2<>(9, "t"),
+ new Tuple2<>(9, "i"),
+ new Tuple2<>(9, "c"),
+ new Tuple2<>(9, "s")));
JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<Integer, String> flatMapped = stream.flatMapToPair(s -> {
List<Tuple2<Integer, String>> out = Lists.newArrayList();
for (String letter : s.split("(?!^)")) {
- out.add(new Tuple2<Integer, String>(s.length(), letter));
+ out.add(new Tuple2<>(s.length(), letter));
}
return out;
});
@@ -411,12 +412,8 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
*/
public static <T extends Comparable<T>> void assertOrderInvariantEquals(
List<List<T>> expected, List<List<T>> actual) {
- for (List<T> list : expected) {
- Collections.sort(list);
- }
- for (List<T> list : actual) {
- Collections.sort(list);
- }
+ expected.forEach((List<T> list) -> Collections.sort(list));
+ actual.forEach((List<T> list) -> Collections.sort(list));
Assert.assertEquals(expected, actual);
}
@@ -424,11 +421,11 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
public void testPairFilter() {
List<List<String>> inputData = Arrays.asList(
Arrays.asList("giants", "dodgers"),
- Arrays.asList("yankees", "red socks"));
+ Arrays.asList("yankees", "red sox"));
List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
- Arrays.asList(new Tuple2<String, Integer>("giants", 6)),
- Arrays.asList(new Tuple2<String, Integer>("yankees", 7)));
+ Arrays.asList(new Tuple2<>("giants", 6)),
+ Arrays.asList(new Tuple2<>("yankees", 7)));
JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream =
@@ -441,26 +438,26 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
}
List<List<Tuple2<String, String>>> stringStringKVStream = Arrays.asList(
- Arrays.asList(new Tuple2<String, String>("california", "dodgers"),
- new Tuple2<String, String>("california", "giants"),
- new Tuple2<String, String>("new york", "yankees"),
- new Tuple2<String, String>("new york", "mets")),
- Arrays.asList(new Tuple2<String, String>("california", "sharks"),
- new Tuple2<String, String>("california", "ducks"),
- new Tuple2<String, String>("new york", "rangers"),
- new Tuple2<String, String>("new york", "islanders")));
+ Arrays.asList(new Tuple2<>("california", "dodgers"),
+ new Tuple2<>("california", "giants"),
+ new Tuple2<>("new york", "yankees"),
+ new Tuple2<>("new york", "mets")),
+ Arrays.asList(new Tuple2<>("california", "sharks"),
+ new Tuple2<>("california", "ducks"),
+ new Tuple2<>("new york", "rangers"),
+ new Tuple2<>("new york", "islanders")));
List<List<Tuple2<String, Integer>>> stringIntKVStream = Arrays.asList(
Arrays.asList(
- new Tuple2<String, Integer>("california", 1),
- new Tuple2<String, Integer>("california", 3),
- new Tuple2<String, Integer>("new york", 4),
- new Tuple2<String, Integer>("new york", 1)),
+ new Tuple2<>("california", 1),
+ new Tuple2<>("california", 3),
+ new Tuple2<>("new york", 4),
+ new Tuple2<>("new york", 1)),
Arrays.asList(
- new Tuple2<String, Integer>("california", 5),
- new Tuple2<String, Integer>("california", 5),
- new Tuple2<String, Integer>("new york", 3),
- new Tuple2<String, Integer>("new york", 1)));
+ new Tuple2<>("california", 5),
+ new Tuple2<>("california", 5),
+ new Tuple2<>("new york", 3),
+ new Tuple2<>("new york", 1)));
@Test
public void testPairMap() { // Maps pair -> pair of different type
@@ -468,15 +465,15 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
Arrays.asList(
- new Tuple2<Integer, String>(1, "california"),
- new Tuple2<Integer, String>(3, "california"),
- new Tuple2<Integer, String>(4, "new york"),
- new Tuple2<Integer, String>(1, "new york")),
+ new Tuple2<>(1, "california"),
+ new Tuple2<>(3, "california"),
+ new Tuple2<>(4, "new york"),
+ new Tuple2<>(1, "new york")),
Arrays.asList(
- new Tuple2<Integer, String>(5, "california"),
- new Tuple2<Integer, String>(5, "california"),
- new Tuple2<Integer, String>(3, "new york"),
- new Tuple2<Integer, String>(1, "new york")));
+ new Tuple2<>(5, "california"),
+ new Tuple2<>(5, "california"),
+ new Tuple2<>(3, "new york"),
+ new Tuple2<>(1, "new york")));
JavaDStream<Tuple2<String, Integer>> stream =
JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
@@ -494,21 +491,21 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
Arrays.asList(
- new Tuple2<Integer, String>(1, "california"),
- new Tuple2<Integer, String>(3, "california"),
- new Tuple2<Integer, String>(4, "new york"),
- new Tuple2<Integer, String>(1, "new york")),
+ new Tuple2<>(1, "california"),
+ new Tuple2<>(3, "california"),
+ new Tuple2<>(4, "new york"),
+ new Tuple2<>(1, "new york")),
Arrays.asList(
- new Tuple2<Integer, String>(5, "california"),
- new Tuple2<Integer, String>(5, "california"),
- new Tuple2<Integer, String>(3, "new york"),
- new Tuple2<Integer, String>(1, "new york")));
+ new Tuple2<>(5, "california"),
+ new Tuple2<>(5, "california"),
+ new Tuple2<>(3, "new york"),
+ new Tuple2<>(1, "new york")));
JavaDStream<Tuple2<String, Integer>> stream =
JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
JavaPairDStream<Integer, String> reversed = pairStream.mapPartitionsToPair(in -> {
- LinkedList<Tuple2<Integer, String>> out = new LinkedList<Tuple2<Integer, String>>();
+ LinkedList<Tuple2<Integer, String>> out = new LinkedList<>();
while (in.hasNext()) {
Tuple2<String, Integer> next = in.next();
out.add(next.swap());
@@ -530,7 +527,8 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
Arrays.asList(1, 3, 4, 1),
Arrays.asList(5, 5, 3, 1));
- JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<Tuple2<String, Integer>> stream =
+ JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
JavaDStream<Integer> reversed = pairStream.map(in -> in._2());
JavaTestUtils.attachTestOutputStream(reversed);
@@ -543,31 +541,31 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
public void testPairToPairFlatMapWithChangingTypes() { // Maps pair -> pair
List<List<Tuple2<String, Integer>>> inputData = Arrays.asList(
Arrays.asList(
- new Tuple2<String, Integer>("hi", 1),
- new Tuple2<String, Integer>("ho", 2)),
+ new Tuple2<>("hi", 1),
+ new Tuple2<>("ho", 2)),
Arrays.asList(
- new Tuple2<String, Integer>("hi", 1),
- new Tuple2<String, Integer>("ho", 2)));
+ new Tuple2<>("hi", 1),
+ new Tuple2<>("ho", 2)));
List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
Arrays.asList(
- new Tuple2<Integer, String>(1, "h"),
- new Tuple2<Integer, String>(1, "i"),
- new Tuple2<Integer, String>(2, "h"),
- new Tuple2<Integer, String>(2, "o")),
+ new Tuple2<>(1, "h"),
+ new Tuple2<>(1, "i"),
+ new Tuple2<>(2, "h"),
+ new Tuple2<>(2, "o")),
Arrays.asList(
- new Tuple2<Integer, String>(1, "h"),
- new Tuple2<Integer, String>(1, "i"),
- new Tuple2<Integer, String>(2, "h"),
- new Tuple2<Integer, String>(2, "o")));
+ new Tuple2<>(1, "h"),
+ new Tuple2<>(1, "i"),
+ new Tuple2<>(2, "h"),
+ new Tuple2<>(2, "o")));
JavaDStream<Tuple2<String, Integer>> stream =
JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
JavaPairDStream<Integer, String> flatMapped = pairStream.flatMapToPair(in -> {
- List<Tuple2<Integer, String>> out = new LinkedList<Tuple2<Integer, String>>();
+ List<Tuple2<Integer, String>> out = new LinkedList<>();
for (Character s : in._1().toCharArray()) {
- out.add(new Tuple2<Integer, String>(in._2(), s.toString()));
+ out.add(new Tuple2<>(in._2(), s.toString()));
}
return out;
});
@@ -584,11 +582,11 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
Arrays.asList(
- new Tuple2<String, Integer>("california", 4),
- new Tuple2<String, Integer>("new york", 5)),
+ new Tuple2<>("california", 4),
+ new Tuple2<>("new york", 5)),
Arrays.asList(
- new Tuple2<String, Integer>("california", 10),
- new Tuple2<String, Integer>("new york", 4)));
+ new Tuple2<>("california", 10),
+ new Tuple2<>("new york", 4)));
JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(
ssc, inputData, 1);
@@ -608,11 +606,11 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
Arrays.asList(
- new Tuple2<String, Integer>("california", 4),
- new Tuple2<String, Integer>("new york", 5)),
+ new Tuple2<>("california", 4),
+ new Tuple2<>("new york", 5)),
Arrays.asList(
- new Tuple2<String, Integer>("california", 10),
- new Tuple2<String, Integer>("new york", 4)));
+ new Tuple2<>("california", 10),
+ new Tuple2<>("new york", 4)));
JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(
ssc, inputData, 1);
@@ -632,12 +630,12 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
- Arrays.asList(new Tuple2<String, Integer>("california", 4),
- new Tuple2<String, Integer>("new york", 5)),
- Arrays.asList(new Tuple2<String, Integer>("california", 14),
- new Tuple2<String, Integer>("new york", 9)),
- Arrays.asList(new Tuple2<String, Integer>("california", 10),
- new Tuple2<String, Integer>("new york", 4)));
+ Arrays.asList(new Tuple2<>("california", 4),
+ new Tuple2<>("new york", 5)),
+ Arrays.asList(new Tuple2<>("california", 14),
+ new Tuple2<>("new york", 9)),
+ Arrays.asList(new Tuple2<>("california", 10),
+ new Tuple2<>("new york", 4)));
JavaDStream<Tuple2<String, Integer>> stream =
JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
@@ -656,12 +654,12 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
- Arrays.asList(new Tuple2<String, Integer>("california", 4),
- new Tuple2<String, Integer>("new york", 5)),
- Arrays.asList(new Tuple2<String, Integer>("california", 14),
- new Tuple2<String, Integer>("new york", 9)),
- Arrays.asList(new Tuple2<String, Integer>("california", 14),
- new Tuple2<String, Integer>("new york", 9)));
+ Arrays.asList(new Tuple2<>("california", 4),
+ new Tuple2<>("new york", 5)),
+ Arrays.asList(new Tuple2<>("california", 14),
+ new Tuple2<>("new york", 9)),
+ Arrays.asList(new Tuple2<>("california", 14),
+ new Tuple2<>("new york", 9)));
JavaDStream<Tuple2<String, Integer>> stream =
JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
@@ -689,12 +687,12 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
- Arrays.asList(new Tuple2<String, Integer>("california", 4),
- new Tuple2<String, Integer>("new york", 5)),
- Arrays.asList(new Tuple2<String, Integer>("california", 14),
- new Tuple2<String, Integer>("new york", 9)),
- Arrays.asList(new Tuple2<String, Integer>("california", 10),
- new Tuple2<String, Integer>("new york", 4)));
+ Arrays.asList(new Tuple2<>("california", 4),
+ new Tuple2<>("new york", 5)),
+ Arrays.asList(new Tuple2<>("california", 14),
+ new Tuple2<>("new york", 9)),
+ Arrays.asList(new Tuple2<>("california", 10),
+ new Tuple2<>("new york", 4)));
JavaDStream<Tuple2<String, Integer>> stream =
JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
@@ -713,27 +711,27 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
public void testPairTransform() {
List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList(
Arrays.asList(
- new Tuple2<Integer, Integer>(3, 5),
- new Tuple2<Integer, Integer>(1, 5),
- new Tuple2<Integer, Integer>(4, 5),
- new Tuple2<Integer, Integer>(2, 5)),
+ new Tuple2<>(3, 5),
+ new Tuple2<>(1, 5),
+ new Tuple2<>(4, 5),
+ new Tuple2<>(2, 5)),
Arrays.asList(
- new Tuple2<Integer, Integer>(2, 5),
- new Tuple2<Integer, Integer>(3, 5),
- new Tuple2<Integer, Integer>(4, 5),
- new Tuple2<Integer, Integer>(1, 5)));
+ new Tuple2<>(2, 5),
+ new Tuple2<>(3, 5),
+ new Tuple2<>(4, 5),
+ new Tuple2<>(1, 5)));
List<List<Tuple2<Integer, Integer>>> expected = Arrays.asList(
Arrays.asList(
- new Tuple2<Integer, Integer>(1, 5),
- new Tuple2<Integer, Integer>(2, 5),
- new Tuple2<Integer, Integer>(3, 5),
- new Tuple2<Integer, Integer>(4, 5)),
+ new Tuple2<>(1, 5),
+ new Tuple2<>(2, 5),
+ new Tuple2<>(3, 5),
+ new Tuple2<>(4, 5)),
Arrays.asList(
- new Tuple2<Integer, Integer>(1, 5),
- new Tuple2<Integer, Integer>(2, 5),
- new Tuple2<Integer, Integer>(3, 5),
- new Tuple2<Integer, Integer>(4, 5)));
+ new Tuple2<>(1, 5),
+ new Tuple2<>(2, 5),
+ new Tuple2<>(3, 5),
+ new Tuple2<>(4, 5)));
JavaDStream<Tuple2<Integer, Integer>> stream = JavaTestUtils.attachTestInputStream(
ssc, inputData, 1);
@@ -751,15 +749,15 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
public void testPairToNormalRDDTransform() {
List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList(
Arrays.asList(
- new Tuple2<Integer, Integer>(3, 5),
- new Tuple2<Integer, Integer>(1, 5),
- new Tuple2<Integer, Integer>(4, 5),
- new Tuple2<Integer, Integer>(2, 5)),
+ new Tuple2<>(3, 5),
+ new Tuple2<>(1, 5),
+ new Tuple2<>(4, 5),
+ new Tuple2<>(2, 5)),
Arrays.asList(
- new Tuple2<Integer, Integer>(2, 5),
- new Tuple2<Integer, Integer>(3, 5),
- new Tuple2<Integer, Integer>(4, 5),
- new Tuple2<Integer, Integer>(1, 5)));
+ new Tuple2<>(2, 5),
+ new Tuple2<>(3, 5),
+ new Tuple2<>(4, 5),
+ new Tuple2<>(1, 5)));
List<List<Integer>> expected = Arrays.asList(
Arrays.asList(3, 1, 4, 2),
@@ -780,20 +778,20 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
List<List<Tuple2<String, String>>> expected = Arrays.asList(
- Arrays.asList(new Tuple2<String, String>("california", "DODGERS"),
- new Tuple2<String, String>("california", "GIANTS"),
- new Tuple2<String, String>("new york", "YANKEES"),
- new Tuple2<String, String>("new york", "METS")),
- Arrays.asList(new Tuple2<String, String>("california", "SHARKS"),
- new Tuple2<String, String>("california", "DUCKS"),
- new Tuple2<String, String>("new york", "RANGERS"),
- new Tuple2<String, String>("new york", "ISLANDERS")));
+ Arrays.asList(new Tuple2<>("california", "DODGERS"),
+ new Tuple2<>("california", "GIANTS"),
+ new Tuple2<>("new york", "YANKEES"),
+ new Tuple2<>("new york", "METS")),
+ Arrays.asList(new Tuple2<>("california", "SHARKS"),
+ new Tuple2<>("california", "DUCKS"),
+ new Tuple2<>("new york", "RANGERS"),
+ new Tuple2<>("new york", "ISLANDERS")));
JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
ssc, inputData, 1);
JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
- JavaPairDStream<String, String> mapped = pairStream.mapValues(s -> s.toUpperCase());
+ JavaPairDStream<String, String> mapped = pairStream.mapValues(String::toUpperCase);
JavaTestUtils.attachTestOutputStream(mapped);
List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
@@ -805,34 +803,29 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
List<List<Tuple2<String, String>>> expected = Arrays.asList(
- Arrays.asList(new Tuple2<String, String>("california", "dodgers1"),
- new Tuple2<String, String>("california", "dodgers2"),
- new Tuple2<String, String>("california", "giants1"),
- new Tuple2<String, String>("california", "giants2"),
- new Tuple2<String, String>("new york", "yankees1"),
- new Tuple2<String, String>("new york", "yankees2"),
- new Tuple2<String, String>("new york", "mets1"),
- new Tuple2<String, String>("new york", "mets2")),
- Arrays.asList(new Tuple2<String, String>("california", "sharks1"),
- new Tuple2<String, String>("california", "sharks2"),
- new Tuple2<String, String>("california", "ducks1"),
- new Tuple2<String, String>("california", "ducks2"),
- new Tuple2<String, String>("new york", "rangers1"),
- new Tuple2<String, String>("new york", "rangers2"),
- new Tuple2<String, String>("new york", "islanders1"),
- new Tuple2<String, String>("new york", "islanders2")));
+ Arrays.asList(new Tuple2<>("california", "dodgers1"),
+ new Tuple2<>("california", "dodgers2"),
+ new Tuple2<>("california", "giants1"),
+ new Tuple2<>("california", "giants2"),
+ new Tuple2<>("new york", "yankees1"),
+ new Tuple2<>("new york", "yankees2"),
+ new Tuple2<>("new york", "mets1"),
+ new Tuple2<>("new york", "mets2")),
+ Arrays.asList(new Tuple2<>("california", "sharks1"),
+ new Tuple2<>("california", "sharks2"),
+ new Tuple2<>("california", "ducks1"),
+ new Tuple2<>("california", "ducks2"),
+ new Tuple2<>("new york", "rangers1"),
+ new Tuple2<>("new york", "rangers2"),
+ new Tuple2<>("new york", "islanders1"),
+ new Tuple2<>("new york", "islanders2")));
JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
ssc, inputData, 1);
JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
-
- JavaPairDStream<String, String> flatMapped = pairStream.flatMapValues(in -> {
- List<String> out = new ArrayList<String>();
- out.add(in + "1");
- out.add(in + "2");
- return out;
- });
+ JavaPairDStream<String, String> flatMapped =
+ pairStream.flatMapValues(in -> Arrays.asList(in + "1", in + "2"));
JavaTestUtils.attachTestOutputStream(flatMapped);
List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
Assert.assertEquals(expected, result);