diff options
Diffstat (limited to 'streaming/src')
-rw-r--r-- | streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java | 752 | ||||
-rw-r--r-- | streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java | 86 |
2 files changed, 413 insertions, 425 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index e0718f73aa..c521714922 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -18,24 +18,22 @@ package org.apache.spark.streaming; import java.io.*; -import java.lang.Iterable; import java.nio.charset.Charset; import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; +import scala.Tuple2; + +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; -import scala.Tuple2; - import org.junit.Assert; -import static org.junit.Assert.*; import org.junit.Test; import com.google.common.base.Optional; -import com.google.common.collect.Lists; import com.google.common.io.Files; import com.google.common.collect.Sets; @@ -54,14 +52,14 @@ import org.apache.spark.SparkConf; // see http://stackoverflow.com/questions/758570/. public class JavaAPISuite extends LocalJavaStreamingContext implements Serializable { - public void equalIterator(Iterator<?> a, Iterator<?> b) { + public static void equalIterator(Iterator<?> a, Iterator<?> b) { while (a.hasNext() && b.hasNext()) { Assert.assertEquals(a.next(), b.next()); } Assert.assertEquals(a.hasNext(), b.hasNext()); } - public void equalIterable(Iterable<?> a, Iterable<?> b) { + public static void equalIterable(Iterable<?> a, Iterable<?> b) { equalIterator(a.iterator(), b.iterator()); } @@ -74,14 +72,14 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa @Test public void testContextState() { List<List<Integer>> inputData = Arrays.asList(Arrays.asList(1, 2, 3, 4)); - Assert.assertTrue(ssc.getState() == StreamingContextState.INITIALIZED); + Assert.assertEquals(StreamingContextState.INITIALIZED, ssc.getState()); JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaTestUtils.attachTestOutputStream(stream); - Assert.assertTrue(ssc.getState() == StreamingContextState.INITIALIZED); + Assert.assertEquals(StreamingContextState.INITIALIZED, ssc.getState()); ssc.start(); - Assert.assertTrue(ssc.getState() == StreamingContextState.ACTIVE); + Assert.assertEquals(StreamingContextState.ACTIVE, ssc.getState()); ssc.stop(); - Assert.assertTrue(ssc.getState() == StreamingContextState.STOPPED); + Assert.assertEquals(StreamingContextState.STOPPED, ssc.getState()); } @SuppressWarnings("unchecked") @@ -118,7 +116,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaDStream<Integer> letterCount = stream.map(new Function<String, Integer>() { @Override - public Integer call(String s) throws Exception { + public Integer call(String s) { return s.length(); } }); @@ -180,7 +178,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa public void testFilter() { List<List<String>> inputData = Arrays.asList( Arrays.asList("giants", "dodgers"), - Arrays.asList("yankees", "red socks")); + Arrays.asList("yankees", "red sox")); List<List<String>> expected = Arrays.asList( Arrays.asList("giants"), @@ -189,7 +187,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaDStream<String> filtered = stream.filter(new Function<String, Boolean>() { @Override - public Boolean call(String s) throws Exception { + public Boolean call(String s) { return s.contains("a"); } }); @@ -243,11 +241,11 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa public void testGlom() { List<List<String>> inputData = Arrays.asList( Arrays.asList("giants", "dodgers"), - Arrays.asList("yankees", "red socks")); + Arrays.asList("yankees", "red sox")); List<List<List<String>>> expected = Arrays.asList( Arrays.asList(Arrays.asList("giants", "dodgers")), - Arrays.asList(Arrays.asList("yankees", "red socks"))); + Arrays.asList(Arrays.asList("yankees", "red sox"))); JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaDStream<List<String>> glommed = stream.glom(); @@ -262,22 +260,22 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa public void testMapPartitions() { List<List<String>> inputData = Arrays.asList( Arrays.asList("giants", "dodgers"), - Arrays.asList("yankees", "red socks")); + Arrays.asList("yankees", "red sox")); List<List<String>> expected = Arrays.asList( Arrays.asList("GIANTSDODGERS"), - Arrays.asList("YANKEESRED SOCKS")); + Arrays.asList("YANKEESRED SOX")); JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaDStream<String> mapped = stream.mapPartitions( new FlatMapFunction<Iterator<String>, String>() { @Override public Iterable<String> call(Iterator<String> in) { - String out = ""; + StringBuilder out = new StringBuilder(); while (in.hasNext()) { - out = out + in.next().toUpperCase(); + out.append(in.next().toUpperCase(Locale.ENGLISH)); } - return Lists.newArrayList(out); + return Arrays.asList(out.toString()); } }); JavaTestUtils.attachTestOutputStream(mapped); @@ -286,16 +284,16 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Assert.assertEquals(expected, result); } - private class IntegerSum implements Function2<Integer, Integer, Integer> { + private static class IntegerSum implements Function2<Integer, Integer, Integer> { @Override - public Integer call(Integer i1, Integer i2) throws Exception { + public Integer call(Integer i1, Integer i2) { return i1 + i2; } } - private class IntegerDifference implements Function2<Integer, Integer, Integer> { + private static class IntegerDifference implements Function2<Integer, Integer, Integer> { @Override - public Integer call(Integer i1, Integer i2) throws Exception { + public Integer call(Integer i1, Integer i2) { return i1 - i2; } } @@ -347,13 +345,13 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Arrays.asList(24)); JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream<Integer> reducedWindowed = null; + JavaDStream<Integer> reducedWindowed; if (withInverse) { reducedWindowed = stream.reduceByWindow(new IntegerSum(), - new IntegerDifference(), new Duration(2000), new Duration(1000)); + new IntegerDifference(), new Duration(2000), new Duration(1000)); } else { reducedWindowed = stream.reduceByWindow(new IntegerSum(), - new Duration(2000), new Duration(1000)); + new Duration(2000), new Duration(1000)); } JavaTestUtils.attachTestOutputStream(reducedWindowed); List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4); @@ -378,11 +376,11 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Arrays.asList(7,8,9)); JavaSparkContext jsc = new JavaSparkContext(ssc.ssc().sc()); - JavaRDD<Integer> rdd1 = ssc.sparkContext().parallelize(Arrays.asList(1, 2, 3)); - JavaRDD<Integer> rdd2 = ssc.sparkContext().parallelize(Arrays.asList(4, 5, 6)); - JavaRDD<Integer> rdd3 = ssc.sparkContext().parallelize(Arrays.asList(7,8,9)); + JavaRDD<Integer> rdd1 = jsc.parallelize(Arrays.asList(1, 2, 3)); + JavaRDD<Integer> rdd2 = jsc.parallelize(Arrays.asList(4, 5, 6)); + JavaRDD<Integer> rdd3 = jsc.parallelize(Arrays.asList(7,8,9)); - LinkedList<JavaRDD<Integer>> rdds = Lists.newLinkedList(); + Queue<JavaRDD<Integer>> rdds = new LinkedList<>(); rdds.add(rdd1); rdds.add(rdd2); rdds.add(rdd3); @@ -410,10 +408,10 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaDStream<Integer> transformed = stream.transform( new Function<JavaRDD<Integer>, JavaRDD<Integer>>() { @Override - public JavaRDD<Integer> call(JavaRDD<Integer> in) throws Exception { + public JavaRDD<Integer> call(JavaRDD<Integer> in) { return in.map(new Function<Integer, Integer>() { @Override - public Integer call(Integer i) throws Exception { + public Integer call(Integer i) { return i + 2; } }); @@ -435,70 +433,70 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); List<List<Tuple2<String, Integer>>> pairInputData = - Arrays.asList(Arrays.asList(new Tuple2<String, Integer>("x", 1))); + Arrays.asList(Arrays.asList(new Tuple2<>("x", 1))); JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream( JavaTestUtils.attachTestInputStream(ssc, pairInputData, 1)); - JavaDStream<Integer> transformed1 = stream.transform( + stream.transform( new Function<JavaRDD<Integer>, JavaRDD<Integer>>() { @Override - public JavaRDD<Integer> call(JavaRDD<Integer> in) throws Exception { + public JavaRDD<Integer> call(JavaRDD<Integer> in) { return null; } } ); - JavaDStream<Integer> transformed2 = stream.transform( + stream.transform( new Function2<JavaRDD<Integer>, Time, JavaRDD<Integer>>() { - @Override public JavaRDD<Integer> call(JavaRDD<Integer> in, Time time) throws Exception { + @Override public JavaRDD<Integer> call(JavaRDD<Integer> in, Time time) { return null; } } ); - JavaPairDStream<String, Integer> transformed3 = stream.transformToPair( + stream.transformToPair( new Function<JavaRDD<Integer>, JavaPairRDD<String, Integer>>() { - @Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> in) throws Exception { + @Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> in) { return null; } } ); - JavaPairDStream<String, Integer> transformed4 = stream.transformToPair( + stream.transformToPair( new Function2<JavaRDD<Integer>, Time, JavaPairRDD<String, Integer>>() { - @Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> in, Time time) throws Exception { + @Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> in, Time time) { return null; } } ); - JavaDStream<Integer> pairTransformed1 = pairStream.transform( + pairStream.transform( new Function<JavaPairRDD<String, Integer>, JavaRDD<Integer>>() { - @Override public JavaRDD<Integer> call(JavaPairRDD<String, Integer> in) throws Exception { + @Override public JavaRDD<Integer> call(JavaPairRDD<String, Integer> in) { return null; } } ); - JavaDStream<Integer> pairTransformed2 = pairStream.transform( + pairStream.transform( new Function2<JavaPairRDD<String, Integer>, Time, JavaRDD<Integer>>() { - @Override public JavaRDD<Integer> call(JavaPairRDD<String, Integer> in, Time time) throws Exception { + @Override public JavaRDD<Integer> call(JavaPairRDD<String, Integer> in, Time time) { return null; } } ); - JavaPairDStream<String, String> pairTransformed3 = pairStream.transformToPair( + pairStream.transformToPair( new Function<JavaPairRDD<String, Integer>, JavaPairRDD<String, String>>() { - @Override public JavaPairRDD<String, String> call(JavaPairRDD<String, Integer> in) throws Exception { + @Override public JavaPairRDD<String, String> call(JavaPairRDD<String, Integer> in) { return null; } } ); - JavaPairDStream<String, String> pairTransformed4 = pairStream.transformToPair( + pairStream.transformToPair( new Function2<JavaPairRDD<String, Integer>, Time, JavaPairRDD<String, String>>() { - @Override public JavaPairRDD<String, String> call(JavaPairRDD<String, Integer> in, Time time) throws Exception { + @Override public JavaPairRDD<String, String> call(JavaPairRDD<String, Integer> in, Time time) { return null; } } @@ -511,32 +509,32 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa public void testTransformWith() { List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList( Arrays.asList( - new Tuple2<String, String>("california", "dodgers"), - new Tuple2<String, String>("new york", "yankees")), + new Tuple2<>("california", "dodgers"), + new Tuple2<>("new york", "yankees")), Arrays.asList( - new Tuple2<String, String>("california", "sharks"), - new Tuple2<String, String>("new york", "rangers"))); + new Tuple2<>("california", "sharks"), + new Tuple2<>("new york", "rangers"))); List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList( Arrays.asList( - new Tuple2<String, String>("california", "giants"), - new Tuple2<String, String>("new york", "mets")), + new Tuple2<>("california", "giants"), + new Tuple2<>("new york", "mets")), Arrays.asList( - new Tuple2<String, String>("california", "ducks"), - new Tuple2<String, String>("new york", "islanders"))); + new Tuple2<>("california", "ducks"), + new Tuple2<>("new york", "islanders"))); List<HashSet<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList( Sets.newHashSet( - new Tuple2<String, Tuple2<String, String>>("california", - new Tuple2<String, String>("dodgers", "giants")), - new Tuple2<String, Tuple2<String, String>>("new york", - new Tuple2<String, String>("yankees", "mets"))), + new Tuple2<>("california", + new Tuple2<>("dodgers", "giants")), + new Tuple2<>("new york", + new Tuple2<>("yankees", "mets"))), Sets.newHashSet( - new Tuple2<String, Tuple2<String, String>>("california", - new Tuple2<String, String>("sharks", "ducks")), - new Tuple2<String, Tuple2<String, String>>("new york", - new Tuple2<String, String>("rangers", "islanders")))); + new Tuple2<>("california", + new Tuple2<>("sharks", "ducks")), + new Tuple2<>("new york", + new Tuple2<>("rangers", "islanders")))); JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream( ssc, stringStringKVStream1, 1); @@ -552,14 +550,12 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaPairRDD<String, String>, JavaPairRDD<String, String>, Time, - JavaPairRDD<String, Tuple2<String, String>> - >() { + JavaPairRDD<String, Tuple2<String, String>>>() { @Override public JavaPairRDD<String, Tuple2<String, String>> call( JavaPairRDD<String, String> rdd1, JavaPairRDD<String, String> rdd2, - Time time - ) throws Exception { + Time time) { return rdd1.join(rdd2); } } @@ -567,9 +563,9 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaTestUtils.attachTestOutputStream(joined); List<List<Tuple2<String, Tuple2<String, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2); - List<HashSet<Tuple2<String, Tuple2<String, String>>>> unorderedResult = Lists.newArrayList(); + List<HashSet<Tuple2<String, Tuple2<String, String>>>> unorderedResult = new ArrayList<>(); for (List<Tuple2<String, Tuple2<String, String>>> res: result) { - unorderedResult.add(Sets.newHashSet(res)); + unorderedResult.add(Sets.newHashSet(res)); } Assert.assertEquals(expected, unorderedResult); @@ -587,89 +583,89 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaDStream<String> stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 1); List<List<Tuple2<String, Integer>>> pairInputData1 = - Arrays.asList(Arrays.asList(new Tuple2<String, Integer>("x", 1))); + Arrays.asList(Arrays.asList(new Tuple2<>("x", 1))); List<List<Tuple2<Double, Character>>> pairInputData2 = - Arrays.asList(Arrays.asList(new Tuple2<Double, Character>(1.0, 'x'))); + Arrays.asList(Arrays.asList(new Tuple2<>(1.0, 'x'))); JavaPairDStream<String, Integer> pairStream1 = JavaPairDStream.fromJavaDStream( JavaTestUtils.attachTestInputStream(ssc, pairInputData1, 1)); JavaPairDStream<Double, Character> pairStream2 = JavaPairDStream.fromJavaDStream( JavaTestUtils.attachTestInputStream(ssc, pairInputData2, 1)); - JavaDStream<Double> transformed1 = stream1.transformWith( + stream1.transformWith( stream2, new Function3<JavaRDD<Integer>, JavaRDD<String>, Time, JavaRDD<Double>>() { @Override - public JavaRDD<Double> call(JavaRDD<Integer> rdd1, JavaRDD<String> rdd2, Time time) throws Exception { + public JavaRDD<Double> call(JavaRDD<Integer> rdd1, JavaRDD<String> rdd2, Time time) { return null; } } ); - JavaDStream<Double> transformed2 = stream1.transformWith( + stream1.transformWith( pairStream1, new Function3<JavaRDD<Integer>, JavaPairRDD<String, Integer>, Time, JavaRDD<Double>>() { @Override - public JavaRDD<Double> call(JavaRDD<Integer> rdd1, JavaPairRDD<String, Integer> rdd2, Time time) throws Exception { + public JavaRDD<Double> call(JavaRDD<Integer> rdd1, JavaPairRDD<String, Integer> rdd2, Time time) { return null; } } ); - JavaPairDStream<Double, Double> transformed3 = stream1.transformWithToPair( + stream1.transformWithToPair( stream2, new Function3<JavaRDD<Integer>, JavaRDD<String>, Time, JavaPairRDD<Double, Double>>() { @Override - public JavaPairRDD<Double, Double> call(JavaRDD<Integer> rdd1, JavaRDD<String> rdd2, Time time) throws Exception { + public JavaPairRDD<Double, Double> call(JavaRDD<Integer> rdd1, JavaRDD<String> rdd2, Time time) { return null; } } ); - JavaPairDStream<Double, Double> transformed4 = stream1.transformWithToPair( + stream1.transformWithToPair( pairStream1, new Function3<JavaRDD<Integer>, JavaPairRDD<String, Integer>, Time, JavaPairRDD<Double, Double>>() { @Override - public JavaPairRDD<Double, Double> call(JavaRDD<Integer> rdd1, JavaPairRDD<String, Integer> rdd2, Time time) throws Exception { + public JavaPairRDD<Double, Double> call(JavaRDD<Integer> rdd1, JavaPairRDD<String, Integer> rdd2, Time time) { return null; } } ); - JavaDStream<Double> pairTransformed1 = pairStream1.transformWith( + pairStream1.transformWith( stream2, new Function3<JavaPairRDD<String, Integer>, JavaRDD<String>, Time, JavaRDD<Double>>() { @Override - public JavaRDD<Double> call(JavaPairRDD<String, Integer> rdd1, JavaRDD<String> rdd2, Time time) throws Exception { + public JavaRDD<Double> call(JavaPairRDD<String, Integer> rdd1, JavaRDD<String> rdd2, Time time) { return null; } } ); - JavaDStream<Double> pairTransformed2_ = pairStream1.transformWith( + pairStream1.transformWith( pairStream1, new Function3<JavaPairRDD<String, Integer>, JavaPairRDD<String, Integer>, Time, JavaRDD<Double>>() { @Override - public JavaRDD<Double> call(JavaPairRDD<String, Integer> rdd1, JavaPairRDD<String, Integer> rdd2, Time time) throws Exception { + public JavaRDD<Double> call(JavaPairRDD<String, Integer> rdd1, JavaPairRDD<String, Integer> rdd2, Time time) { return null; } } ); - JavaPairDStream<Double, Double> pairTransformed3 = pairStream1.transformWithToPair( + pairStream1.transformWithToPair( stream2, new Function3<JavaPairRDD<String, Integer>, JavaRDD<String>, Time, JavaPairRDD<Double, Double>>() { @Override - public JavaPairRDD<Double, Double> call(JavaPairRDD<String, Integer> rdd1, JavaRDD<String> rdd2, Time time) throws Exception { + public JavaPairRDD<Double, Double> call(JavaPairRDD<String, Integer> rdd1, JavaRDD<String> rdd2, Time time) { return null; } } ); - JavaPairDStream<Double, Double> pairTransformed4 = pairStream1.transformWithToPair( + pairStream1.transformWithToPair( pairStream2, new Function3<JavaPairRDD<String, Integer>, JavaPairRDD<Double, Character>, Time, JavaPairRDD<Double, Double>>() { @Override - public JavaPairRDD<Double, Double> call(JavaPairRDD<String, Integer> rdd1, JavaPairRDD<Double, Character> rdd2, Time time) throws Exception { + public JavaPairRDD<Double, Double> call(JavaPairRDD<String, Integer> rdd1, JavaPairRDD<Double, Character> rdd2, Time time) { return null; } } @@ -690,13 +686,13 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa ); List<List<Tuple2<Integer, String>>> pairStream1input = Arrays.asList( - Arrays.asList(new Tuple2<Integer, String>(1, "x")), - Arrays.asList(new Tuple2<Integer, String>(2, "y")) + Arrays.asList(new Tuple2<>(1, "x")), + Arrays.asList(new Tuple2<>(2, "y")) ); List<List<Tuple2<Integer, Tuple2<Integer, String>>>> expected = Arrays.asList( - Arrays.asList(new Tuple2<Integer, Tuple2<Integer, String>>(1, new Tuple2<Integer, String>(1, "x"))), - Arrays.asList(new Tuple2<Integer, Tuple2<Integer, String>>(2, new Tuple2<Integer, String>(2, "y"))) + Arrays.asList(new Tuple2<>(1, new Tuple2<>(1, "x"))), + Arrays.asList(new Tuple2<>(2, new Tuple2<>(2, "y"))) ); JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, stream1input, 1); @@ -707,7 +703,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa List<JavaDStream<?>> listOfDStreams1 = Arrays.<JavaDStream<?>>asList(stream1, stream2); // This is just to test whether this transform to JavaStream compiles - JavaDStream<Long> transformed1 = ssc.transform( + ssc.transform( listOfDStreams1, new Function2<List<JavaRDD<?>>, Time, JavaRDD<Long>>() { @Override @@ -733,8 +729,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaPairRDD<Integer, String> prdd3 = JavaPairRDD.fromJavaRDD(rdd3); PairFunction<Integer, Integer, Integer> mapToTuple = new PairFunction<Integer, Integer, Integer>() { @Override - public Tuple2<Integer, Integer> call(Integer i) throws Exception { - return new Tuple2<Integer, Integer>(i, i); + public Tuple2<Integer, Integer> call(Integer i) { + return new Tuple2<>(i, i); } }; return rdd1.union(rdd2).mapToPair(mapToTuple).join(prdd3); @@ -763,7 +759,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaDStream<String> flatMapped = stream.flatMap(new FlatMapFunction<String, String>() { @Override public Iterable<String> call(String x) { - return Lists.newArrayList(x.split("(?!^)")); + return Arrays.asList(x.split("(?!^)")); } }); JavaTestUtils.attachTestOutputStream(flatMapped); @@ -782,39 +778,39 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa List<List<Tuple2<Integer, String>>> expected = Arrays.asList( Arrays.asList( - new Tuple2<Integer, String>(6, "g"), - new Tuple2<Integer, String>(6, "i"), - new Tuple2<Integer, String>(6, "a"), - new Tuple2<Integer, String>(6, "n"), - new Tuple2<Integer, String>(6, "t"), - new Tuple2<Integer, String>(6, "s")), + new Tuple2<>(6, "g"), + new Tuple2<>(6, "i"), + new Tuple2<>(6, "a"), + new Tuple2<>(6, "n"), + new Tuple2<>(6, "t"), + new Tuple2<>(6, "s")), Arrays.asList( - new Tuple2<Integer, String>(7, "d"), - new Tuple2<Integer, String>(7, "o"), - new Tuple2<Integer, String>(7, "d"), - new Tuple2<Integer, String>(7, "g"), - new Tuple2<Integer, String>(7, "e"), - new Tuple2<Integer, String>(7, "r"), - new Tuple2<Integer, String>(7, "s")), + new Tuple2<>(7, "d"), + new Tuple2<>(7, "o"), + new Tuple2<>(7, "d"), + new Tuple2<>(7, "g"), + new Tuple2<>(7, "e"), + new Tuple2<>(7, "r"), + new Tuple2<>(7, "s")), Arrays.asList( - new Tuple2<Integer, String>(9, "a"), - new Tuple2<Integer, String>(9, "t"), - new Tuple2<Integer, String>(9, "h"), - new Tuple2<Integer, String>(9, "l"), - new Tuple2<Integer, String>(9, "e"), - new Tuple2<Integer, String>(9, "t"), - new Tuple2<Integer, String>(9, "i"), - new Tuple2<Integer, String>(9, "c"), - new Tuple2<Integer, String>(9, "s"))); + new Tuple2<>(9, "a"), + new Tuple2<>(9, "t"), + new Tuple2<>(9, "h"), + new Tuple2<>(9, "l"), + new Tuple2<>(9, "e"), + new Tuple2<>(9, "t"), + new Tuple2<>(9, "i"), + new Tuple2<>(9, "c"), + new Tuple2<>(9, "s"))); JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<Integer, String> flatMapped = stream.flatMapToPair( new PairFlatMapFunction<String, Integer, String>() { @Override - public Iterable<Tuple2<Integer, String>> call(String in) throws Exception { - List<Tuple2<Integer, String>> out = Lists.newArrayList(); + public Iterable<Tuple2<Integer, String>> call(String in) { + List<Tuple2<Integer, String>> out = new ArrayList<>(); for (String letter: in.split("(?!^)")) { - out.add(new Tuple2<Integer, String>(in.length(), letter)); + out.add(new Tuple2<>(in.length(), letter)); } return out; } @@ -859,13 +855,13 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa */ public static <T> void assertOrderInvariantEquals( List<List<T>> expected, List<List<T>> actual) { - List<Set<T>> expectedSets = new ArrayList<Set<T>>(); + List<Set<T>> expectedSets = new ArrayList<>(); for (List<T> list: expected) { - expectedSets.add(Collections.unmodifiableSet(new HashSet<T>(list))); + expectedSets.add(Collections.unmodifiableSet(new HashSet<>(list))); } - List<Set<T>> actualSets = new ArrayList<Set<T>>(); + List<Set<T>> actualSets = new ArrayList<>(); for (List<T> list: actual) { - actualSets.add(Collections.unmodifiableSet(new HashSet<T>(list))); + actualSets.add(Collections.unmodifiableSet(new HashSet<>(list))); } Assert.assertEquals(expectedSets, actualSets); } @@ -877,25 +873,25 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa public void testPairFilter() { List<List<String>> inputData = Arrays.asList( Arrays.asList("giants", "dodgers"), - Arrays.asList("yankees", "red socks")); + Arrays.asList("yankees", "red sox")); List<List<Tuple2<String, Integer>>> expected = Arrays.asList( - Arrays.asList(new Tuple2<String, Integer>("giants", 6)), - Arrays.asList(new Tuple2<String, Integer>("yankees", 7))); + Arrays.asList(new Tuple2<>("giants", 6)), + Arrays.asList(new Tuple2<>("yankees", 7))); JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<String, Integer> pairStream = stream.mapToPair( new PairFunction<String, String, Integer>() { @Override - public Tuple2<String, Integer> call(String in) throws Exception { - return new Tuple2<String, Integer>(in, in.length()); + public Tuple2<String, Integer> call(String in) { + return new Tuple2<>(in, in.length()); } }); JavaPairDStream<String, Integer> filtered = pairStream.filter( new Function<Tuple2<String, Integer>, Boolean>() { @Override - public Boolean call(Tuple2<String, Integer> in) throws Exception { + public Boolean call(Tuple2<String, Integer> in) { return in._1().contains("a"); } }); @@ -906,28 +902,28 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa } @SuppressWarnings("unchecked") - private List<List<Tuple2<String, String>>> stringStringKVStream = Arrays.asList( - Arrays.asList(new Tuple2<String, String>("california", "dodgers"), - new Tuple2<String, String>("california", "giants"), - new Tuple2<String, String>("new york", "yankees"), - new Tuple2<String, String>("new york", "mets")), - Arrays.asList(new Tuple2<String, String>("california", "sharks"), - new Tuple2<String, String>("california", "ducks"), - new Tuple2<String, String>("new york", "rangers"), - new Tuple2<String, String>("new york", "islanders"))); + private final List<List<Tuple2<String, String>>> stringStringKVStream = Arrays.asList( + Arrays.asList(new Tuple2<>("california", "dodgers"), + new Tuple2<>("california", "giants"), + new Tuple2<>("new york", "yankees"), + new Tuple2<>("new york", "mets")), + Arrays.asList(new Tuple2<>("california", "sharks"), + new Tuple2<>("california", "ducks"), + new Tuple2<>("new york", "rangers"), + new Tuple2<>("new york", "islanders"))); @SuppressWarnings("unchecked") - private List<List<Tuple2<String, Integer>>> stringIntKVStream = Arrays.asList( + private final List<List<Tuple2<String, Integer>>> stringIntKVStream = Arrays.asList( Arrays.asList( - new Tuple2<String, Integer>("california", 1), - new Tuple2<String, Integer>("california", 3), - new Tuple2<String, Integer>("new york", 4), - new Tuple2<String, Integer>("new york", 1)), + new Tuple2<>("california", 1), + new Tuple2<>("california", 3), + new Tuple2<>("new york", 4), + new Tuple2<>("new york", 1)), Arrays.asList( - new Tuple2<String, Integer>("california", 5), - new Tuple2<String, Integer>("california", 5), - new Tuple2<String, Integer>("new york", 3), - new Tuple2<String, Integer>("new york", 1))); + new Tuple2<>("california", 5), + new Tuple2<>("california", 5), + new Tuple2<>("new york", 3), + new Tuple2<>("new york", 1))); @SuppressWarnings("unchecked") @Test @@ -936,22 +932,22 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa List<List<Tuple2<Integer, String>>> expected = Arrays.asList( Arrays.asList( - new Tuple2<Integer, String>(1, "california"), - new Tuple2<Integer, String>(3, "california"), - new Tuple2<Integer, String>(4, "new york"), - new Tuple2<Integer, String>(1, "new york")), + new Tuple2<>(1, "california"), + new Tuple2<>(3, "california"), + new Tuple2<>(4, "new york"), + new Tuple2<>(1, "new york")), Arrays.asList( - new Tuple2<Integer, String>(5, "california"), - new Tuple2<Integer, String>(5, "california"), - new Tuple2<Integer, String>(3, "new york"), - new Tuple2<Integer, String>(1, "new york"))); + new Tuple2<>(5, "california"), + new Tuple2<>(5, "california"), + new Tuple2<>(3, "new york"), + new Tuple2<>(1, "new york"))); JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); JavaPairDStream<Integer, String> reversed = pairStream.mapToPair( new PairFunction<Tuple2<String, Integer>, Integer, String>() { @Override - public Tuple2<Integer, String> call(Tuple2<String, Integer> in) throws Exception { + public Tuple2<Integer, String> call(Tuple2<String, Integer> in) { return in.swap(); } }); @@ -969,23 +965,23 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa List<List<Tuple2<Integer, String>>> expected = Arrays.asList( Arrays.asList( - new Tuple2<Integer, String>(1, "california"), - new Tuple2<Integer, String>(3, "california"), - new Tuple2<Integer, String>(4, "new york"), - new Tuple2<Integer, String>(1, "new york")), + new Tuple2<>(1, "california"), + new Tuple2<>(3, "california"), + new Tuple2<>(4, "new york"), + new Tuple2<>(1, "new york")), Arrays.asList( - new Tuple2<Integer, String>(5, "california"), - new Tuple2<Integer, String>(5, "california"), - new Tuple2<Integer, String>(3, "new york"), - new Tuple2<Integer, String>(1, "new york"))); + new Tuple2<>(5, "california"), + new Tuple2<>(5, "california"), + new Tuple2<>(3, "new york"), + new Tuple2<>(1, "new york"))); JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); JavaPairDStream<Integer, String> reversed = pairStream.mapPartitionsToPair( new PairFlatMapFunction<Iterator<Tuple2<String, Integer>>, Integer, String>() { @Override - public Iterable<Tuple2<Integer, String>> call(Iterator<Tuple2<String, Integer>> in) throws Exception { - LinkedList<Tuple2<Integer, String>> out = new LinkedList<Tuple2<Integer, String>>(); + public Iterable<Tuple2<Integer, String>> call(Iterator<Tuple2<String, Integer>> in) { + List<Tuple2<Integer, String>> out = new LinkedList<>(); while (in.hasNext()) { Tuple2<String, Integer> next = in.next(); out.add(next.swap()); @@ -1014,7 +1010,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaDStream<Integer> reversed = pairStream.map( new Function<Tuple2<String, Integer>, Integer>() { @Override - public Integer call(Tuple2<String, Integer> in) throws Exception { + public Integer call(Tuple2<String, Integer> in) { return in._2(); } }); @@ -1030,23 +1026,23 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa public void testPairToPairFlatMapWithChangingTypes() { // Maps pair -> pair List<List<Tuple2<String, Integer>>> inputData = Arrays.asList( Arrays.asList( - new Tuple2<String, Integer>("hi", 1), - new Tuple2<String, Integer>("ho", 2)), + new Tuple2<>("hi", 1), + new Tuple2<>("ho", 2)), Arrays.asList( - new Tuple2<String, Integer>("hi", 1), - new Tuple2<String, Integer>("ho", 2))); + new Tuple2<>("hi", 1), + new Tuple2<>("ho", 2))); List<List<Tuple2<Integer, String>>> expected = Arrays.asList( Arrays.asList( - new Tuple2<Integer, String>(1, "h"), - new Tuple2<Integer, String>(1, "i"), - new Tuple2<Integer, String>(2, "h"), - new Tuple2<Integer, String>(2, "o")), + new Tuple2<>(1, "h"), + new Tuple2<>(1, "i"), + new Tuple2<>(2, "h"), + new Tuple2<>(2, "o")), Arrays.asList( - new Tuple2<Integer, String>(1, "h"), - new Tuple2<Integer, String>(1, "i"), - new Tuple2<Integer, String>(2, "h"), - new Tuple2<Integer, String>(2, "o"))); + new Tuple2<>(1, "h"), + new Tuple2<>(1, "i"), + new Tuple2<>(2, "h"), + new Tuple2<>(2, "o"))); JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); @@ -1054,10 +1050,10 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaPairDStream<Integer, String> flatMapped = pairStream.flatMapToPair( new PairFlatMapFunction<Tuple2<String, Integer>, Integer, String>() { @Override - public Iterable<Tuple2<Integer, String>> call(Tuple2<String, Integer> in) throws Exception { - List<Tuple2<Integer, String>> out = new LinkedList<Tuple2<Integer, String>>(); + public Iterable<Tuple2<Integer, String>> call(Tuple2<String, Integer> in) { + List<Tuple2<Integer, String>> out = new LinkedList<>(); for (Character s : in._1().toCharArray()) { - out.add(new Tuple2<Integer, String>(in._2(), s.toString())); + out.add(new Tuple2<>(in._2(), s.toString())); } return out; } @@ -1075,11 +1071,11 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa List<List<Tuple2<String, List<String>>>> expected = Arrays.asList( Arrays.asList( - new Tuple2<String, List<String>>("california", Arrays.asList("dodgers", "giants")), - new Tuple2<String, List<String>>("new york", Arrays.asList("yankees", "mets"))), + new Tuple2<>("california", Arrays.asList("dodgers", "giants")), + new Tuple2<>("new york", Arrays.asList("yankees", "mets"))), Arrays.asList( - new Tuple2<String, List<String>>("california", Arrays.asList("sharks", "ducks")), - new Tuple2<String, List<String>>("new york", Arrays.asList("rangers", "islanders")))); + new Tuple2<>("california", Arrays.asList("sharks", "ducks")), + new Tuple2<>("new york", Arrays.asList("rangers", "islanders")))); JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream); @@ -1111,11 +1107,11 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa List<List<Tuple2<String, Integer>>> expected = Arrays.asList( Arrays.asList( - new Tuple2<String, Integer>("california", 4), - new Tuple2<String, Integer>("new york", 5)), + new Tuple2<>("california", 4), + new Tuple2<>("new york", 5)), Arrays.asList( - new Tuple2<String, Integer>("california", 10), - new Tuple2<String, Integer>("new york", 4))); + new Tuple2<>("california", 10), + new Tuple2<>("new york", 4))); JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream( ssc, inputData, 1); @@ -1136,20 +1132,20 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa List<List<Tuple2<String, Integer>>> expected = Arrays.asList( Arrays.asList( - new Tuple2<String, Integer>("california", 4), - new Tuple2<String, Integer>("new york", 5)), + new Tuple2<>("california", 4), + new Tuple2<>("new york", 5)), Arrays.asList( - new Tuple2<String, Integer>("california", 10), - new Tuple2<String, Integer>("new york", 4))); + new Tuple2<>("california", 10), + new Tuple2<>("new york", 4))); JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream( ssc, inputData, 1); JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream<String, Integer> combined = pairStream.<Integer>combineByKey( + JavaPairDStream<String, Integer> combined = pairStream.combineByKey( new Function<Integer, Integer>() { @Override - public Integer call(Integer i) throws Exception { + public Integer call(Integer i) { return i; } }, new IntegerSum(), new IntegerSum(), new HashPartitioner(2)); @@ -1170,13 +1166,13 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa List<List<Tuple2<String, Long>>> expected = Arrays.asList( Arrays.asList( - new Tuple2<String, Long>("hello", 1L), - new Tuple2<String, Long>("world", 1L)), + new Tuple2<>("hello", 1L), + new Tuple2<>("world", 1L)), Arrays.asList( - new Tuple2<String, Long>("hello", 1L), - new Tuple2<String, Long>("moon", 1L)), + new Tuple2<>("hello", 1L), + new Tuple2<>("moon", 1L)), Arrays.asList( - new Tuple2<String, Long>("hello", 1L))); + new Tuple2<>("hello", 1L))); JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<String, Long> counted = stream.countByValue(); @@ -1193,16 +1189,16 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa List<List<Tuple2<String, List<Integer>>>> expected = Arrays.asList( Arrays.asList( - new Tuple2<String, List<Integer>>("california", Arrays.asList(1, 3)), - new Tuple2<String, List<Integer>>("new york", Arrays.asList(1, 4)) + new Tuple2<>("california", Arrays.asList(1, 3)), + new Tuple2<>("new york", Arrays.asList(1, 4)) ), Arrays.asList( - new Tuple2<String, List<Integer>>("california", Arrays.asList(1, 3, 5, 5)), - new Tuple2<String, List<Integer>>("new york", Arrays.asList(1, 1, 3, 4)) + new Tuple2<>("california", Arrays.asList(1, 3, 5, 5)), + new Tuple2<>("new york", Arrays.asList(1, 1, 3, 4)) ), Arrays.asList( - new Tuple2<String, List<Integer>>("california", Arrays.asList(5, 5)), - new Tuple2<String, List<Integer>>("new york", Arrays.asList(1, 3)) + new Tuple2<>("california", Arrays.asList(5, 5)), + new Tuple2<>("new york", Arrays.asList(1, 3)) ) ); @@ -1220,16 +1216,16 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa } } - private HashSet<Tuple2<String, HashSet<Integer>>> convert(List<Tuple2<String, List<Integer>>> listOfTuples) { - List<Tuple2<String, HashSet<Integer>>> newListOfTuples = new ArrayList<Tuple2<String, HashSet<Integer>>>(); + private static Set<Tuple2<String, HashSet<Integer>>> convert(List<Tuple2<String, List<Integer>>> listOfTuples) { + List<Tuple2<String, HashSet<Integer>>> newListOfTuples = new ArrayList<>(); for (Tuple2<String, List<Integer>> tuple: listOfTuples) { newListOfTuples.add(convert(tuple)); } - return new HashSet<Tuple2<String, HashSet<Integer>>>(newListOfTuples); + return new HashSet<>(newListOfTuples); } - private Tuple2<String, HashSet<Integer>> convert(Tuple2<String, List<Integer>> tuple) { - return new Tuple2<String, HashSet<Integer>>(tuple._1(), new HashSet<Integer>(tuple._2())); + private static Tuple2<String, HashSet<Integer>> convert(Tuple2<String, List<Integer>> tuple) { + return new Tuple2<>(tuple._1(), new HashSet<>(tuple._2())); } @SuppressWarnings("unchecked") @@ -1238,12 +1234,12 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; List<List<Tuple2<String, Integer>>> expected = Arrays.asList( - Arrays.asList(new Tuple2<String, Integer>("california", 4), - new Tuple2<String, Integer>("new york", 5)), - Arrays.asList(new Tuple2<String, Integer>("california", 14), - new Tuple2<String, Integer>("new york", 9)), - Arrays.asList(new Tuple2<String, Integer>("california", 10), - new Tuple2<String, Integer>("new york", 4))); + Arrays.asList(new Tuple2<>("california", 4), + new Tuple2<>("new york", 5)), + Arrays.asList(new Tuple2<>("california", 14), + new Tuple2<>("new york", 9)), + Arrays.asList(new Tuple2<>("california", 10), + new Tuple2<>("new york", 4))); JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); @@ -1262,12 +1258,12 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; List<List<Tuple2<String, Integer>>> expected = Arrays.asList( - Arrays.asList(new Tuple2<String, Integer>("california", 4), - new Tuple2<String, Integer>("new york", 5)), - Arrays.asList(new Tuple2<String, Integer>("california", 14), - new Tuple2<String, Integer>("new york", 9)), - Arrays.asList(new Tuple2<String, Integer>("california", 14), - new Tuple2<String, Integer>("new york", 9))); + Arrays.asList(new Tuple2<>("california", 4), + new Tuple2<>("new york", 5)), + Arrays.asList(new Tuple2<>("california", 14), + new Tuple2<>("new york", 9)), + Arrays.asList(new Tuple2<>("california", 14), + new Tuple2<>("new york", 9))); JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); @@ -1278,10 +1274,10 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa public Optional<Integer> call(List<Integer> values, Optional<Integer> state) { int out = 0; if (state.isPresent()) { - out = out + state.get(); + out += state.get(); } for (Integer v : values) { - out = out + v; + out += v; } return Optional.of(out); } @@ -1298,19 +1294,19 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; List<Tuple2<String, Integer>> initial = Arrays.asList ( - new Tuple2<String, Integer> ("california", 1), - new Tuple2<String, Integer> ("new york", 2)); + new Tuple2<>("california", 1), + new Tuple2<>("new york", 2)); JavaRDD<Tuple2<String, Integer>> tmpRDD = ssc.sparkContext().parallelize(initial); JavaPairRDD<String, Integer> initialRDD = JavaPairRDD.fromJavaRDD (tmpRDD); List<List<Tuple2<String, Integer>>> expected = Arrays.asList( - Arrays.asList(new Tuple2<String, Integer>("california", 5), - new Tuple2<String, Integer>("new york", 7)), - Arrays.asList(new Tuple2<String, Integer>("california", 15), - new Tuple2<String, Integer>("new york", 11)), - Arrays.asList(new Tuple2<String, Integer>("california", 15), - new Tuple2<String, Integer>("new york", 11))); + Arrays.asList(new Tuple2<>("california", 5), + new Tuple2<>("new york", 7)), + Arrays.asList(new Tuple2<>("california", 15), + new Tuple2<>("new york", 11)), + Arrays.asList(new Tuple2<>("california", 15), + new Tuple2<>("new york", 11))); JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); @@ -1321,10 +1317,10 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa public Optional<Integer> call(List<Integer> values, Optional<Integer> state) { int out = 0; if (state.isPresent()) { - out = out + state.get(); + out += state.get(); } for (Integer v : values) { - out = out + v; + out += v; } return Optional.of(out); } @@ -1341,19 +1337,19 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; List<List<Tuple2<String, Integer>>> expected = Arrays.asList( - Arrays.asList(new Tuple2<String, Integer>("california", 4), - new Tuple2<String, Integer>("new york", 5)), - Arrays.asList(new Tuple2<String, Integer>("california", 14), - new Tuple2<String, Integer>("new york", 9)), - Arrays.asList(new Tuple2<String, Integer>("california", 10), - new Tuple2<String, Integer>("new york", 4))); + Arrays.asList(new Tuple2<>("california", 4), + new Tuple2<>("new york", 5)), + Arrays.asList(new Tuple2<>("california", 14), + new Tuple2<>("new york", 9)), + Arrays.asList(new Tuple2<>("california", 10), + new Tuple2<>("new york", 4))); JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); JavaPairDStream<String, Integer> reduceWindowed = pairStream.reduceByKeyAndWindow(new IntegerSum(), new IntegerDifference(), - new Duration(2000), new Duration(1000)); + new Duration(2000), new Duration(1000)); JavaTestUtils.attachTestOutputStream(reduceWindowed); List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3); @@ -1370,15 +1366,15 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa List<HashSet<Tuple2<String, Long>>> expected = Arrays.asList( Sets.newHashSet( - new Tuple2<String, Long>("hello", 1L), - new Tuple2<String, Long>("world", 1L)), + new Tuple2<>("hello", 1L), + new Tuple2<>("world", 1L)), Sets.newHashSet( - new Tuple2<String, Long>("hello", 2L), - new Tuple2<String, Long>("world", 1L), - new Tuple2<String, Long>("moon", 1L)), + new Tuple2<>("hello", 2L), + new Tuple2<>("world", 1L), + new Tuple2<>("moon", 1L)), Sets.newHashSet( - new Tuple2<String, Long>("hello", 2L), - new Tuple2<String, Long>("moon", 1L))); + new Tuple2<>("hello", 2L), + new Tuple2<>("moon", 1L))); JavaDStream<String> stream = JavaTestUtils.attachTestInputStream( ssc, inputData, 1); @@ -1386,7 +1382,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa stream.countByValueAndWindow(new Duration(2000), new Duration(1000)); JavaTestUtils.attachTestOutputStream(counted); List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 3, 3); - List<HashSet<Tuple2<String, Long>>> unorderedResult = Lists.newArrayList(); + List<Set<Tuple2<String, Long>>> unorderedResult = new ArrayList<>(); for (List<Tuple2<String, Long>> res: result) { unorderedResult.add(Sets.newHashSet(res)); } @@ -1399,27 +1395,27 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa public void testPairTransform() { List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList( Arrays.asList( - new Tuple2<Integer, Integer>(3, 5), - new Tuple2<Integer, Integer>(1, 5), - new Tuple2<Integer, Integer>(4, 5), - new Tuple2<Integer, Integer>(2, 5)), + new Tuple2<>(3, 5), + new Tuple2<>(1, 5), + new Tuple2<>(4, 5), + new Tuple2<>(2, 5)), Arrays.asList( - new Tuple2<Integer, Integer>(2, 5), - new Tuple2<Integer, Integer>(3, 5), - new Tuple2<Integer, Integer>(4, 5), - new Tuple2<Integer, Integer>(1, 5))); + new Tuple2<>(2, 5), + new Tuple2<>(3, 5), + new Tuple2<>(4, 5), + new Tuple2<>(1, 5))); List<List<Tuple2<Integer, Integer>>> expected = Arrays.asList( Arrays.asList( - new Tuple2<Integer, Integer>(1, 5), - new Tuple2<Integer, Integer>(2, 5), - new Tuple2<Integer, Integer>(3, 5), - new Tuple2<Integer, Integer>(4, 5)), + new Tuple2<>(1, 5), + new Tuple2<>(2, 5), + new Tuple2<>(3, 5), + new Tuple2<>(4, 5)), Arrays.asList( - new Tuple2<Integer, Integer>(1, 5), - new Tuple2<Integer, Integer>(2, 5), - new Tuple2<Integer, Integer>(3, 5), - new Tuple2<Integer, Integer>(4, 5))); + new Tuple2<>(1, 5), + new Tuple2<>(2, 5), + new Tuple2<>(3, 5), + new Tuple2<>(4, 5))); JavaDStream<Tuple2<Integer, Integer>> stream = JavaTestUtils.attachTestInputStream( ssc, inputData, 1); @@ -1428,7 +1424,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaPairDStream<Integer, Integer> sorted = pairStream.transformToPair( new Function<JavaPairRDD<Integer, Integer>, JavaPairRDD<Integer, Integer>>() { @Override - public JavaPairRDD<Integer, Integer> call(JavaPairRDD<Integer, Integer> in) throws Exception { + public JavaPairRDD<Integer, Integer> call(JavaPairRDD<Integer, Integer> in) { return in.sortByKey(); } }); @@ -1444,15 +1440,15 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa public void testPairToNormalRDDTransform() { List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList( Arrays.asList( - new Tuple2<Integer, Integer>(3, 5), - new Tuple2<Integer, Integer>(1, 5), - new Tuple2<Integer, Integer>(4, 5), - new Tuple2<Integer, Integer>(2, 5)), + new Tuple2<>(3, 5), + new Tuple2<>(1, 5), + new Tuple2<>(4, 5), + new Tuple2<>(2, 5)), Arrays.asList( - new Tuple2<Integer, Integer>(2, 5), - new Tuple2<Integer, Integer>(3, 5), - new Tuple2<Integer, Integer>(4, 5), - new Tuple2<Integer, Integer>(1, 5))); + new Tuple2<>(2, 5), + new Tuple2<>(3, 5), + new Tuple2<>(4, 5), + new Tuple2<>(1, 5))); List<List<Integer>> expected = Arrays.asList( Arrays.asList(3,1,4,2), @@ -1465,11 +1461,11 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaDStream<Integer> firstParts = pairStream.transform( new Function<JavaPairRDD<Integer, Integer>, JavaRDD<Integer>>() { @Override - public JavaRDD<Integer> call(JavaPairRDD<Integer, Integer> in) throws Exception { + public JavaRDD<Integer> call(JavaPairRDD<Integer, Integer> in) { return in.map(new Function<Tuple2<Integer, Integer>, Integer>() { @Override - public Integer call(Tuple2<Integer, Integer> in) { - return in._1(); + public Integer call(Tuple2<Integer, Integer> in2) { + return in2._1(); } }); } @@ -1487,14 +1483,14 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa List<List<Tuple2<String, String>>> inputData = stringStringKVStream; List<List<Tuple2<String, String>>> expected = Arrays.asList( - Arrays.asList(new Tuple2<String, String>("california", "DODGERS"), - new Tuple2<String, String>("california", "GIANTS"), - new Tuple2<String, String>("new york", "YANKEES"), - new Tuple2<String, String>("new york", "METS")), - Arrays.asList(new Tuple2<String, String>("california", "SHARKS"), - new Tuple2<String, String>("california", "DUCKS"), - new Tuple2<String, String>("new york", "RANGERS"), - new Tuple2<String, String>("new york", "ISLANDERS"))); + Arrays.asList(new Tuple2<>("california", "DODGERS"), + new Tuple2<>("california", "GIANTS"), + new Tuple2<>("new york", "YANKEES"), + new Tuple2<>("new york", "METS")), + Arrays.asList(new Tuple2<>("california", "SHARKS"), + new Tuple2<>("california", "DUCKS"), + new Tuple2<>("new york", "RANGERS"), + new Tuple2<>("new york", "ISLANDERS"))); JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream( ssc, inputData, 1); @@ -1502,8 +1498,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaPairDStream<String, String> mapped = pairStream.mapValues(new Function<String, String>() { @Override - public String call(String s) throws Exception { - return s.toUpperCase(); + public String call(String s) { + return s.toUpperCase(Locale.ENGLISH); } }); @@ -1519,22 +1515,22 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa List<List<Tuple2<String, String>>> inputData = stringStringKVStream; List<List<Tuple2<String, String>>> expected = Arrays.asList( - Arrays.asList(new Tuple2<String, String>("california", "dodgers1"), - new Tuple2<String, String>("california", "dodgers2"), - new Tuple2<String, String>("california", "giants1"), - new Tuple2<String, String>("california", "giants2"), - new Tuple2<String, String>("new york", "yankees1"), - new Tuple2<String, String>("new york", "yankees2"), - new Tuple2<String, String>("new york", "mets1"), - new Tuple2<String, String>("new york", "mets2")), - Arrays.asList(new Tuple2<String, String>("california", "sharks1"), - new Tuple2<String, String>("california", "sharks2"), - new Tuple2<String, String>("california", "ducks1"), - new Tuple2<String, String>("california", "ducks2"), - new Tuple2<String, String>("new york", "rangers1"), - new Tuple2<String, String>("new york", "rangers2"), - new Tuple2<String, String>("new york", "islanders1"), - new Tuple2<String, String>("new york", "islanders2"))); + Arrays.asList(new Tuple2<>("california", "dodgers1"), + new Tuple2<>("california", "dodgers2"), + new Tuple2<>("california", "giants1"), + new Tuple2<>("california", "giants2"), + new Tuple2<>("new york", "yankees1"), + new Tuple2<>("new york", "yankees2"), + new Tuple2<>("new york", "mets1"), + new Tuple2<>("new york", "mets2")), + Arrays.asList(new Tuple2<>("california", "sharks1"), + new Tuple2<>("california", "sharks2"), + new Tuple2<>("california", "ducks1"), + new Tuple2<>("california", "ducks2"), + new Tuple2<>("new york", "rangers1"), + new Tuple2<>("new york", "rangers2"), + new Tuple2<>("new york", "islanders1"), + new Tuple2<>("new york", "islanders2"))); JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream( ssc, inputData, 1); @@ -1545,7 +1541,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa new Function<String, Iterable<String>>() { @Override public Iterable<String> call(String in) { - List<String> out = new ArrayList<String>(); + List<String> out = new ArrayList<>(); out.add(in + "1"); out.add(in + "2"); return out; @@ -1562,29 +1558,29 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa @Test public void testCoGroup() { List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList( - Arrays.asList(new Tuple2<String, String>("california", "dodgers"), - new Tuple2<String, String>("new york", "yankees")), - Arrays.asList(new Tuple2<String, String>("california", "sharks"), - new Tuple2<String, String>("new york", "rangers"))); + Arrays.asList(new Tuple2<>("california", "dodgers"), + new Tuple2<>("new york", "yankees")), + Arrays.asList(new Tuple2<>("california", "sharks"), + new Tuple2<>("new york", "rangers"))); List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList( - Arrays.asList(new Tuple2<String, String>("california", "giants"), - new Tuple2<String, String>("new york", "mets")), - Arrays.asList(new Tuple2<String, String>("california", "ducks"), - new Tuple2<String, String>("new york", "islanders"))); + Arrays.asList(new Tuple2<>("california", "giants"), + new Tuple2<>("new york", "mets")), + Arrays.asList(new Tuple2<>("california", "ducks"), + new Tuple2<>("new york", "islanders"))); List<List<Tuple2<String, Tuple2<List<String>, List<String>>>>> expected = Arrays.asList( Arrays.asList( - new Tuple2<String, Tuple2<List<String>, List<String>>>("california", - new Tuple2<List<String>, List<String>>(Arrays.asList("dodgers"), Arrays.asList("giants"))), - new Tuple2<String, Tuple2<List<String>, List<String>>>("new york", - new Tuple2<List<String>, List<String>>(Arrays.asList("yankees"), Arrays.asList("mets")))), + new Tuple2<>("california", + new Tuple2<>(Arrays.asList("dodgers"), Arrays.asList("giants"))), + new Tuple2<>("new york", + new Tuple2<>(Arrays.asList("yankees"), Arrays.asList("mets")))), Arrays.asList( - new Tuple2<String, Tuple2<List<String>, List<String>>>("california", - new Tuple2<List<String>, List<String>>(Arrays.asList("sharks"), Arrays.asList("ducks"))), - new Tuple2<String, Tuple2<List<String>, List<String>>>("new york", - new Tuple2<List<String>, List<String>>(Arrays.asList("rangers"), Arrays.asList("islanders"))))); + new Tuple2<>("california", + new Tuple2<>(Arrays.asList("sharks"), Arrays.asList("ducks"))), + new Tuple2<>("new york", + new Tuple2<>(Arrays.asList("rangers"), Arrays.asList("islanders"))))); JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream( @@ -1620,29 +1616,29 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa @Test public void testJoin() { List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList( - Arrays.asList(new Tuple2<String, String>("california", "dodgers"), - new Tuple2<String, String>("new york", "yankees")), - Arrays.asList(new Tuple2<String, String>("california", "sharks"), - new Tuple2<String, String>("new york", "rangers"))); + Arrays.asList(new Tuple2<>("california", "dodgers"), + new Tuple2<>("new york", "yankees")), + Arrays.asList(new Tuple2<>("california", "sharks"), + new Tuple2<>("new york", "rangers"))); List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList( - Arrays.asList(new Tuple2<String, String>("california", "giants"), - new Tuple2<String, String>("new york", "mets")), - Arrays.asList(new Tuple2<String, String>("california", "ducks"), - new Tuple2<String, String>("new york", "islanders"))); + Arrays.asList(new Tuple2<>("california", "giants"), + new Tuple2<>("new york", "mets")), + Arrays.asList(new Tuple2<>("california", "ducks"), + new Tuple2<>("new york", "islanders"))); List<List<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList( Arrays.asList( - new Tuple2<String, Tuple2<String, String>>("california", - new Tuple2<String, String>("dodgers", "giants")), - new Tuple2<String, Tuple2<String, String>>("new york", - new Tuple2<String, String>("yankees", "mets"))), + new Tuple2<>("california", + new Tuple2<>("dodgers", "giants")), + new Tuple2<>("new york", + new Tuple2<>("yankees", "mets"))), Arrays.asList( - new Tuple2<String, Tuple2<String, String>>("california", - new Tuple2<String, String>("sharks", "ducks")), - new Tuple2<String, Tuple2<String, String>>("new york", - new Tuple2<String, String>("rangers", "islanders")))); + new Tuple2<>("california", + new Tuple2<>("sharks", "ducks")), + new Tuple2<>("new york", + new Tuple2<>("rangers", "islanders")))); JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream( @@ -1664,13 +1660,13 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa @Test public void testLeftOuterJoin() { List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList( - Arrays.asList(new Tuple2<String, String>("california", "dodgers"), - new Tuple2<String, String>("new york", "yankees")), - Arrays.asList(new Tuple2<String, String>("california", "sharks") )); + Arrays.asList(new Tuple2<>("california", "dodgers"), + new Tuple2<>("new york", "yankees")), + Arrays.asList(new Tuple2<>("california", "sharks") )); List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList( - Arrays.asList(new Tuple2<String, String>("california", "giants") ), - Arrays.asList(new Tuple2<String, String>("new york", "islanders") ) + Arrays.asList(new Tuple2<>("california", "giants") ), + Arrays.asList(new Tuple2<>("new york", "islanders") ) ); @@ -1713,7 +1709,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaDStream<String> stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1); JavaDStream<Integer> letterCount = stream.map(new Function<String, Integer>() { @Override - public Integer call(String s) throws Exception { + public Integer call(String s) { return s.length(); } }); @@ -1752,6 +1748,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa // (used to detect the new context) final AtomicBoolean newContextCreated = new AtomicBoolean(false); Function0<JavaStreamingContext> creatingFunc = new Function0<JavaStreamingContext>() { + @Override public JavaStreamingContext call() { newContextCreated.set(true); return new JavaStreamingContext(conf, Seconds.apply(1)); @@ -1765,20 +1762,20 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa newContextCreated.set(false); ssc = JavaStreamingContext.getOrCreate(corruptedCheckpointDir, creatingFunc, - new org.apache.hadoop.conf.Configuration(), true); + new Configuration(), true); Assert.assertTrue("new context not created", newContextCreated.get()); ssc.stop(); newContextCreated.set(false); ssc = JavaStreamingContext.getOrCreate(checkpointDir, creatingFunc, - new org.apache.hadoop.conf.Configuration()); + new Configuration()); Assert.assertTrue("old context not recovered", !newContextCreated.get()); ssc.stop(); newContextCreated.set(false); JavaSparkContext sc = new JavaSparkContext(conf); ssc = JavaStreamingContext.getOrCreate(checkpointDir, creatingFunc, - new org.apache.hadoop.conf.Configuration()); + new Configuration()); Assert.assertTrue("old context not recovered", !newContextCreated.get()); ssc.stop(); } @@ -1800,7 +1797,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1); JavaDStream letterCount = stream.map(new Function<String, Integer>() { @Override - public Integer call(String s) throws Exception { + public Integer call(String s) { return s.length(); } }); @@ -1818,29 +1815,26 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa // InputStream functionality is deferred to the existing Scala tests. @Test public void testSocketTextStream() { - JavaReceiverInputDStream<String> test = ssc.socketTextStream("localhost", 12345); + ssc.socketTextStream("localhost", 12345); } @Test public void testSocketString() { - - class Converter implements Function<InputStream, Iterable<String>> { - public Iterable<String> call(InputStream in) throws IOException { - BufferedReader reader = new BufferedReader(new InputStreamReader(in)); - List<String> out = new ArrayList<String>(); - while (true) { - String line = reader.readLine(); - if (line == null) { break; } - out.add(line); - } - return out; - } - } - - JavaDStream<String> test = ssc.socketStream( + ssc.socketStream( "localhost", 12345, - new Converter(), + new Function<InputStream, Iterable<String>>() { + @Override + public Iterable<String> call(InputStream in) throws IOException { + List<String> out = new ArrayList<>(); + try (BufferedReader reader = new BufferedReader(new InputStreamReader(in))) { + for (String line; (line = reader.readLine()) != null;) { + out.add(line); + } + } + return out; + } + }, StorageLevel.MEMORY_ONLY()); } @@ -1870,7 +1864,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa TextInputFormat.class, new Function<Path, Boolean>() { @Override - public Boolean call(Path v1) throws Exception { + public Boolean call(Path v1) { return Boolean.TRUE; } }, @@ -1879,7 +1873,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaDStream<String> test = inputStream.map( new Function<Tuple2<LongWritable, Text>, String>() { @Override - public String call(Tuple2<LongWritable, Text> v1) throws Exception { + public String call(Tuple2<LongWritable, Text> v1) { return v1._2().toString(); } }); @@ -1892,19 +1886,15 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa @Test public void testRawSocketStream() { - JavaReceiverInputDStream<String> test = ssc.rawSocketStream("localhost", 12345); + ssc.rawSocketStream("localhost", 12345); } - private List<List<String>> fileTestPrepare(File testDir) throws IOException { + private static List<List<String>> fileTestPrepare(File testDir) throws IOException { File existingFile = new File(testDir, "0"); Files.write("0\n", existingFile, Charset.forName("UTF-8")); - assertTrue(existingFile.setLastModified(1000) && existingFile.lastModified() == 1000); - - List<List<String>> expected = Arrays.asList( - Arrays.asList("0") - ); - - return expected; + Assert.assertTrue(existingFile.setLastModified(1000)); + Assert.assertEquals(1000, existingFile.lastModified()); + return Arrays.asList(Arrays.asList("0")); } @SuppressWarnings("unchecked") diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java index 1b0787fe69..ec2bffd6a5 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java @@ -36,7 +36,6 @@ import java.io.InputStreamReader; import java.io.Serializable; import java.net.ConnectException; import java.net.Socket; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; public class JavaReceiverAPISuite implements Serializable { @@ -64,16 +63,16 @@ public class JavaReceiverAPISuite implements Serializable { ssc.receiverStream(new JavaSocketReceiver("localhost", server.port())); JavaDStream<String> mapped = input.map(new Function<String, String>() { @Override - public String call(String v1) throws Exception { + public String call(String v1) { return v1 + "."; } }); mapped.foreachRDD(new Function<JavaRDD<String>, Void>() { @Override - public Void call(JavaRDD<String> rdd) throws Exception { - long count = rdd.count(); - dataCounter.addAndGet(count); - return null; + public Void call(JavaRDD<String> rdd) { + long count = rdd.count(); + dataCounter.addAndGet(count); + return null; } }); @@ -83,7 +82,7 @@ public class JavaReceiverAPISuite implements Serializable { Thread.sleep(200); for (int i = 0; i < 6; i++) { - server.send("" + i + "\n"); // \n to make sure these are separate lines + server.send(i + "\n"); // \n to make sure these are separate lines Thread.sleep(100); } while (dataCounter.get() == 0 && System.currentTimeMillis() - startTime < timeout) { @@ -95,50 +94,49 @@ public class JavaReceiverAPISuite implements Serializable { server.stop(); } } -} -class JavaSocketReceiver extends Receiver<String> { + private static class JavaSocketReceiver extends Receiver<String> { - String host = null; - int port = -1; + String host = null; + int port = -1; - public JavaSocketReceiver(String host_ , int port_) { - super(StorageLevel.MEMORY_AND_DISK()); - host = host_; - port = port_; - } + JavaSocketReceiver(String host_ , int port_) { + super(StorageLevel.MEMORY_AND_DISK()); + host = host_; + port = port_; + } - @Override - public void onStart() { - new Thread() { - @Override public void run() { - receive(); - } - }.start(); - } + @Override + public void onStart() { + new Thread() { + @Override public void run() { + receive(); + } + }.start(); + } - @Override - public void onStop() { - } + @Override + public void onStop() { + } - private void receive() { - Socket socket = null; - try { - socket = new Socket(host, port); - BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream())); - String userInput; - while ((userInput = in.readLine()) != null) { - store(userInput); + private void receive() { + try { + Socket socket = new Socket(host, port); + BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream())); + String userInput; + while ((userInput = in.readLine()) != null) { + store(userInput); + } + in.close(); + socket.close(); + } catch(ConnectException ce) { + ce.printStackTrace(); + restart("Could not connect", ce); + } catch(Throwable t) { + t.printStackTrace(); + restart("Error receiving data", t); } - in.close(); - socket.close(); - } catch(ConnectException ce) { - ce.printStackTrace(); - restart("Could not connect", ce); - } catch(Throwable t) { - t.printStackTrace(); - restart("Error receiving data", t); } } -} +} |