aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorSean Owen <sowen@cloudera.com>2015-09-12 10:40:10 +0100
committerSean Owen <sowen@cloudera.com>2015-09-12 10:40:10 +0100
commit22730ad54d681ad30e63fe910e8d89360853177d (patch)
tree81194034499a6d391a0949e865fc0aa6dd5fc4ec /streaming
parent8285e3b0d3dc0eff669eba993742dfe0401116f9 (diff)
downloadspark-22730ad54d681ad30e63fe910e8d89360853177d.tar.gz
spark-22730ad54d681ad30e63fe910e8d89360853177d.tar.bz2
spark-22730ad54d681ad30e63fe910e8d89360853177d.zip
[SPARK-10547] [TEST] Streamline / improve style of Java API tests
Fix a few Java API test style issues: unused generic types, exceptions, wrong assert argument order Author: Sean Owen <sowen@cloudera.com> Closes #8706 from srowen/SPARK-10547.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java752
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java86
2 files changed, 413 insertions, 425 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index e0718f73aa..c521714922 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -18,24 +18,22 @@
package org.apache.spark.streaming;
import java.io.*;
-import java.lang.Iterable;
import java.nio.charset.Charset;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
+import scala.Tuple2;
+
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import scala.Tuple2;
-
import org.junit.Assert;
-import static org.junit.Assert.*;
import org.junit.Test;
import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
import com.google.common.io.Files;
import com.google.common.collect.Sets;
@@ -54,14 +52,14 @@ import org.apache.spark.SparkConf;
// see http://stackoverflow.com/questions/758570/.
public class JavaAPISuite extends LocalJavaStreamingContext implements Serializable {
- public void equalIterator(Iterator<?> a, Iterator<?> b) {
+ public static void equalIterator(Iterator<?> a, Iterator<?> b) {
while (a.hasNext() && b.hasNext()) {
Assert.assertEquals(a.next(), b.next());
}
Assert.assertEquals(a.hasNext(), b.hasNext());
}
- public void equalIterable(Iterable<?> a, Iterable<?> b) {
+ public static void equalIterable(Iterable<?> a, Iterable<?> b) {
equalIterator(a.iterator(), b.iterator());
}
@@ -74,14 +72,14 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
@Test
public void testContextState() {
List<List<Integer>> inputData = Arrays.asList(Arrays.asList(1, 2, 3, 4));
- Assert.assertTrue(ssc.getState() == StreamingContextState.INITIALIZED);
+ Assert.assertEquals(StreamingContextState.INITIALIZED, ssc.getState());
JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaTestUtils.attachTestOutputStream(stream);
- Assert.assertTrue(ssc.getState() == StreamingContextState.INITIALIZED);
+ Assert.assertEquals(StreamingContextState.INITIALIZED, ssc.getState());
ssc.start();
- Assert.assertTrue(ssc.getState() == StreamingContextState.ACTIVE);
+ Assert.assertEquals(StreamingContextState.ACTIVE, ssc.getState());
ssc.stop();
- Assert.assertTrue(ssc.getState() == StreamingContextState.STOPPED);
+ Assert.assertEquals(StreamingContextState.STOPPED, ssc.getState());
}
@SuppressWarnings("unchecked")
@@ -118,7 +116,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaDStream<Integer> letterCount = stream.map(new Function<String, Integer>() {
@Override
- public Integer call(String s) throws Exception {
+ public Integer call(String s) {
return s.length();
}
});
@@ -180,7 +178,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
public void testFilter() {
List<List<String>> inputData = Arrays.asList(
Arrays.asList("giants", "dodgers"),
- Arrays.asList("yankees", "red socks"));
+ Arrays.asList("yankees", "red sox"));
List<List<String>> expected = Arrays.asList(
Arrays.asList("giants"),
@@ -189,7 +187,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaDStream<String> filtered = stream.filter(new Function<String, Boolean>() {
@Override
- public Boolean call(String s) throws Exception {
+ public Boolean call(String s) {
return s.contains("a");
}
});
@@ -243,11 +241,11 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
public void testGlom() {
List<List<String>> inputData = Arrays.asList(
Arrays.asList("giants", "dodgers"),
- Arrays.asList("yankees", "red socks"));
+ Arrays.asList("yankees", "red sox"));
List<List<List<String>>> expected = Arrays.asList(
Arrays.asList(Arrays.asList("giants", "dodgers")),
- Arrays.asList(Arrays.asList("yankees", "red socks")));
+ Arrays.asList(Arrays.asList("yankees", "red sox")));
JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaDStream<List<String>> glommed = stream.glom();
@@ -262,22 +260,22 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
public void testMapPartitions() {
List<List<String>> inputData = Arrays.asList(
Arrays.asList("giants", "dodgers"),
- Arrays.asList("yankees", "red socks"));
+ Arrays.asList("yankees", "red sox"));
List<List<String>> expected = Arrays.asList(
Arrays.asList("GIANTSDODGERS"),
- Arrays.asList("YANKEESRED SOCKS"));
+ Arrays.asList("YANKEESRED SOX"));
JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaDStream<String> mapped = stream.mapPartitions(
new FlatMapFunction<Iterator<String>, String>() {
@Override
public Iterable<String> call(Iterator<String> in) {
- String out = "";
+ StringBuilder out = new StringBuilder();
while (in.hasNext()) {
- out = out + in.next().toUpperCase();
+ out.append(in.next().toUpperCase(Locale.ENGLISH));
}
- return Lists.newArrayList(out);
+ return Arrays.asList(out.toString());
}
});
JavaTestUtils.attachTestOutputStream(mapped);
@@ -286,16 +284,16 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Assert.assertEquals(expected, result);
}
- private class IntegerSum implements Function2<Integer, Integer, Integer> {
+ private static class IntegerSum implements Function2<Integer, Integer, Integer> {
@Override
- public Integer call(Integer i1, Integer i2) throws Exception {
+ public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
}
- private class IntegerDifference implements Function2<Integer, Integer, Integer> {
+ private static class IntegerDifference implements Function2<Integer, Integer, Integer> {
@Override
- public Integer call(Integer i1, Integer i2) throws Exception {
+ public Integer call(Integer i1, Integer i2) {
return i1 - i2;
}
}
@@ -347,13 +345,13 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Arrays.asList(24));
JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream<Integer> reducedWindowed = null;
+ JavaDStream<Integer> reducedWindowed;
if (withInverse) {
reducedWindowed = stream.reduceByWindow(new IntegerSum(),
- new IntegerDifference(), new Duration(2000), new Duration(1000));
+ new IntegerDifference(), new Duration(2000), new Duration(1000));
} else {
reducedWindowed = stream.reduceByWindow(new IntegerSum(),
- new Duration(2000), new Duration(1000));
+ new Duration(2000), new Duration(1000));
}
JavaTestUtils.attachTestOutputStream(reducedWindowed);
List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4);
@@ -378,11 +376,11 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Arrays.asList(7,8,9));
JavaSparkContext jsc = new JavaSparkContext(ssc.ssc().sc());
- JavaRDD<Integer> rdd1 = ssc.sparkContext().parallelize(Arrays.asList(1, 2, 3));
- JavaRDD<Integer> rdd2 = ssc.sparkContext().parallelize(Arrays.asList(4, 5, 6));
- JavaRDD<Integer> rdd3 = ssc.sparkContext().parallelize(Arrays.asList(7,8,9));
+ JavaRDD<Integer> rdd1 = jsc.parallelize(Arrays.asList(1, 2, 3));
+ JavaRDD<Integer> rdd2 = jsc.parallelize(Arrays.asList(4, 5, 6));
+ JavaRDD<Integer> rdd3 = jsc.parallelize(Arrays.asList(7,8,9));
- LinkedList<JavaRDD<Integer>> rdds = Lists.newLinkedList();
+ Queue<JavaRDD<Integer>> rdds = new LinkedList<>();
rdds.add(rdd1);
rdds.add(rdd2);
rdds.add(rdd3);
@@ -410,10 +408,10 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaDStream<Integer> transformed = stream.transform(
new Function<JavaRDD<Integer>, JavaRDD<Integer>>() {
@Override
- public JavaRDD<Integer> call(JavaRDD<Integer> in) throws Exception {
+ public JavaRDD<Integer> call(JavaRDD<Integer> in) {
return in.map(new Function<Integer, Integer>() {
@Override
- public Integer call(Integer i) throws Exception {
+ public Integer call(Integer i) {
return i + 2;
}
});
@@ -435,70 +433,70 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
List<List<Tuple2<String, Integer>>> pairInputData =
- Arrays.asList(Arrays.asList(new Tuple2<String, Integer>("x", 1)));
+ Arrays.asList(Arrays.asList(new Tuple2<>("x", 1)));
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(
JavaTestUtils.attachTestInputStream(ssc, pairInputData, 1));
- JavaDStream<Integer> transformed1 = stream.transform(
+ stream.transform(
new Function<JavaRDD<Integer>, JavaRDD<Integer>>() {
@Override
- public JavaRDD<Integer> call(JavaRDD<Integer> in) throws Exception {
+ public JavaRDD<Integer> call(JavaRDD<Integer> in) {
return null;
}
}
);
- JavaDStream<Integer> transformed2 = stream.transform(
+ stream.transform(
new Function2<JavaRDD<Integer>, Time, JavaRDD<Integer>>() {
- @Override public JavaRDD<Integer> call(JavaRDD<Integer> in, Time time) throws Exception {
+ @Override public JavaRDD<Integer> call(JavaRDD<Integer> in, Time time) {
return null;
}
}
);
- JavaPairDStream<String, Integer> transformed3 = stream.transformToPair(
+ stream.transformToPair(
new Function<JavaRDD<Integer>, JavaPairRDD<String, Integer>>() {
- @Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> in) throws Exception {
+ @Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> in) {
return null;
}
}
);
- JavaPairDStream<String, Integer> transformed4 = stream.transformToPair(
+ stream.transformToPair(
new Function2<JavaRDD<Integer>, Time, JavaPairRDD<String, Integer>>() {
- @Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> in, Time time) throws Exception {
+ @Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> in, Time time) {
return null;
}
}
);
- JavaDStream<Integer> pairTransformed1 = pairStream.transform(
+ pairStream.transform(
new Function<JavaPairRDD<String, Integer>, JavaRDD<Integer>>() {
- @Override public JavaRDD<Integer> call(JavaPairRDD<String, Integer> in) throws Exception {
+ @Override public JavaRDD<Integer> call(JavaPairRDD<String, Integer> in) {
return null;
}
}
);
- JavaDStream<Integer> pairTransformed2 = pairStream.transform(
+ pairStream.transform(
new Function2<JavaPairRDD<String, Integer>, Time, JavaRDD<Integer>>() {
- @Override public JavaRDD<Integer> call(JavaPairRDD<String, Integer> in, Time time) throws Exception {
+ @Override public JavaRDD<Integer> call(JavaPairRDD<String, Integer> in, Time time) {
return null;
}
}
);
- JavaPairDStream<String, String> pairTransformed3 = pairStream.transformToPair(
+ pairStream.transformToPair(
new Function<JavaPairRDD<String, Integer>, JavaPairRDD<String, String>>() {
- @Override public JavaPairRDD<String, String> call(JavaPairRDD<String, Integer> in) throws Exception {
+ @Override public JavaPairRDD<String, String> call(JavaPairRDD<String, Integer> in) {
return null;
}
}
);
- JavaPairDStream<String, String> pairTransformed4 = pairStream.transformToPair(
+ pairStream.transformToPair(
new Function2<JavaPairRDD<String, Integer>, Time, JavaPairRDD<String, String>>() {
- @Override public JavaPairRDD<String, String> call(JavaPairRDD<String, Integer> in, Time time) throws Exception {
+ @Override public JavaPairRDD<String, String> call(JavaPairRDD<String, Integer> in, Time time) {
return null;
}
}
@@ -511,32 +509,32 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
public void testTransformWith() {
List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
Arrays.asList(
- new Tuple2<String, String>("california", "dodgers"),
- new Tuple2<String, String>("new york", "yankees")),
+ new Tuple2<>("california", "dodgers"),
+ new Tuple2<>("new york", "yankees")),
Arrays.asList(
- new Tuple2<String, String>("california", "sharks"),
- new Tuple2<String, String>("new york", "rangers")));
+ new Tuple2<>("california", "sharks"),
+ new Tuple2<>("new york", "rangers")));
List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList(
Arrays.asList(
- new Tuple2<String, String>("california", "giants"),
- new Tuple2<String, String>("new york", "mets")),
+ new Tuple2<>("california", "giants"),
+ new Tuple2<>("new york", "mets")),
Arrays.asList(
- new Tuple2<String, String>("california", "ducks"),
- new Tuple2<String, String>("new york", "islanders")));
+ new Tuple2<>("california", "ducks"),
+ new Tuple2<>("new york", "islanders")));
List<HashSet<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList(
Sets.newHashSet(
- new Tuple2<String, Tuple2<String, String>>("california",
- new Tuple2<String, String>("dodgers", "giants")),
- new Tuple2<String, Tuple2<String, String>>("new york",
- new Tuple2<String, String>("yankees", "mets"))),
+ new Tuple2<>("california",
+ new Tuple2<>("dodgers", "giants")),
+ new Tuple2<>("new york",
+ new Tuple2<>("yankees", "mets"))),
Sets.newHashSet(
- new Tuple2<String, Tuple2<String, String>>("california",
- new Tuple2<String, String>("sharks", "ducks")),
- new Tuple2<String, Tuple2<String, String>>("new york",
- new Tuple2<String, String>("rangers", "islanders"))));
+ new Tuple2<>("california",
+ new Tuple2<>("sharks", "ducks")),
+ new Tuple2<>("new york",
+ new Tuple2<>("rangers", "islanders"))));
JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream(
ssc, stringStringKVStream1, 1);
@@ -552,14 +550,12 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaPairRDD<String, String>,
JavaPairRDD<String, String>,
Time,
- JavaPairRDD<String, Tuple2<String, String>>
- >() {
+ JavaPairRDD<String, Tuple2<String, String>>>() {
@Override
public JavaPairRDD<String, Tuple2<String, String>> call(
JavaPairRDD<String, String> rdd1,
JavaPairRDD<String, String> rdd2,
- Time time
- ) throws Exception {
+ Time time) {
return rdd1.join(rdd2);
}
}
@@ -567,9 +563,9 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaTestUtils.attachTestOutputStream(joined);
List<List<Tuple2<String, Tuple2<String, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
- List<HashSet<Tuple2<String, Tuple2<String, String>>>> unorderedResult = Lists.newArrayList();
+ List<HashSet<Tuple2<String, Tuple2<String, String>>>> unorderedResult = new ArrayList<>();
for (List<Tuple2<String, Tuple2<String, String>>> res: result) {
- unorderedResult.add(Sets.newHashSet(res));
+ unorderedResult.add(Sets.newHashSet(res));
}
Assert.assertEquals(expected, unorderedResult);
@@ -587,89 +583,89 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaDStream<String> stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 1);
List<List<Tuple2<String, Integer>>> pairInputData1 =
- Arrays.asList(Arrays.asList(new Tuple2<String, Integer>("x", 1)));
+ Arrays.asList(Arrays.asList(new Tuple2<>("x", 1)));
List<List<Tuple2<Double, Character>>> pairInputData2 =
- Arrays.asList(Arrays.asList(new Tuple2<Double, Character>(1.0, 'x')));
+ Arrays.asList(Arrays.asList(new Tuple2<>(1.0, 'x')));
JavaPairDStream<String, Integer> pairStream1 = JavaPairDStream.fromJavaDStream(
JavaTestUtils.attachTestInputStream(ssc, pairInputData1, 1));
JavaPairDStream<Double, Character> pairStream2 = JavaPairDStream.fromJavaDStream(
JavaTestUtils.attachTestInputStream(ssc, pairInputData2, 1));
- JavaDStream<Double> transformed1 = stream1.transformWith(
+ stream1.transformWith(
stream2,
new Function3<JavaRDD<Integer>, JavaRDD<String>, Time, JavaRDD<Double>>() {
@Override
- public JavaRDD<Double> call(JavaRDD<Integer> rdd1, JavaRDD<String> rdd2, Time time) throws Exception {
+ public JavaRDD<Double> call(JavaRDD<Integer> rdd1, JavaRDD<String> rdd2, Time time) {
return null;
}
}
);
- JavaDStream<Double> transformed2 = stream1.transformWith(
+ stream1.transformWith(
pairStream1,
new Function3<JavaRDD<Integer>, JavaPairRDD<String, Integer>, Time, JavaRDD<Double>>() {
@Override
- public JavaRDD<Double> call(JavaRDD<Integer> rdd1, JavaPairRDD<String, Integer> rdd2, Time time) throws Exception {
+ public JavaRDD<Double> call(JavaRDD<Integer> rdd1, JavaPairRDD<String, Integer> rdd2, Time time) {
return null;
}
}
);
- JavaPairDStream<Double, Double> transformed3 = stream1.transformWithToPair(
+ stream1.transformWithToPair(
stream2,
new Function3<JavaRDD<Integer>, JavaRDD<String>, Time, JavaPairRDD<Double, Double>>() {
@Override
- public JavaPairRDD<Double, Double> call(JavaRDD<Integer> rdd1, JavaRDD<String> rdd2, Time time) throws Exception {
+ public JavaPairRDD<Double, Double> call(JavaRDD<Integer> rdd1, JavaRDD<String> rdd2, Time time) {
return null;
}
}
);
- JavaPairDStream<Double, Double> transformed4 = stream1.transformWithToPair(
+ stream1.transformWithToPair(
pairStream1,
new Function3<JavaRDD<Integer>, JavaPairRDD<String, Integer>, Time, JavaPairRDD<Double, Double>>() {
@Override
- public JavaPairRDD<Double, Double> call(JavaRDD<Integer> rdd1, JavaPairRDD<String, Integer> rdd2, Time time) throws Exception {
+ public JavaPairRDD<Double, Double> call(JavaRDD<Integer> rdd1, JavaPairRDD<String, Integer> rdd2, Time time) {
return null;
}
}
);
- JavaDStream<Double> pairTransformed1 = pairStream1.transformWith(
+ pairStream1.transformWith(
stream2,
new Function3<JavaPairRDD<String, Integer>, JavaRDD<String>, Time, JavaRDD<Double>>() {
@Override
- public JavaRDD<Double> call(JavaPairRDD<String, Integer> rdd1, JavaRDD<String> rdd2, Time time) throws Exception {
+ public JavaRDD<Double> call(JavaPairRDD<String, Integer> rdd1, JavaRDD<String> rdd2, Time time) {
return null;
}
}
);
- JavaDStream<Double> pairTransformed2_ = pairStream1.transformWith(
+ pairStream1.transformWith(
pairStream1,
new Function3<JavaPairRDD<String, Integer>, JavaPairRDD<String, Integer>, Time, JavaRDD<Double>>() {
@Override
- public JavaRDD<Double> call(JavaPairRDD<String, Integer> rdd1, JavaPairRDD<String, Integer> rdd2, Time time) throws Exception {
+ public JavaRDD<Double> call(JavaPairRDD<String, Integer> rdd1, JavaPairRDD<String, Integer> rdd2, Time time) {
return null;
}
}
);
- JavaPairDStream<Double, Double> pairTransformed3 = pairStream1.transformWithToPair(
+ pairStream1.transformWithToPair(
stream2,
new Function3<JavaPairRDD<String, Integer>, JavaRDD<String>, Time, JavaPairRDD<Double, Double>>() {
@Override
- public JavaPairRDD<Double, Double> call(JavaPairRDD<String, Integer> rdd1, JavaRDD<String> rdd2, Time time) throws Exception {
+ public JavaPairRDD<Double, Double> call(JavaPairRDD<String, Integer> rdd1, JavaRDD<String> rdd2, Time time) {
return null;
}
}
);
- JavaPairDStream<Double, Double> pairTransformed4 = pairStream1.transformWithToPair(
+ pairStream1.transformWithToPair(
pairStream2,
new Function3<JavaPairRDD<String, Integer>, JavaPairRDD<Double, Character>, Time, JavaPairRDD<Double, Double>>() {
@Override
- public JavaPairRDD<Double, Double> call(JavaPairRDD<String, Integer> rdd1, JavaPairRDD<Double, Character> rdd2, Time time) throws Exception {
+ public JavaPairRDD<Double, Double> call(JavaPairRDD<String, Integer> rdd1, JavaPairRDD<Double, Character> rdd2, Time time) {
return null;
}
}
@@ -690,13 +686,13 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
);
List<List<Tuple2<Integer, String>>> pairStream1input = Arrays.asList(
- Arrays.asList(new Tuple2<Integer, String>(1, "x")),
- Arrays.asList(new Tuple2<Integer, String>(2, "y"))
+ Arrays.asList(new Tuple2<>(1, "x")),
+ Arrays.asList(new Tuple2<>(2, "y"))
);
List<List<Tuple2<Integer, Tuple2<Integer, String>>>> expected = Arrays.asList(
- Arrays.asList(new Tuple2<Integer, Tuple2<Integer, String>>(1, new Tuple2<Integer, String>(1, "x"))),
- Arrays.asList(new Tuple2<Integer, Tuple2<Integer, String>>(2, new Tuple2<Integer, String>(2, "y")))
+ Arrays.asList(new Tuple2<>(1, new Tuple2<>(1, "x"))),
+ Arrays.asList(new Tuple2<>(2, new Tuple2<>(2, "y")))
);
JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, stream1input, 1);
@@ -707,7 +703,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
List<JavaDStream<?>> listOfDStreams1 = Arrays.<JavaDStream<?>>asList(stream1, stream2);
// This is just to test whether this transform to JavaStream compiles
- JavaDStream<Long> transformed1 = ssc.transform(
+ ssc.transform(
listOfDStreams1,
new Function2<List<JavaRDD<?>>, Time, JavaRDD<Long>>() {
@Override
@@ -733,8 +729,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaPairRDD<Integer, String> prdd3 = JavaPairRDD.fromJavaRDD(rdd3);
PairFunction<Integer, Integer, Integer> mapToTuple = new PairFunction<Integer, Integer, Integer>() {
@Override
- public Tuple2<Integer, Integer> call(Integer i) throws Exception {
- return new Tuple2<Integer, Integer>(i, i);
+ public Tuple2<Integer, Integer> call(Integer i) {
+ return new Tuple2<>(i, i);
}
};
return rdd1.union(rdd2).mapToPair(mapToTuple).join(prdd3);
@@ -763,7 +759,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaDStream<String> flatMapped = stream.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String x) {
- return Lists.newArrayList(x.split("(?!^)"));
+ return Arrays.asList(x.split("(?!^)"));
}
});
JavaTestUtils.attachTestOutputStream(flatMapped);
@@ -782,39 +778,39 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
Arrays.asList(
- new Tuple2<Integer, String>(6, "g"),
- new Tuple2<Integer, String>(6, "i"),
- new Tuple2<Integer, String>(6, "a"),
- new Tuple2<Integer, String>(6, "n"),
- new Tuple2<Integer, String>(6, "t"),
- new Tuple2<Integer, String>(6, "s")),
+ new Tuple2<>(6, "g"),
+ new Tuple2<>(6, "i"),
+ new Tuple2<>(6, "a"),
+ new Tuple2<>(6, "n"),
+ new Tuple2<>(6, "t"),
+ new Tuple2<>(6, "s")),
Arrays.asList(
- new Tuple2<Integer, String>(7, "d"),
- new Tuple2<Integer, String>(7, "o"),
- new Tuple2<Integer, String>(7, "d"),
- new Tuple2<Integer, String>(7, "g"),
- new Tuple2<Integer, String>(7, "e"),
- new Tuple2<Integer, String>(7, "r"),
- new Tuple2<Integer, String>(7, "s")),
+ new Tuple2<>(7, "d"),
+ new Tuple2<>(7, "o"),
+ new Tuple2<>(7, "d"),
+ new Tuple2<>(7, "g"),
+ new Tuple2<>(7, "e"),
+ new Tuple2<>(7, "r"),
+ new Tuple2<>(7, "s")),
Arrays.asList(
- new Tuple2<Integer, String>(9, "a"),
- new Tuple2<Integer, String>(9, "t"),
- new Tuple2<Integer, String>(9, "h"),
- new Tuple2<Integer, String>(9, "l"),
- new Tuple2<Integer, String>(9, "e"),
- new Tuple2<Integer, String>(9, "t"),
- new Tuple2<Integer, String>(9, "i"),
- new Tuple2<Integer, String>(9, "c"),
- new Tuple2<Integer, String>(9, "s")));
+ new Tuple2<>(9, "a"),
+ new Tuple2<>(9, "t"),
+ new Tuple2<>(9, "h"),
+ new Tuple2<>(9, "l"),
+ new Tuple2<>(9, "e"),
+ new Tuple2<>(9, "t"),
+ new Tuple2<>(9, "i"),
+ new Tuple2<>(9, "c"),
+ new Tuple2<>(9, "s")));
JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<Integer, String> flatMapped = stream.flatMapToPair(
new PairFlatMapFunction<String, Integer, String>() {
@Override
- public Iterable<Tuple2<Integer, String>> call(String in) throws Exception {
- List<Tuple2<Integer, String>> out = Lists.newArrayList();
+ public Iterable<Tuple2<Integer, String>> call(String in) {
+ List<Tuple2<Integer, String>> out = new ArrayList<>();
for (String letter: in.split("(?!^)")) {
- out.add(new Tuple2<Integer, String>(in.length(), letter));
+ out.add(new Tuple2<>(in.length(), letter));
}
return out;
}
@@ -859,13 +855,13 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
*/
public static <T> void assertOrderInvariantEquals(
List<List<T>> expected, List<List<T>> actual) {
- List<Set<T>> expectedSets = new ArrayList<Set<T>>();
+ List<Set<T>> expectedSets = new ArrayList<>();
for (List<T> list: expected) {
- expectedSets.add(Collections.unmodifiableSet(new HashSet<T>(list)));
+ expectedSets.add(Collections.unmodifiableSet(new HashSet<>(list)));
}
- List<Set<T>> actualSets = new ArrayList<Set<T>>();
+ List<Set<T>> actualSets = new ArrayList<>();
for (List<T> list: actual) {
- actualSets.add(Collections.unmodifiableSet(new HashSet<T>(list)));
+ actualSets.add(Collections.unmodifiableSet(new HashSet<>(list)));
}
Assert.assertEquals(expectedSets, actualSets);
}
@@ -877,25 +873,25 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
public void testPairFilter() {
List<List<String>> inputData = Arrays.asList(
Arrays.asList("giants", "dodgers"),
- Arrays.asList("yankees", "red socks"));
+ Arrays.asList("yankees", "red sox"));
List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
- Arrays.asList(new Tuple2<String, Integer>("giants", 6)),
- Arrays.asList(new Tuple2<String, Integer>("yankees", 7)));
+ Arrays.asList(new Tuple2<>("giants", 6)),
+ Arrays.asList(new Tuple2<>("yankees", 7)));
JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = stream.mapToPair(
new PairFunction<String, String, Integer>() {
@Override
- public Tuple2<String, Integer> call(String in) throws Exception {
- return new Tuple2<String, Integer>(in, in.length());
+ public Tuple2<String, Integer> call(String in) {
+ return new Tuple2<>(in, in.length());
}
});
JavaPairDStream<String, Integer> filtered = pairStream.filter(
new Function<Tuple2<String, Integer>, Boolean>() {
@Override
- public Boolean call(Tuple2<String, Integer> in) throws Exception {
+ public Boolean call(Tuple2<String, Integer> in) {
return in._1().contains("a");
}
});
@@ -906,28 +902,28 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
}
@SuppressWarnings("unchecked")
- private List<List<Tuple2<String, String>>> stringStringKVStream = Arrays.asList(
- Arrays.asList(new Tuple2<String, String>("california", "dodgers"),
- new Tuple2<String, String>("california", "giants"),
- new Tuple2<String, String>("new york", "yankees"),
- new Tuple2<String, String>("new york", "mets")),
- Arrays.asList(new Tuple2<String, String>("california", "sharks"),
- new Tuple2<String, String>("california", "ducks"),
- new Tuple2<String, String>("new york", "rangers"),
- new Tuple2<String, String>("new york", "islanders")));
+ private final List<List<Tuple2<String, String>>> stringStringKVStream = Arrays.asList(
+ Arrays.asList(new Tuple2<>("california", "dodgers"),
+ new Tuple2<>("california", "giants"),
+ new Tuple2<>("new york", "yankees"),
+ new Tuple2<>("new york", "mets")),
+ Arrays.asList(new Tuple2<>("california", "sharks"),
+ new Tuple2<>("california", "ducks"),
+ new Tuple2<>("new york", "rangers"),
+ new Tuple2<>("new york", "islanders")));
@SuppressWarnings("unchecked")
- private List<List<Tuple2<String, Integer>>> stringIntKVStream = Arrays.asList(
+ private final List<List<Tuple2<String, Integer>>> stringIntKVStream = Arrays.asList(
Arrays.asList(
- new Tuple2<String, Integer>("california", 1),
- new Tuple2<String, Integer>("california", 3),
- new Tuple2<String, Integer>("new york", 4),
- new Tuple2<String, Integer>("new york", 1)),
+ new Tuple2<>("california", 1),
+ new Tuple2<>("california", 3),
+ new Tuple2<>("new york", 4),
+ new Tuple2<>("new york", 1)),
Arrays.asList(
- new Tuple2<String, Integer>("california", 5),
- new Tuple2<String, Integer>("california", 5),
- new Tuple2<String, Integer>("new york", 3),
- new Tuple2<String, Integer>("new york", 1)));
+ new Tuple2<>("california", 5),
+ new Tuple2<>("california", 5),
+ new Tuple2<>("new york", 3),
+ new Tuple2<>("new york", 1)));
@SuppressWarnings("unchecked")
@Test
@@ -936,22 +932,22 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
Arrays.asList(
- new Tuple2<Integer, String>(1, "california"),
- new Tuple2<Integer, String>(3, "california"),
- new Tuple2<Integer, String>(4, "new york"),
- new Tuple2<Integer, String>(1, "new york")),
+ new Tuple2<>(1, "california"),
+ new Tuple2<>(3, "california"),
+ new Tuple2<>(4, "new york"),
+ new Tuple2<>(1, "new york")),
Arrays.asList(
- new Tuple2<Integer, String>(5, "california"),
- new Tuple2<Integer, String>(5, "california"),
- new Tuple2<Integer, String>(3, "new york"),
- new Tuple2<Integer, String>(1, "new york")));
+ new Tuple2<>(5, "california"),
+ new Tuple2<>(5, "california"),
+ new Tuple2<>(3, "new york"),
+ new Tuple2<>(1, "new york")));
JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
JavaPairDStream<Integer, String> reversed = pairStream.mapToPair(
new PairFunction<Tuple2<String, Integer>, Integer, String>() {
@Override
- public Tuple2<Integer, String> call(Tuple2<String, Integer> in) throws Exception {
+ public Tuple2<Integer, String> call(Tuple2<String, Integer> in) {
return in.swap();
}
});
@@ -969,23 +965,23 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
Arrays.asList(
- new Tuple2<Integer, String>(1, "california"),
- new Tuple2<Integer, String>(3, "california"),
- new Tuple2<Integer, String>(4, "new york"),
- new Tuple2<Integer, String>(1, "new york")),
+ new Tuple2<>(1, "california"),
+ new Tuple2<>(3, "california"),
+ new Tuple2<>(4, "new york"),
+ new Tuple2<>(1, "new york")),
Arrays.asList(
- new Tuple2<Integer, String>(5, "california"),
- new Tuple2<Integer, String>(5, "california"),
- new Tuple2<Integer, String>(3, "new york"),
- new Tuple2<Integer, String>(1, "new york")));
+ new Tuple2<>(5, "california"),
+ new Tuple2<>(5, "california"),
+ new Tuple2<>(3, "new york"),
+ new Tuple2<>(1, "new york")));
JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
JavaPairDStream<Integer, String> reversed = pairStream.mapPartitionsToPair(
new PairFlatMapFunction<Iterator<Tuple2<String, Integer>>, Integer, String>() {
@Override
- public Iterable<Tuple2<Integer, String>> call(Iterator<Tuple2<String, Integer>> in) throws Exception {
- LinkedList<Tuple2<Integer, String>> out = new LinkedList<Tuple2<Integer, String>>();
+ public Iterable<Tuple2<Integer, String>> call(Iterator<Tuple2<String, Integer>> in) {
+ List<Tuple2<Integer, String>> out = new LinkedList<>();
while (in.hasNext()) {
Tuple2<String, Integer> next = in.next();
out.add(next.swap());
@@ -1014,7 +1010,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaDStream<Integer> reversed = pairStream.map(
new Function<Tuple2<String, Integer>, Integer>() {
@Override
- public Integer call(Tuple2<String, Integer> in) throws Exception {
+ public Integer call(Tuple2<String, Integer> in) {
return in._2();
}
});
@@ -1030,23 +1026,23 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
public void testPairToPairFlatMapWithChangingTypes() { // Maps pair -> pair
List<List<Tuple2<String, Integer>>> inputData = Arrays.asList(
Arrays.asList(
- new Tuple2<String, Integer>("hi", 1),
- new Tuple2<String, Integer>("ho", 2)),
+ new Tuple2<>("hi", 1),
+ new Tuple2<>("ho", 2)),
Arrays.asList(
- new Tuple2<String, Integer>("hi", 1),
- new Tuple2<String, Integer>("ho", 2)));
+ new Tuple2<>("hi", 1),
+ new Tuple2<>("ho", 2)));
List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
Arrays.asList(
- new Tuple2<Integer, String>(1, "h"),
- new Tuple2<Integer, String>(1, "i"),
- new Tuple2<Integer, String>(2, "h"),
- new Tuple2<Integer, String>(2, "o")),
+ new Tuple2<>(1, "h"),
+ new Tuple2<>(1, "i"),
+ new Tuple2<>(2, "h"),
+ new Tuple2<>(2, "o")),
Arrays.asList(
- new Tuple2<Integer, String>(1, "h"),
- new Tuple2<Integer, String>(1, "i"),
- new Tuple2<Integer, String>(2, "h"),
- new Tuple2<Integer, String>(2, "o")));
+ new Tuple2<>(1, "h"),
+ new Tuple2<>(1, "i"),
+ new Tuple2<>(2, "h"),
+ new Tuple2<>(2, "o")));
JavaDStream<Tuple2<String, Integer>> stream =
JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
@@ -1054,10 +1050,10 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaPairDStream<Integer, String> flatMapped = pairStream.flatMapToPair(
new PairFlatMapFunction<Tuple2<String, Integer>, Integer, String>() {
@Override
- public Iterable<Tuple2<Integer, String>> call(Tuple2<String, Integer> in) throws Exception {
- List<Tuple2<Integer, String>> out = new LinkedList<Tuple2<Integer, String>>();
+ public Iterable<Tuple2<Integer, String>> call(Tuple2<String, Integer> in) {
+ List<Tuple2<Integer, String>> out = new LinkedList<>();
for (Character s : in._1().toCharArray()) {
- out.add(new Tuple2<Integer, String>(in._2(), s.toString()));
+ out.add(new Tuple2<>(in._2(), s.toString()));
}
return out;
}
@@ -1075,11 +1071,11 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
List<List<Tuple2<String, List<String>>>> expected = Arrays.asList(
Arrays.asList(
- new Tuple2<String, List<String>>("california", Arrays.asList("dodgers", "giants")),
- new Tuple2<String, List<String>>("new york", Arrays.asList("yankees", "mets"))),
+ new Tuple2<>("california", Arrays.asList("dodgers", "giants")),
+ new Tuple2<>("new york", Arrays.asList("yankees", "mets"))),
Arrays.asList(
- new Tuple2<String, List<String>>("california", Arrays.asList("sharks", "ducks")),
- new Tuple2<String, List<String>>("new york", Arrays.asList("rangers", "islanders"))));
+ new Tuple2<>("california", Arrays.asList("sharks", "ducks")),
+ new Tuple2<>("new york", Arrays.asList("rangers", "islanders"))));
JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
@@ -1111,11 +1107,11 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
Arrays.asList(
- new Tuple2<String, Integer>("california", 4),
- new Tuple2<String, Integer>("new york", 5)),
+ new Tuple2<>("california", 4),
+ new Tuple2<>("new york", 5)),
Arrays.asList(
- new Tuple2<String, Integer>("california", 10),
- new Tuple2<String, Integer>("new york", 4)));
+ new Tuple2<>("california", 10),
+ new Tuple2<>("new york", 4)));
JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(
ssc, inputData, 1);
@@ -1136,20 +1132,20 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
Arrays.asList(
- new Tuple2<String, Integer>("california", 4),
- new Tuple2<String, Integer>("new york", 5)),
+ new Tuple2<>("california", 4),
+ new Tuple2<>("new york", 5)),
Arrays.asList(
- new Tuple2<String, Integer>("california", 10),
- new Tuple2<String, Integer>("new york", 4)));
+ new Tuple2<>("california", 10),
+ new Tuple2<>("new york", 4)));
JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(
ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
- JavaPairDStream<String, Integer> combined = pairStream.<Integer>combineByKey(
+ JavaPairDStream<String, Integer> combined = pairStream.combineByKey(
new Function<Integer, Integer>() {
@Override
- public Integer call(Integer i) throws Exception {
+ public Integer call(Integer i) {
return i;
}
}, new IntegerSum(), new IntegerSum(), new HashPartitioner(2));
@@ -1170,13 +1166,13 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
List<List<Tuple2<String, Long>>> expected = Arrays.asList(
Arrays.asList(
- new Tuple2<String, Long>("hello", 1L),
- new Tuple2<String, Long>("world", 1L)),
+ new Tuple2<>("hello", 1L),
+ new Tuple2<>("world", 1L)),
Arrays.asList(
- new Tuple2<String, Long>("hello", 1L),
- new Tuple2<String, Long>("moon", 1L)),
+ new Tuple2<>("hello", 1L),
+ new Tuple2<>("moon", 1L)),
Arrays.asList(
- new Tuple2<String, Long>("hello", 1L)));
+ new Tuple2<>("hello", 1L)));
JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Long> counted = stream.countByValue();
@@ -1193,16 +1189,16 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
List<List<Tuple2<String, List<Integer>>>> expected = Arrays.asList(
Arrays.asList(
- new Tuple2<String, List<Integer>>("california", Arrays.asList(1, 3)),
- new Tuple2<String, List<Integer>>("new york", Arrays.asList(1, 4))
+ new Tuple2<>("california", Arrays.asList(1, 3)),
+ new Tuple2<>("new york", Arrays.asList(1, 4))
),
Arrays.asList(
- new Tuple2<String, List<Integer>>("california", Arrays.asList(1, 3, 5, 5)),
- new Tuple2<String, List<Integer>>("new york", Arrays.asList(1, 1, 3, 4))
+ new Tuple2<>("california", Arrays.asList(1, 3, 5, 5)),
+ new Tuple2<>("new york", Arrays.asList(1, 1, 3, 4))
),
Arrays.asList(
- new Tuple2<String, List<Integer>>("california", Arrays.asList(5, 5)),
- new Tuple2<String, List<Integer>>("new york", Arrays.asList(1, 3))
+ new Tuple2<>("california", Arrays.asList(5, 5)),
+ new Tuple2<>("new york", Arrays.asList(1, 3))
)
);
@@ -1220,16 +1216,16 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
}
}
- private HashSet<Tuple2<String, HashSet<Integer>>> convert(List<Tuple2<String, List<Integer>>> listOfTuples) {
- List<Tuple2<String, HashSet<Integer>>> newListOfTuples = new ArrayList<Tuple2<String, HashSet<Integer>>>();
+ private static Set<Tuple2<String, HashSet<Integer>>> convert(List<Tuple2<String, List<Integer>>> listOfTuples) {
+ List<Tuple2<String, HashSet<Integer>>> newListOfTuples = new ArrayList<>();
for (Tuple2<String, List<Integer>> tuple: listOfTuples) {
newListOfTuples.add(convert(tuple));
}
- return new HashSet<Tuple2<String, HashSet<Integer>>>(newListOfTuples);
+ return new HashSet<>(newListOfTuples);
}
- private Tuple2<String, HashSet<Integer>> convert(Tuple2<String, List<Integer>> tuple) {
- return new Tuple2<String, HashSet<Integer>>(tuple._1(), new HashSet<Integer>(tuple._2()));
+ private static Tuple2<String, HashSet<Integer>> convert(Tuple2<String, List<Integer>> tuple) {
+ return new Tuple2<>(tuple._1(), new HashSet<>(tuple._2()));
}
@SuppressWarnings("unchecked")
@@ -1238,12 +1234,12 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
- Arrays.asList(new Tuple2<String, Integer>("california", 4),
- new Tuple2<String, Integer>("new york", 5)),
- Arrays.asList(new Tuple2<String, Integer>("california", 14),
- new Tuple2<String, Integer>("new york", 9)),
- Arrays.asList(new Tuple2<String, Integer>("california", 10),
- new Tuple2<String, Integer>("new york", 4)));
+ Arrays.asList(new Tuple2<>("california", 4),
+ new Tuple2<>("new york", 5)),
+ Arrays.asList(new Tuple2<>("california", 14),
+ new Tuple2<>("new york", 9)),
+ Arrays.asList(new Tuple2<>("california", 10),
+ new Tuple2<>("new york", 4)));
JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
@@ -1262,12 +1258,12 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
- Arrays.asList(new Tuple2<String, Integer>("california", 4),
- new Tuple2<String, Integer>("new york", 5)),
- Arrays.asList(new Tuple2<String, Integer>("california", 14),
- new Tuple2<String, Integer>("new york", 9)),
- Arrays.asList(new Tuple2<String, Integer>("california", 14),
- new Tuple2<String, Integer>("new york", 9)));
+ Arrays.asList(new Tuple2<>("california", 4),
+ new Tuple2<>("new york", 5)),
+ Arrays.asList(new Tuple2<>("california", 14),
+ new Tuple2<>("new york", 9)),
+ Arrays.asList(new Tuple2<>("california", 14),
+ new Tuple2<>("new york", 9)));
JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
@@ -1278,10 +1274,10 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
int out = 0;
if (state.isPresent()) {
- out = out + state.get();
+ out += state.get();
}
for (Integer v : values) {
- out = out + v;
+ out += v;
}
return Optional.of(out);
}
@@ -1298,19 +1294,19 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
List<Tuple2<String, Integer>> initial = Arrays.asList (
- new Tuple2<String, Integer> ("california", 1),
- new Tuple2<String, Integer> ("new york", 2));
+ new Tuple2<>("california", 1),
+ new Tuple2<>("new york", 2));
JavaRDD<Tuple2<String, Integer>> tmpRDD = ssc.sparkContext().parallelize(initial);
JavaPairRDD<String, Integer> initialRDD = JavaPairRDD.fromJavaRDD (tmpRDD);
List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
- Arrays.asList(new Tuple2<String, Integer>("california", 5),
- new Tuple2<String, Integer>("new york", 7)),
- Arrays.asList(new Tuple2<String, Integer>("california", 15),
- new Tuple2<String, Integer>("new york", 11)),
- Arrays.asList(new Tuple2<String, Integer>("california", 15),
- new Tuple2<String, Integer>("new york", 11)));
+ Arrays.asList(new Tuple2<>("california", 5),
+ new Tuple2<>("new york", 7)),
+ Arrays.asList(new Tuple2<>("california", 15),
+ new Tuple2<>("new york", 11)),
+ Arrays.asList(new Tuple2<>("california", 15),
+ new Tuple2<>("new york", 11)));
JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
@@ -1321,10 +1317,10 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
int out = 0;
if (state.isPresent()) {
- out = out + state.get();
+ out += state.get();
}
for (Integer v : values) {
- out = out + v;
+ out += v;
}
return Optional.of(out);
}
@@ -1341,19 +1337,19 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
- Arrays.asList(new Tuple2<String, Integer>("california", 4),
- new Tuple2<String, Integer>("new york", 5)),
- Arrays.asList(new Tuple2<String, Integer>("california", 14),
- new Tuple2<String, Integer>("new york", 9)),
- Arrays.asList(new Tuple2<String, Integer>("california", 10),
- new Tuple2<String, Integer>("new york", 4)));
+ Arrays.asList(new Tuple2<>("california", 4),
+ new Tuple2<>("new york", 5)),
+ Arrays.asList(new Tuple2<>("california", 14),
+ new Tuple2<>("new york", 9)),
+ Arrays.asList(new Tuple2<>("california", 10),
+ new Tuple2<>("new york", 4)));
JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
JavaPairDStream<String, Integer> reduceWindowed =
pairStream.reduceByKeyAndWindow(new IntegerSum(), new IntegerDifference(),
- new Duration(2000), new Duration(1000));
+ new Duration(2000), new Duration(1000));
JavaTestUtils.attachTestOutputStream(reduceWindowed);
List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
@@ -1370,15 +1366,15 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
List<HashSet<Tuple2<String, Long>>> expected = Arrays.asList(
Sets.newHashSet(
- new Tuple2<String, Long>("hello", 1L),
- new Tuple2<String, Long>("world", 1L)),
+ new Tuple2<>("hello", 1L),
+ new Tuple2<>("world", 1L)),
Sets.newHashSet(
- new Tuple2<String, Long>("hello", 2L),
- new Tuple2<String, Long>("world", 1L),
- new Tuple2<String, Long>("moon", 1L)),
+ new Tuple2<>("hello", 2L),
+ new Tuple2<>("world", 1L),
+ new Tuple2<>("moon", 1L)),
Sets.newHashSet(
- new Tuple2<String, Long>("hello", 2L),
- new Tuple2<String, Long>("moon", 1L)));
+ new Tuple2<>("hello", 2L),
+ new Tuple2<>("moon", 1L)));
JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(
ssc, inputData, 1);
@@ -1386,7 +1382,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
stream.countByValueAndWindow(new Duration(2000), new Duration(1000));
JavaTestUtils.attachTestOutputStream(counted);
List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
- List<HashSet<Tuple2<String, Long>>> unorderedResult = Lists.newArrayList();
+ List<Set<Tuple2<String, Long>>> unorderedResult = new ArrayList<>();
for (List<Tuple2<String, Long>> res: result) {
unorderedResult.add(Sets.newHashSet(res));
}
@@ -1399,27 +1395,27 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
public void testPairTransform() {
List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList(
Arrays.asList(
- new Tuple2<Integer, Integer>(3, 5),
- new Tuple2<Integer, Integer>(1, 5),
- new Tuple2<Integer, Integer>(4, 5),
- new Tuple2<Integer, Integer>(2, 5)),
+ new Tuple2<>(3, 5),
+ new Tuple2<>(1, 5),
+ new Tuple2<>(4, 5),
+ new Tuple2<>(2, 5)),
Arrays.asList(
- new Tuple2<Integer, Integer>(2, 5),
- new Tuple2<Integer, Integer>(3, 5),
- new Tuple2<Integer, Integer>(4, 5),
- new Tuple2<Integer, Integer>(1, 5)));
+ new Tuple2<>(2, 5),
+ new Tuple2<>(3, 5),
+ new Tuple2<>(4, 5),
+ new Tuple2<>(1, 5)));
List<List<Tuple2<Integer, Integer>>> expected = Arrays.asList(
Arrays.asList(
- new Tuple2<Integer, Integer>(1, 5),
- new Tuple2<Integer, Integer>(2, 5),
- new Tuple2<Integer, Integer>(3, 5),
- new Tuple2<Integer, Integer>(4, 5)),
+ new Tuple2<>(1, 5),
+ new Tuple2<>(2, 5),
+ new Tuple2<>(3, 5),
+ new Tuple2<>(4, 5)),
Arrays.asList(
- new Tuple2<Integer, Integer>(1, 5),
- new Tuple2<Integer, Integer>(2, 5),
- new Tuple2<Integer, Integer>(3, 5),
- new Tuple2<Integer, Integer>(4, 5)));
+ new Tuple2<>(1, 5),
+ new Tuple2<>(2, 5),
+ new Tuple2<>(3, 5),
+ new Tuple2<>(4, 5)));
JavaDStream<Tuple2<Integer, Integer>> stream = JavaTestUtils.attachTestInputStream(
ssc, inputData, 1);
@@ -1428,7 +1424,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaPairDStream<Integer, Integer> sorted = pairStream.transformToPair(
new Function<JavaPairRDD<Integer, Integer>, JavaPairRDD<Integer, Integer>>() {
@Override
- public JavaPairRDD<Integer, Integer> call(JavaPairRDD<Integer, Integer> in) throws Exception {
+ public JavaPairRDD<Integer, Integer> call(JavaPairRDD<Integer, Integer> in) {
return in.sortByKey();
}
});
@@ -1444,15 +1440,15 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
public void testPairToNormalRDDTransform() {
List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList(
Arrays.asList(
- new Tuple2<Integer, Integer>(3, 5),
- new Tuple2<Integer, Integer>(1, 5),
- new Tuple2<Integer, Integer>(4, 5),
- new Tuple2<Integer, Integer>(2, 5)),
+ new Tuple2<>(3, 5),
+ new Tuple2<>(1, 5),
+ new Tuple2<>(4, 5),
+ new Tuple2<>(2, 5)),
Arrays.asList(
- new Tuple2<Integer, Integer>(2, 5),
- new Tuple2<Integer, Integer>(3, 5),
- new Tuple2<Integer, Integer>(4, 5),
- new Tuple2<Integer, Integer>(1, 5)));
+ new Tuple2<>(2, 5),
+ new Tuple2<>(3, 5),
+ new Tuple2<>(4, 5),
+ new Tuple2<>(1, 5)));
List<List<Integer>> expected = Arrays.asList(
Arrays.asList(3,1,4,2),
@@ -1465,11 +1461,11 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaDStream<Integer> firstParts = pairStream.transform(
new Function<JavaPairRDD<Integer, Integer>, JavaRDD<Integer>>() {
@Override
- public JavaRDD<Integer> call(JavaPairRDD<Integer, Integer> in) throws Exception {
+ public JavaRDD<Integer> call(JavaPairRDD<Integer, Integer> in) {
return in.map(new Function<Tuple2<Integer, Integer>, Integer>() {
@Override
- public Integer call(Tuple2<Integer, Integer> in) {
- return in._1();
+ public Integer call(Tuple2<Integer, Integer> in2) {
+ return in2._1();
}
});
}
@@ -1487,14 +1483,14 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
List<List<Tuple2<String, String>>> expected = Arrays.asList(
- Arrays.asList(new Tuple2<String, String>("california", "DODGERS"),
- new Tuple2<String, String>("california", "GIANTS"),
- new Tuple2<String, String>("new york", "YANKEES"),
- new Tuple2<String, String>("new york", "METS")),
- Arrays.asList(new Tuple2<String, String>("california", "SHARKS"),
- new Tuple2<String, String>("california", "DUCKS"),
- new Tuple2<String, String>("new york", "RANGERS"),
- new Tuple2<String, String>("new york", "ISLANDERS")));
+ Arrays.asList(new Tuple2<>("california", "DODGERS"),
+ new Tuple2<>("california", "GIANTS"),
+ new Tuple2<>("new york", "YANKEES"),
+ new Tuple2<>("new york", "METS")),
+ Arrays.asList(new Tuple2<>("california", "SHARKS"),
+ new Tuple2<>("california", "DUCKS"),
+ new Tuple2<>("new york", "RANGERS"),
+ new Tuple2<>("new york", "ISLANDERS")));
JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
ssc, inputData, 1);
@@ -1502,8 +1498,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaPairDStream<String, String> mapped = pairStream.mapValues(new Function<String, String>() {
@Override
- public String call(String s) throws Exception {
- return s.toUpperCase();
+ public String call(String s) {
+ return s.toUpperCase(Locale.ENGLISH);
}
});
@@ -1519,22 +1515,22 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
List<List<Tuple2<String, String>>> expected = Arrays.asList(
- Arrays.asList(new Tuple2<String, String>("california", "dodgers1"),
- new Tuple2<String, String>("california", "dodgers2"),
- new Tuple2<String, String>("california", "giants1"),
- new Tuple2<String, String>("california", "giants2"),
- new Tuple2<String, String>("new york", "yankees1"),
- new Tuple2<String, String>("new york", "yankees2"),
- new Tuple2<String, String>("new york", "mets1"),
- new Tuple2<String, String>("new york", "mets2")),
- Arrays.asList(new Tuple2<String, String>("california", "sharks1"),
- new Tuple2<String, String>("california", "sharks2"),
- new Tuple2<String, String>("california", "ducks1"),
- new Tuple2<String, String>("california", "ducks2"),
- new Tuple2<String, String>("new york", "rangers1"),
- new Tuple2<String, String>("new york", "rangers2"),
- new Tuple2<String, String>("new york", "islanders1"),
- new Tuple2<String, String>("new york", "islanders2")));
+ Arrays.asList(new Tuple2<>("california", "dodgers1"),
+ new Tuple2<>("california", "dodgers2"),
+ new Tuple2<>("california", "giants1"),
+ new Tuple2<>("california", "giants2"),
+ new Tuple2<>("new york", "yankees1"),
+ new Tuple2<>("new york", "yankees2"),
+ new Tuple2<>("new york", "mets1"),
+ new Tuple2<>("new york", "mets2")),
+ Arrays.asList(new Tuple2<>("california", "sharks1"),
+ new Tuple2<>("california", "sharks2"),
+ new Tuple2<>("california", "ducks1"),
+ new Tuple2<>("california", "ducks2"),
+ new Tuple2<>("new york", "rangers1"),
+ new Tuple2<>("new york", "rangers2"),
+ new Tuple2<>("new york", "islanders1"),
+ new Tuple2<>("new york", "islanders2")));
JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
ssc, inputData, 1);
@@ -1545,7 +1541,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
new Function<String, Iterable<String>>() {
@Override
public Iterable<String> call(String in) {
- List<String> out = new ArrayList<String>();
+ List<String> out = new ArrayList<>();
out.add(in + "1");
out.add(in + "2");
return out;
@@ -1562,29 +1558,29 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
@Test
public void testCoGroup() {
List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
- Arrays.asList(new Tuple2<String, String>("california", "dodgers"),
- new Tuple2<String, String>("new york", "yankees")),
- Arrays.asList(new Tuple2<String, String>("california", "sharks"),
- new Tuple2<String, String>("new york", "rangers")));
+ Arrays.asList(new Tuple2<>("california", "dodgers"),
+ new Tuple2<>("new york", "yankees")),
+ Arrays.asList(new Tuple2<>("california", "sharks"),
+ new Tuple2<>("new york", "rangers")));
List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList(
- Arrays.asList(new Tuple2<String, String>("california", "giants"),
- new Tuple2<String, String>("new york", "mets")),
- Arrays.asList(new Tuple2<String, String>("california", "ducks"),
- new Tuple2<String, String>("new york", "islanders")));
+ Arrays.asList(new Tuple2<>("california", "giants"),
+ new Tuple2<>("new york", "mets")),
+ Arrays.asList(new Tuple2<>("california", "ducks"),
+ new Tuple2<>("new york", "islanders")));
List<List<Tuple2<String, Tuple2<List<String>, List<String>>>>> expected = Arrays.asList(
Arrays.asList(
- new Tuple2<String, Tuple2<List<String>, List<String>>>("california",
- new Tuple2<List<String>, List<String>>(Arrays.asList("dodgers"), Arrays.asList("giants"))),
- new Tuple2<String, Tuple2<List<String>, List<String>>>("new york",
- new Tuple2<List<String>, List<String>>(Arrays.asList("yankees"), Arrays.asList("mets")))),
+ new Tuple2<>("california",
+ new Tuple2<>(Arrays.asList("dodgers"), Arrays.asList("giants"))),
+ new Tuple2<>("new york",
+ new Tuple2<>(Arrays.asList("yankees"), Arrays.asList("mets")))),
Arrays.asList(
- new Tuple2<String, Tuple2<List<String>, List<String>>>("california",
- new Tuple2<List<String>, List<String>>(Arrays.asList("sharks"), Arrays.asList("ducks"))),
- new Tuple2<String, Tuple2<List<String>, List<String>>>("new york",
- new Tuple2<List<String>, List<String>>(Arrays.asList("rangers"), Arrays.asList("islanders")))));
+ new Tuple2<>("california",
+ new Tuple2<>(Arrays.asList("sharks"), Arrays.asList("ducks"))),
+ new Tuple2<>("new york",
+ new Tuple2<>(Arrays.asList("rangers"), Arrays.asList("islanders")))));
JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream(
@@ -1620,29 +1616,29 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
@Test
public void testJoin() {
List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
- Arrays.asList(new Tuple2<String, String>("california", "dodgers"),
- new Tuple2<String, String>("new york", "yankees")),
- Arrays.asList(new Tuple2<String, String>("california", "sharks"),
- new Tuple2<String, String>("new york", "rangers")));
+ Arrays.asList(new Tuple2<>("california", "dodgers"),
+ new Tuple2<>("new york", "yankees")),
+ Arrays.asList(new Tuple2<>("california", "sharks"),
+ new Tuple2<>("new york", "rangers")));
List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList(
- Arrays.asList(new Tuple2<String, String>("california", "giants"),
- new Tuple2<String, String>("new york", "mets")),
- Arrays.asList(new Tuple2<String, String>("california", "ducks"),
- new Tuple2<String, String>("new york", "islanders")));
+ Arrays.asList(new Tuple2<>("california", "giants"),
+ new Tuple2<>("new york", "mets")),
+ Arrays.asList(new Tuple2<>("california", "ducks"),
+ new Tuple2<>("new york", "islanders")));
List<List<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList(
Arrays.asList(
- new Tuple2<String, Tuple2<String, String>>("california",
- new Tuple2<String, String>("dodgers", "giants")),
- new Tuple2<String, Tuple2<String, String>>("new york",
- new Tuple2<String, String>("yankees", "mets"))),
+ new Tuple2<>("california",
+ new Tuple2<>("dodgers", "giants")),
+ new Tuple2<>("new york",
+ new Tuple2<>("yankees", "mets"))),
Arrays.asList(
- new Tuple2<String, Tuple2<String, String>>("california",
- new Tuple2<String, String>("sharks", "ducks")),
- new Tuple2<String, Tuple2<String, String>>("new york",
- new Tuple2<String, String>("rangers", "islanders"))));
+ new Tuple2<>("california",
+ new Tuple2<>("sharks", "ducks")),
+ new Tuple2<>("new york",
+ new Tuple2<>("rangers", "islanders"))));
JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream(
@@ -1664,13 +1660,13 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
@Test
public void testLeftOuterJoin() {
List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
- Arrays.asList(new Tuple2<String, String>("california", "dodgers"),
- new Tuple2<String, String>("new york", "yankees")),
- Arrays.asList(new Tuple2<String, String>("california", "sharks") ));
+ Arrays.asList(new Tuple2<>("california", "dodgers"),
+ new Tuple2<>("new york", "yankees")),
+ Arrays.asList(new Tuple2<>("california", "sharks") ));
List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList(
- Arrays.asList(new Tuple2<String, String>("california", "giants") ),
- Arrays.asList(new Tuple2<String, String>("new york", "islanders") )
+ Arrays.asList(new Tuple2<>("california", "giants") ),
+ Arrays.asList(new Tuple2<>("new york", "islanders") )
);
@@ -1713,7 +1709,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaDStream<String> stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaDStream<Integer> letterCount = stream.map(new Function<String, Integer>() {
@Override
- public Integer call(String s) throws Exception {
+ public Integer call(String s) {
return s.length();
}
});
@@ -1752,6 +1748,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
// (used to detect the new context)
final AtomicBoolean newContextCreated = new AtomicBoolean(false);
Function0<JavaStreamingContext> creatingFunc = new Function0<JavaStreamingContext>() {
+ @Override
public JavaStreamingContext call() {
newContextCreated.set(true);
return new JavaStreamingContext(conf, Seconds.apply(1));
@@ -1765,20 +1762,20 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
newContextCreated.set(false);
ssc = JavaStreamingContext.getOrCreate(corruptedCheckpointDir, creatingFunc,
- new org.apache.hadoop.conf.Configuration(), true);
+ new Configuration(), true);
Assert.assertTrue("new context not created", newContextCreated.get());
ssc.stop();
newContextCreated.set(false);
ssc = JavaStreamingContext.getOrCreate(checkpointDir, creatingFunc,
- new org.apache.hadoop.conf.Configuration());
+ new Configuration());
Assert.assertTrue("old context not recovered", !newContextCreated.get());
ssc.stop();
newContextCreated.set(false);
JavaSparkContext sc = new JavaSparkContext(conf);
ssc = JavaStreamingContext.getOrCreate(checkpointDir, creatingFunc,
- new org.apache.hadoop.conf.Configuration());
+ new Configuration());
Assert.assertTrue("old context not recovered", !newContextCreated.get());
ssc.stop();
}
@@ -1800,7 +1797,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaDStream letterCount = stream.map(new Function<String, Integer>() {
@Override
- public Integer call(String s) throws Exception {
+ public Integer call(String s) {
return s.length();
}
});
@@ -1818,29 +1815,26 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
// InputStream functionality is deferred to the existing Scala tests.
@Test
public void testSocketTextStream() {
- JavaReceiverInputDStream<String> test = ssc.socketTextStream("localhost", 12345);
+ ssc.socketTextStream("localhost", 12345);
}
@Test
public void testSocketString() {
-
- class Converter implements Function<InputStream, Iterable<String>> {
- public Iterable<String> call(InputStream in) throws IOException {
- BufferedReader reader = new BufferedReader(new InputStreamReader(in));
- List<String> out = new ArrayList<String>();
- while (true) {
- String line = reader.readLine();
- if (line == null) { break; }
- out.add(line);
- }
- return out;
- }
- }
-
- JavaDStream<String> test = ssc.socketStream(
+ ssc.socketStream(
"localhost",
12345,
- new Converter(),
+ new Function<InputStream, Iterable<String>>() {
+ @Override
+ public Iterable<String> call(InputStream in) throws IOException {
+ List<String> out = new ArrayList<>();
+ try (BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
+ for (String line; (line = reader.readLine()) != null;) {
+ out.add(line);
+ }
+ }
+ return out;
+ }
+ },
StorageLevel.MEMORY_ONLY());
}
@@ -1870,7 +1864,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
TextInputFormat.class,
new Function<Path, Boolean>() {
@Override
- public Boolean call(Path v1) throws Exception {
+ public Boolean call(Path v1) {
return Boolean.TRUE;
}
},
@@ -1879,7 +1873,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaDStream<String> test = inputStream.map(
new Function<Tuple2<LongWritable, Text>, String>() {
@Override
- public String call(Tuple2<LongWritable, Text> v1) throws Exception {
+ public String call(Tuple2<LongWritable, Text> v1) {
return v1._2().toString();
}
});
@@ -1892,19 +1886,15 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
@Test
public void testRawSocketStream() {
- JavaReceiverInputDStream<String> test = ssc.rawSocketStream("localhost", 12345);
+ ssc.rawSocketStream("localhost", 12345);
}
- private List<List<String>> fileTestPrepare(File testDir) throws IOException {
+ private static List<List<String>> fileTestPrepare(File testDir) throws IOException {
File existingFile = new File(testDir, "0");
Files.write("0\n", existingFile, Charset.forName("UTF-8"));
- assertTrue(existingFile.setLastModified(1000) && existingFile.lastModified() == 1000);
-
- List<List<String>> expected = Arrays.asList(
- Arrays.asList("0")
- );
-
- return expected;
+ Assert.assertTrue(existingFile.setLastModified(1000));
+ Assert.assertEquals(1000, existingFile.lastModified());
+ return Arrays.asList(Arrays.asList("0"));
}
@SuppressWarnings("unchecked")
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
index 1b0787fe69..ec2bffd6a5 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
@@ -36,7 +36,6 @@ import java.io.InputStreamReader;
import java.io.Serializable;
import java.net.ConnectException;
import java.net.Socket;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
public class JavaReceiverAPISuite implements Serializable {
@@ -64,16 +63,16 @@ public class JavaReceiverAPISuite implements Serializable {
ssc.receiverStream(new JavaSocketReceiver("localhost", server.port()));
JavaDStream<String> mapped = input.map(new Function<String, String>() {
@Override
- public String call(String v1) throws Exception {
+ public String call(String v1) {
return v1 + ".";
}
});
mapped.foreachRDD(new Function<JavaRDD<String>, Void>() {
@Override
- public Void call(JavaRDD<String> rdd) throws Exception {
- long count = rdd.count();
- dataCounter.addAndGet(count);
- return null;
+ public Void call(JavaRDD<String> rdd) {
+ long count = rdd.count();
+ dataCounter.addAndGet(count);
+ return null;
}
});
@@ -83,7 +82,7 @@ public class JavaReceiverAPISuite implements Serializable {
Thread.sleep(200);
for (int i = 0; i < 6; i++) {
- server.send("" + i + "\n"); // \n to make sure these are separate lines
+ server.send(i + "\n"); // \n to make sure these are separate lines
Thread.sleep(100);
}
while (dataCounter.get() == 0 && System.currentTimeMillis() - startTime < timeout) {
@@ -95,50 +94,49 @@ public class JavaReceiverAPISuite implements Serializable {
server.stop();
}
}
-}
-class JavaSocketReceiver extends Receiver<String> {
+ private static class JavaSocketReceiver extends Receiver<String> {
- String host = null;
- int port = -1;
+ String host = null;
+ int port = -1;
- public JavaSocketReceiver(String host_ , int port_) {
- super(StorageLevel.MEMORY_AND_DISK());
- host = host_;
- port = port_;
- }
+ JavaSocketReceiver(String host_ , int port_) {
+ super(StorageLevel.MEMORY_AND_DISK());
+ host = host_;
+ port = port_;
+ }
- @Override
- public void onStart() {
- new Thread() {
- @Override public void run() {
- receive();
- }
- }.start();
- }
+ @Override
+ public void onStart() {
+ new Thread() {
+ @Override public void run() {
+ receive();
+ }
+ }.start();
+ }
- @Override
- public void onStop() {
- }
+ @Override
+ public void onStop() {
+ }
- private void receive() {
- Socket socket = null;
- try {
- socket = new Socket(host, port);
- BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
- String userInput;
- while ((userInput = in.readLine()) != null) {
- store(userInput);
+ private void receive() {
+ try {
+ Socket socket = new Socket(host, port);
+ BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
+ String userInput;
+ while ((userInput = in.readLine()) != null) {
+ store(userInput);
+ }
+ in.close();
+ socket.close();
+ } catch(ConnectException ce) {
+ ce.printStackTrace();
+ restart("Could not connect", ce);
+ } catch(Throwable t) {
+ t.printStackTrace();
+ restart("Error receiving data", t);
}
- in.close();
- socket.close();
- } catch(ConnectException ce) {
- ce.printStackTrace();
- restart("Could not connect", ce);
- } catch(Throwable t) {
- t.printStackTrace();
- restart("Error receiving data", t);
}
}
-}
+}