diff options
author | Sean Owen <sowen@cloudera.com> | 2017-02-19 09:42:50 -0800 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2017-02-19 09:42:50 -0800 |
commit | 1487c9af20a333ead55955acf4c0aa323bea0d07 (patch) | |
tree | 5f47daa77e0f73da1e009cc3dcf0a5c0073246aa /streaming/src | |
parent | de14d35f77071932963a994fac5aec0e5df838a1 (diff) | |
download | spark-1487c9af20a333ead55955acf4c0aa323bea0d07.tar.gz spark-1487c9af20a333ead55955acf4c0aa323bea0d07.tar.bz2 spark-1487c9af20a333ead55955acf4c0aa323bea0d07.zip |
[SPARK-19534][TESTS] Convert Java tests to use lambdas, Java 8 features
## What changes were proposed in this pull request?
Convert tests to use Java 8 lambdas, and modest related fixes to surrounding code.
## How was this patch tested?
Jenkins tests
Author: Sean Owen <sowen@cloudera.com>
Closes #16964 from srowen/SPARK-19534.
Diffstat (limited to 'streaming/src')
5 files changed, 162 insertions, 500 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java index 9b7701003d..cb8ed83e5a 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java @@ -27,9 +27,6 @@ import java.util.Set; import scala.Tuple2; import com.google.common.collect.Sets; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.util.ManualClock; import org.junit.Assert; @@ -53,18 +50,14 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements JavaPairDStream<String, Integer> wordsDstream = null; Function4<Time, String, Optional<Integer>, State<Boolean>, Optional<Double>> mappingFunc = - new Function4<Time, String, Optional<Integer>, State<Boolean>, Optional<Double>>() { - @Override - public Optional<Double> call( - Time time, String word, Optional<Integer> one, State<Boolean> state) { - // Use all State's methods here - state.exists(); - state.get(); - state.isTimingOut(); - state.remove(); - state.update(true); - return Optional.of(2.0); - } + (time, word, one, state) -> { + // Use all State's methods here + state.exists(); + state.get(); + state.isTimingOut(); + state.remove(); + state.update(true); + return Optional.of(2.0); }; JavaMapWithStateDStream<String, Integer, Boolean, Double> stateDstream = @@ -78,17 +71,14 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements stateDstream.stateSnapshots(); Function3<String, Optional<Integer>, State<Boolean>, Double> mappingFunc2 = - new Function3<String, Optional<Integer>, State<Boolean>, Double>() { - @Override - public Double call(String key, Optional<Integer> one, State<Boolean> state) { - // Use all State's methods here - state.exists(); - state.get(); - state.isTimingOut(); - state.remove(); - state.update(true); - return 2.0; - } + (key, one, state) -> { + // Use all State's methods here + state.exists(); + state.get(); + state.isTimingOut(); + state.remove(); + state.update(true); + return 2.0; }; JavaMapWithStateDStream<String, Integer, Boolean, Double> stateDstream2 = @@ -136,13 +126,10 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements ); Function3<String, Optional<Integer>, State<Integer>, Integer> mappingFunc = - new Function3<String, Optional<Integer>, State<Integer>, Integer>() { - @Override - public Integer call(String key, Optional<Integer> value, State<Integer> state) { - int sum = value.orElse(0) + (state.exists() ? state.get() : 0); - state.update(sum); - return sum; - } + (key, value, state) -> { + int sum = value.orElse(0) + (state.exists() ? state.get() : 0); + state.update(sum); + return sum; }; testOperation( inputData, @@ -159,29 +146,15 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements int numBatches = expectedOutputs.size(); JavaDStream<K> inputStream = JavaTestUtils.attachTestInputStream(ssc, input, 2); JavaMapWithStateDStream<K, Integer, S, T> mapWithStateDStream = - JavaPairDStream.fromJavaDStream(inputStream.map(new Function<K, Tuple2<K, Integer>>() { - @Override - public Tuple2<K, Integer> call(K x) { - return new Tuple2<>(x, 1); - } - })).mapWithState(mapWithStateSpec); - - final List<Set<T>> collectedOutputs = + JavaPairDStream.fromJavaDStream(inputStream.map(x -> new Tuple2<>(x, 1))).mapWithState(mapWithStateSpec); + + List<Set<T>> collectedOutputs = Collections.synchronizedList(new ArrayList<Set<T>>()); - mapWithStateDStream.foreachRDD(new VoidFunction<JavaRDD<T>>() { - @Override - public void call(JavaRDD<T> rdd) { - collectedOutputs.add(Sets.newHashSet(rdd.collect())); - } - }); - final List<Set<Tuple2<K, S>>> collectedStateSnapshots = + mapWithStateDStream.foreachRDD(rdd -> collectedOutputs.add(Sets.newHashSet(rdd.collect()))); + List<Set<Tuple2<K, S>>> collectedStateSnapshots = Collections.synchronizedList(new ArrayList<Set<Tuple2<K, S>>>()); - mapWithStateDStream.stateSnapshots().foreachRDD(new VoidFunction<JavaPairRDD<K, S>>() { - @Override - public void call(JavaPairRDD<K, S> rdd) { - collectedStateSnapshots.add(Sets.newHashSet(rdd.collect())); - } - }); + mapWithStateDStream.stateSnapshots().foreachRDD(rdd -> + collectedStateSnapshots.add(Sets.newHashSet(rdd.collect()))); BatchCounter batchCounter = new BatchCounter(ssc.ssc()); ssc.start(); ((ManualClock) ssc.ssc().scheduler().clock()) diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java index 091ccbfd85..9156047244 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java @@ -58,24 +58,16 @@ public class JavaReceiverAPISuite implements Serializable { TestServer server = new TestServer(0); server.start(); - final AtomicLong dataCounter = new AtomicLong(0); + AtomicLong dataCounter = new AtomicLong(0); try { JavaStreamingContext ssc = new JavaStreamingContext("local[2]", "test", new Duration(200)); JavaReceiverInputDStream<String> input = ssc.receiverStream(new JavaSocketReceiver("localhost", server.port())); - JavaDStream<String> mapped = input.map(new Function<String, String>() { - @Override - public String call(String v1) { - return v1 + "."; - } - }); - mapped.foreachRDD(new VoidFunction<JavaRDD<String>>() { - @Override - public void call(JavaRDD<String> rdd) { - long count = rdd.count(); - dataCounter.addAndGet(count); - } + JavaDStream<String> mapped = input.map((Function<String, String>) v1 -> v1 + "."); + mapped.foreachRDD((VoidFunction<JavaRDD<String>>) rdd -> { + long count = rdd.count(); + dataCounter.addAndGet(count); }); ssc.start(); @@ -110,11 +102,7 @@ public class JavaReceiverAPISuite implements Serializable { @Override public void onStart() { - new Thread() { - @Override public void run() { - receive(); - } - }.start(); + new Thread(this::receive).start(); } @Override diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java index f02fa87f61..3f4e6ddb21 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java @@ -23,7 +23,6 @@ import java.util.Arrays; import java.util.Iterator; import java.util.List; -import com.google.common.base.Function; import com.google.common.collect.Iterators; import org.apache.spark.SparkConf; import org.apache.spark.network.util.JavaUtils; @@ -81,12 +80,7 @@ public class JavaWriteAheadLogSuite extends WriteAheadLog { @Override public Iterator<ByteBuffer> readAll() { - return Iterators.transform(records.iterator(), new Function<Record,ByteBuffer>() { - @Override - public ByteBuffer apply(Record input) { - return input.buffer; - } - }); + return Iterators.transform(records.iterator(), input -> input.buffer); } @Override @@ -114,7 +108,7 @@ public class JavaWriteAheadLogSuite extends WriteAheadLog { String data1 = "data1"; WriteAheadLogRecordHandle handle = wal.write(JavaUtils.stringToBytes(data1), 1234); Assert.assertTrue(handle instanceof JavaWriteAheadLogSuiteHandle); - Assert.assertEquals(JavaUtils.bytesToString(wal.read(handle)), data1); + Assert.assertEquals(data1, JavaUtils.bytesToString(wal.read(handle))); wal.write(JavaUtils.stringToBytes("data2"), 1235); wal.write(JavaUtils.stringToBytes("data3"), 1236); diff --git a/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java b/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java index 646cb97066..9948a4074c 100644 --- a/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java +++ b/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java @@ -28,7 +28,6 @@ import org.apache.spark.streaming.StateSpec; import org.apache.spark.streaming.Time; import scala.Tuple2; -import com.google.common.collect.Lists; import com.google.common.collect.Sets; import org.junit.Assert; import org.junit.Test; @@ -101,7 +100,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ while (in.hasNext()) { out = out + in.next().toUpperCase(); } - return Lists.newArrayList(out).iterator(); + return Arrays.asList(out).iterator(); }); JavaTestUtils.attachTestOutputStream(mapped); List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2); @@ -240,7 +239,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ JavaTestUtils.attachTestOutputStream(joined); List<List<Tuple2<String, Tuple2<String, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2); - List<Set<Tuple2<String, Tuple2<String, String>>>> unorderedResult = Lists.newArrayList(); + List<Set<Tuple2<String, Tuple2<String, String>>>> unorderedResult = new ArrayList<>(); for (List<Tuple2<String, Tuple2<String, String>>> res : result) { unorderedResult.add(Sets.newHashSet(res)); } @@ -315,7 +314,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ JavaPairDStream<Integer, String> pairStream1 = JavaPairDStream.fromJavaDStream( JavaTestUtils.attachTestInputStream(ssc, pairStream1input, 1)); - List<JavaDStream<?>> listOfDStreams1 = Arrays.<JavaDStream<?>>asList(stream1, stream2); + List<JavaDStream<?>> listOfDStreams1 = Arrays.asList(stream1, stream2); // This is just to test whether this transform to JavaStream compiles JavaDStream<Long> transformed1 = ssc.transform( @@ -325,7 +324,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ }); List<JavaDStream<?>> listOfDStreams2 = - Arrays.<JavaDStream<?>>asList(stream1, stream2, pairStream1.toJavaDStream()); + Arrays.asList(stream1, stream2, pairStream1.toJavaDStream()); JavaPairDStream<Integer, Tuple2<Integer, String>> transformed2 = ssc.transformToPair( listOfDStreams2, (List<JavaRDD<?>> listOfRDDs, Time time) -> { @@ -358,7 +357,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaDStream<String> flatMapped = stream.flatMap( - s -> Lists.newArrayList(s.split("(?!^)")).iterator()); + s -> Arrays.asList(s.split("(?!^)")).iterator()); JavaTestUtils.attachTestOutputStream(flatMapped); List<List<String>> result = JavaTestUtils.runStreams(ssc, 3, 3); @@ -401,7 +400,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<Integer, String> flatMapped = stream.flatMapToPair(s -> { - List<Tuple2<Integer, String>> out = Lists.newArrayList(); + List<Tuple2<Integer, String>> out = new ArrayList<>(); for (String letter : s.split("(?!^)")) { out.add(new Tuple2<>(s.length(), letter)); } @@ -420,7 +419,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ */ public static <T extends Comparable<T>> void assertOrderInvariantEquals( List<List<T>> expected, List<List<T>> actual) { - expected.forEach(list -> Collections.sort(list)); + expected.forEach(Collections::sort); List<List<T>> sortedActual = new ArrayList<>(); actual.forEach(list -> { List<T> sortedList = new ArrayList<>(list); @@ -491,7 +490,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream<Integer, String> reversed = pairStream.mapToPair(x -> x.swap()); + JavaPairDStream<Integer, String> reversed = pairStream.mapToPair(Tuple2::swap); JavaTestUtils.attachTestOutputStream(reversed); List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); @@ -543,7 +542,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaDStream<Integer> reversed = pairStream.map(in -> in._2()); + JavaDStream<Integer> reversed = pairStream.map(Tuple2::_2); JavaTestUtils.attachTestOutputStream(reversed); List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); @@ -629,7 +628,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ ssc, inputData, 1); JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream<String, Integer> combined = pairStream.<Integer>combineByKey(i -> i, + JavaPairDStream<String, Integer> combined = pairStream.combineByKey(i -> i, (x, y) -> x + y, (x, y) -> x + y, new HashPartitioner(2)); JavaTestUtils.attachTestOutputStream(combined); diff --git a/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java index 8d24104d78..b966cbdca0 100644 --- a/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java @@ -33,7 +33,6 @@ import org.apache.spark.streaming.Time; import scala.Tuple2; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; @@ -123,12 +122,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Arrays.asList(9,4)); JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream<Integer> letterCount = stream.map(new Function<String, Integer>() { - @Override - public Integer call(String s) { - return s.length(); - } - }); + JavaDStream<Integer> letterCount = stream.map(String::length); JavaTestUtils.attachTestOutputStream(letterCount); List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2); @@ -194,12 +188,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Arrays.asList("yankees")); JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream<String> filtered = stream.filter(new Function<String, Boolean>() { - @Override - public Boolean call(String s) { - return s.contains("a"); - } - }); + JavaDStream<String> filtered = stream.filter(s -> s.contains("a")); JavaTestUtils.attachTestOutputStream(filtered); List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2); @@ -276,17 +265,13 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Arrays.asList("YANKEESRED SOX")); JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream<String> mapped = stream.mapPartitions( - new FlatMapFunction<Iterator<String>, String>() { - @Override - public Iterator<String> call(Iterator<String> in) { - StringBuilder out = new StringBuilder(); - while (in.hasNext()) { - out.append(in.next().toUpperCase(Locale.ENGLISH)); - } - return Arrays.asList(out.toString()).iterator(); - } - }); + JavaDStream<String> mapped = stream.mapPartitions(in -> { + StringBuilder out = new StringBuilder(); + while (in.hasNext()) { + out.append(in.next().toUpperCase(Locale.ENGLISH)); + } + return Arrays.asList(out.toString()).iterator(); + }); JavaTestUtils.attachTestOutputStream(mapped); List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2); @@ -416,18 +401,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Arrays.asList(9,10,11)); JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream<Integer> transformed = stream.transform( - new Function<JavaRDD<Integer>, JavaRDD<Integer>>() { - @Override - public JavaRDD<Integer> call(JavaRDD<Integer> in) { - return in.map(new Function<Integer, Integer>() { - @Override - public Integer call(Integer i) { - return i + 2; - } - }); - } - }); + JavaDStream<Integer> transformed = stream.transform(in -> in.map(i -> i + 2)); JavaTestUtils.attachTestOutputStream(transformed); List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3); @@ -448,71 +422,21 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream( JavaTestUtils.attachTestInputStream(ssc, pairInputData, 1)); - stream.transform( - new Function<JavaRDD<Integer>, JavaRDD<Integer>>() { - @Override - public JavaRDD<Integer> call(JavaRDD<Integer> in) { - return null; - } - } - ); + stream.transform(in -> null); - stream.transform( - new Function2<JavaRDD<Integer>, Time, JavaRDD<Integer>>() { - @Override public JavaRDD<Integer> call(JavaRDD<Integer> in, Time time) { - return null; - } - } - ); + stream.transform((in, time) -> null); - stream.transformToPair( - new Function<JavaRDD<Integer>, JavaPairRDD<String, Integer>>() { - @Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> in) { - return null; - } - } - ); + stream.transformToPair(in -> null); - stream.transformToPair( - new Function2<JavaRDD<Integer>, Time, JavaPairRDD<String, Integer>>() { - @Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> in, Time time) { - return null; - } - } - ); + stream.transformToPair((in, time) -> null); - pairStream.transform( - new Function<JavaPairRDD<String, Integer>, JavaRDD<Integer>>() { - @Override public JavaRDD<Integer> call(JavaPairRDD<String, Integer> in) { - return null; - } - } - ); + pairStream.transform(in -> null); - pairStream.transform( - new Function2<JavaPairRDD<String, Integer>, Time, JavaRDD<Integer>>() { - @Override public JavaRDD<Integer> call(JavaPairRDD<String, Integer> in, Time time) { - return null; - } - } - ); + pairStream.transform((in, time) -> null); - pairStream.transformToPair( - new Function<JavaPairRDD<String, Integer>, JavaPairRDD<String, String>>() { - @Override public JavaPairRDD<String, String> call(JavaPairRDD<String, Integer> in) { - return null; - } - } - ); + pairStream.transformToPair(in -> null); - pairStream.transformToPair( - new Function2<JavaPairRDD<String, Integer>, Time, JavaPairRDD<String, String>>() { - @Override public JavaPairRDD<String, String> call(JavaPairRDD<String, Integer> in, - Time time) { - return null; - } - } - ); + pairStream.transformToPair((in, time) -> null); } @@ -558,19 +482,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaPairDStream<String, Tuple2<String, String>> joined = pairStream1.transformWithToPair( pairStream2, - new Function3< - JavaPairRDD<String, String>, - JavaPairRDD<String, String>, - Time, - JavaPairRDD<String, Tuple2<String, String>>>() { - @Override - public JavaPairRDD<String, Tuple2<String, String>> call( - JavaPairRDD<String, String> rdd1, - JavaPairRDD<String, String> rdd2, - Time time) { - return rdd1.join(rdd2); - } - } + (rdd1, rdd2, time) -> rdd1.join(rdd2) ); JavaTestUtils.attachTestOutputStream(joined); @@ -603,100 +515,21 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaPairDStream<Double, Character> pairStream2 = JavaPairDStream.fromJavaDStream( JavaTestUtils.attachTestInputStream(ssc, pairInputData2, 1)); - stream1.transformWith( - stream2, - new Function3<JavaRDD<Integer>, JavaRDD<String>, Time, JavaRDD<Double>>() { - @Override - public JavaRDD<Double> call(JavaRDD<Integer> rdd1, JavaRDD<String> rdd2, Time time) { - return null; - } - } - ); + stream1.transformWith(stream2, (rdd1, rdd2, time) -> null); - stream1.transformWith( - pairStream1, - new Function3<JavaRDD<Integer>, JavaPairRDD<String, Integer>, Time, JavaRDD<Double>>() { - @Override - public JavaRDD<Double> call(JavaRDD<Integer> rdd1, JavaPairRDD<String, Integer> rdd2, - Time time) { - return null; - } - } - ); + stream1.transformWith(pairStream1, (rdd1, rdd2, time) -> null); - stream1.transformWithToPair( - stream2, - new Function3<JavaRDD<Integer>, JavaRDD<String>, Time, JavaPairRDD<Double, Double>>() { - @Override - public JavaPairRDD<Double, Double> call(JavaRDD<Integer> rdd1, JavaRDD<String> rdd2, - Time time) { - return null; - } - } - ); + stream1.transformWithToPair(stream2, (rdd1, rdd2, time) -> null); - stream1.transformWithToPair( - pairStream1, - new Function3<JavaRDD<Integer>, JavaPairRDD<String, Integer>, Time, - JavaPairRDD<Double, Double>>() { - @Override - public JavaPairRDD<Double, Double> call(JavaRDD<Integer> rdd1, - JavaPairRDD<String, Integer> rdd2, - Time time) { - return null; - } - } - ); + stream1.transformWithToPair(pairStream1, (rdd1, rdd2, time) -> null); - pairStream1.transformWith( - stream2, - new Function3<JavaPairRDD<String, Integer>, JavaRDD<String>, Time, JavaRDD<Double>>() { - @Override - public JavaRDD<Double> call(JavaPairRDD<String, Integer> rdd1, JavaRDD<String> rdd2, - Time time) { - return null; - } - } - ); + pairStream1.transformWith(stream2, (rdd1, rdd2, time) -> null); - pairStream1.transformWith( - pairStream1, - new Function3<JavaPairRDD<String, Integer>, JavaPairRDD<String, Integer>, Time, - JavaRDD<Double>>() { - @Override - public JavaRDD<Double> call(JavaPairRDD<String, Integer> rdd1, - JavaPairRDD<String, Integer> rdd2, - Time time) { - return null; - } - } - ); + pairStream1.transformWith(pairStream1, (rdd1, rdd2, time) -> null); - pairStream1.transformWithToPair( - stream2, - new Function3<JavaPairRDD<String, Integer>, JavaRDD<String>, Time, - JavaPairRDD<Double, Double>>() { - @Override - public JavaPairRDD<Double, Double> call(JavaPairRDD<String, Integer> rdd1, - JavaRDD<String> rdd2, - Time time) { - return null; - } - } - ); + pairStream1.transformWithToPair(stream2, (rdd1, rdd2, time) -> null); - pairStream1.transformWithToPair( - pairStream2, - new Function3<JavaPairRDD<String, Integer>, JavaPairRDD<Double, Character>, Time, - JavaPairRDD<Double, Double>>() { - @Override - public JavaPairRDD<Double, Double> call(JavaPairRDD<String, Integer> rdd1, - JavaPairRDD<Double, Character> rdd2, - Time time) { - return null; - } - } - ); + pairStream1.transformWithToPair(pairStream2, (rdd1, rdd2, time) -> null); } @SuppressWarnings("unchecked") @@ -727,44 +560,32 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaPairDStream<Integer, String> pairStream1 = JavaPairDStream.fromJavaDStream( JavaTestUtils.attachTestInputStream(ssc, pairStream1input, 1)); - List<JavaDStream<?>> listOfDStreams1 = Arrays.<JavaDStream<?>>asList(stream1, stream2); + List<JavaDStream<?>> listOfDStreams1 = Arrays.asList(stream1, stream2); // This is just to test whether this transform to JavaStream compiles ssc.transform( listOfDStreams1, - new Function2<List<JavaRDD<?>>, Time, JavaRDD<Long>>() { - @Override - public JavaRDD<Long> call(List<JavaRDD<?>> listOfRDDs, Time time) { - Assert.assertEquals(2, listOfRDDs.size()); - return null; - } + (listOfRDDs, time) -> { + Assert.assertEquals(2, listOfRDDs.size()); + return null; } ); List<JavaDStream<?>> listOfDStreams2 = - Arrays.<JavaDStream<?>>asList(stream1, stream2, pairStream1.toJavaDStream()); + Arrays.asList(stream1, stream2, pairStream1.toJavaDStream()); JavaPairDStream<Integer, Tuple2<Integer, String>> transformed2 = ssc.transformToPair( listOfDStreams2, - new Function2<List<JavaRDD<?>>, Time, JavaPairRDD<Integer, Tuple2<Integer, String>>>() { - @Override - public JavaPairRDD<Integer, Tuple2<Integer, String>> call(List<JavaRDD<?>> listOfRDDs, - Time time) { - Assert.assertEquals(3, listOfRDDs.size()); - JavaRDD<Integer> rdd1 = (JavaRDD<Integer>)listOfRDDs.get(0); - JavaRDD<Integer> rdd2 = (JavaRDD<Integer>)listOfRDDs.get(1); - JavaRDD<Tuple2<Integer, String>> rdd3 = - (JavaRDD<Tuple2<Integer, String>>)listOfRDDs.get(2); - JavaPairRDD<Integer, String> prdd3 = JavaPairRDD.fromJavaRDD(rdd3); - PairFunction<Integer, Integer, Integer> mapToTuple = - new PairFunction<Integer, Integer, Integer>() { - @Override - public Tuple2<Integer, Integer> call(Integer i) { - return new Tuple2<>(i, i); - } - }; - return rdd1.union(rdd2).mapToPair(mapToTuple).join(prdd3); - } + (listOfRDDs, time) -> { + Assert.assertEquals(3, listOfRDDs.size()); + JavaRDD<Integer> rdd1 = (JavaRDD<Integer>)listOfRDDs.get(0); + JavaRDD<Integer> rdd2 = (JavaRDD<Integer>)listOfRDDs.get(1); + JavaRDD<Tuple2<Integer, String>> rdd3 = + (JavaRDD<Tuple2<Integer, String>>)listOfRDDs.get(2); + JavaPairRDD<Integer, String> prdd3 = JavaPairRDD.fromJavaRDD(rdd3); + PairFunction<Integer, Integer, Integer> mapToTuple = + (PairFunction<Integer, Integer, Integer>) i -> new Tuple2<>(i, i); + return rdd1.union(rdd2).mapToPair(mapToTuple).join(prdd3); } ); JavaTestUtils.attachTestOutputStream(transformed2); @@ -787,12 +608,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Arrays.asList("a","t","h","l","e","t","i","c","s")); JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream<String> flatMapped = stream.flatMap(new FlatMapFunction<String, String>() { - @Override - public Iterator<String> call(String x) { - return Arrays.asList(x.split("(?!^)")).iterator(); - } - }); + JavaDStream<String> flatMapped = stream.flatMap(x -> Arrays.asList(x.split("(?!^)")).iterator()); JavaTestUtils.attachTestOutputStream(flatMapped); List<List<String>> result = JavaTestUtils.runStreams(ssc, 3, 3); @@ -811,25 +627,13 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaTestUtils.attachTestOutputStream(stream.count()); // dummy output - stream.foreachRDD(new VoidFunction<JavaRDD<Integer>>() { - @Override - public void call(JavaRDD<Integer> rdd) { - accumRdd.add(1); - rdd.foreach(new VoidFunction<Integer>() { - @Override - public void call(Integer i) { - accumEle.add(1); - } - }); - } + stream.foreachRDD(rdd -> { + accumRdd.add(1); + rdd.foreach(i -> accumEle.add(1)); }); // This is a test to make sure foreachRDD(VoidFunction2) can be called from Java - stream.foreachRDD(new VoidFunction2<JavaRDD<Integer>, Time>() { - @Override - public void call(JavaRDD<Integer> rdd, Time time) { - } - }); + stream.foreachRDD((rdd, time) -> {}); JavaTestUtils.runStreams(ssc, 2, 2); @@ -873,16 +677,12 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa new Tuple2<>(9, "s"))); JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream<Integer, String> flatMapped = stream.flatMapToPair( - new PairFlatMapFunction<String, Integer, String>() { - @Override - public Iterator<Tuple2<Integer, String>> call(String in) { - List<Tuple2<Integer, String>> out = new ArrayList<>(); - for (String letter: in.split("(?!^)")) { - out.add(new Tuple2<>(in.length(), letter)); - } - return out.iterator(); + JavaPairDStream<Integer, String> flatMapped = stream.flatMapToPair(in -> { + List<Tuple2<Integer, String>> out = new ArrayList<>(); + for (String letter : in.split("(?!^)")) { + out.add(new Tuple2<>(in.length(), letter)); } + return out.iterator(); }); JavaTestUtils.attachTestOutputStream(flatMapped); List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 3, 3); @@ -949,21 +749,10 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Arrays.asList(new Tuple2<>("yankees", 7))); JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream<String, Integer> pairStream = stream.mapToPair( - new PairFunction<String, String, Integer>() { - @Override - public Tuple2<String, Integer> call(String in) { - return new Tuple2<>(in, in.length()); - } - }); + JavaPairDStream<String, Integer> pairStream = + stream.mapToPair(in -> new Tuple2<>(in, in.length())); - JavaPairDStream<String, Integer> filtered = pairStream.filter( - new Function<Tuple2<String, Integer>, Boolean>() { - @Override - public Boolean call(Tuple2<String, Integer> in) { - return in._1().contains("a"); - } - }); + JavaPairDStream<String, Integer> filtered = pairStream.filter(in -> in._1().contains("a")); JavaTestUtils.attachTestOutputStream(filtered); List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2); @@ -1014,13 +803,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream<Integer, String> reversed = pairStream.mapToPair( - new PairFunction<Tuple2<String, Integer>, Integer, String>() { - @Override - public Tuple2<Integer, String> call(Tuple2<String, Integer> in) { - return in.swap(); - } - }); + JavaPairDStream<Integer, String> reversed = pairStream.mapToPair(Tuple2::swap); JavaTestUtils.attachTestOutputStream(reversed); List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); @@ -1048,18 +831,14 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream<Integer, String> reversed = pairStream.mapPartitionsToPair( - new PairFlatMapFunction<Iterator<Tuple2<String, Integer>>, Integer, String>() { - @Override - public Iterator<Tuple2<Integer, String>> call(Iterator<Tuple2<String, Integer>> in) { - List<Tuple2<Integer, String>> out = new LinkedList<>(); - while (in.hasNext()) { - Tuple2<String, Integer> next = in.next(); - out.add(next.swap()); - } - return out.iterator(); - } - }); + JavaPairDStream<Integer, String> reversed = pairStream.mapPartitionsToPair(in -> { + List<Tuple2<Integer, String>> out = new LinkedList<>(); + while (in.hasNext()) { + Tuple2<String, Integer> next = in.next(); + out.add(next.swap()); + } + return out.iterator(); + }); JavaTestUtils.attachTestOutputStream(reversed); List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); @@ -1079,13 +858,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaDStream<Integer> reversed = pairStream.map( - new Function<Tuple2<String, Integer>, Integer>() { - @Override - public Integer call(Tuple2<String, Integer> in) { - return in._2(); - } - }); + JavaDStream<Integer> reversed = pairStream.map(in -> in._2()); JavaTestUtils.attachTestOutputStream(reversed); List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2); @@ -1119,17 +892,13 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream<Integer, String> flatMapped = pairStream.flatMapToPair( - new PairFlatMapFunction<Tuple2<String, Integer>, Integer, String>() { - @Override - public Iterator<Tuple2<Integer, String>> call(Tuple2<String, Integer> in) { - List<Tuple2<Integer, String>> out = new LinkedList<>(); - for (Character s : in._1().toCharArray()) { - out.add(new Tuple2<>(in._2(), s.toString())); - } - return out.iterator(); - } - }); + JavaPairDStream<Integer, String> flatMapped = pairStream.flatMapToPair(in -> { + List<Tuple2<Integer, String>> out = new LinkedList<>(); + for (Character s : in._1().toCharArray()) { + out.add(new Tuple2<>(in._2(), s.toString())); + } + return out.iterator(); + }); JavaTestUtils.attachTestOutputStream(flatMapped); List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); @@ -1216,12 +985,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); JavaPairDStream<String, Integer> combined = pairStream.combineByKey( - new Function<Integer, Integer>() { - @Override - public Integer call(Integer i) { - return i; - } - }, new IntegerSum(), new IntegerSum(), new HashPartitioner(2)); + i -> i, new IntegerSum(), new IntegerSum(), new HashPartitioner(2)); JavaTestUtils.attachTestOutputStream(combined); List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2); @@ -1345,20 +1109,16 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey( - new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() { - @Override - public Optional<Integer> call(List<Integer> values, Optional<Integer> state) { - int out = 0; - if (state.isPresent()) { - out += state.get(); - } - for (Integer v : values) { - out += v; - } - return Optional.of(out); - } - }); + JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey((values, state) -> { + int out = 0; + if (state.isPresent()) { + out += state.get(); + } + for (Integer v : values) { + out += v; + } + return Optional.of(out); + }); JavaTestUtils.attachTestOutputStream(updated); List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3); @@ -1389,20 +1149,16 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey( - new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() { - @Override - public Optional<Integer> call(List<Integer> values, Optional<Integer> state) { - int out = 0; - if (state.isPresent()) { - out += state.get(); - } - for (Integer v : values) { - out += v; - } - return Optional.of(out); - } - }, new HashPartitioner(1), initialRDD); + JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey((values, state) -> { + int out = 0; + if (state.isPresent()) { + out += state.get(); + } + for (Integer v : values) { + out += v; + } + return Optional.of(out); + }, new HashPartitioner(1), initialRDD); JavaTestUtils.attachTestOutputStream(updated); List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3); @@ -1500,13 +1256,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa ssc, inputData, 1); JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream<Integer, Integer> sorted = pairStream.transformToPair( - new Function<JavaPairRDD<Integer, Integer>, JavaPairRDD<Integer, Integer>>() { - @Override - public JavaPairRDD<Integer, Integer> call(JavaPairRDD<Integer, Integer> in) { - return in.sortByKey(); - } - }); + JavaPairDStream<Integer, Integer> sorted = pairStream.transformToPair(in -> in.sortByKey()); JavaTestUtils.attachTestOutputStream(sorted); List<List<Tuple2<Integer, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2); @@ -1537,18 +1287,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa ssc, inputData, 1); JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaDStream<Integer> firstParts = pairStream.transform( - new Function<JavaPairRDD<Integer, Integer>, JavaRDD<Integer>>() { - @Override - public JavaRDD<Integer> call(JavaPairRDD<Integer, Integer> in) { - return in.map(new Function<Tuple2<Integer, Integer>, Integer>() { - @Override - public Integer call(Tuple2<Integer, Integer> in2) { - return in2._1(); - } - }); - } - }); + JavaDStream<Integer> firstParts = pairStream.transform(in -> in.map(in2 -> in2._1())); JavaTestUtils.attachTestOutputStream(firstParts); List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2); @@ -1575,12 +1314,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa ssc, inputData, 1); JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream<String, String> mapped = pairStream.mapValues(new Function<String, String>() { - @Override - public String call(String s) { - return s.toUpperCase(Locale.ENGLISH); - } - }); + JavaPairDStream<String, String> mapped = pairStream.mapValues(s -> s.toUpperCase(Locale.ENGLISH)); JavaTestUtils.attachTestOutputStream(mapped); List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); @@ -1616,16 +1350,12 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream<String, String> flatMapped = pairStream.flatMapValues( - new Function<String, Iterable<String>>() { - @Override - public Iterable<String> call(String in) { - List<String> out = new ArrayList<>(); - out.add(in + "1"); - out.add(in + "2"); - return out; - } - }); + JavaPairDStream<String, String> flatMapped = pairStream.flatMapValues(in -> { + List<String> out = new ArrayList<>(); + out.add(in + "1"); + out.add(in + "2"); + return out; + }); JavaTestUtils.attachTestOutputStream(flatMapped); List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); @@ -1795,12 +1525,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa ssc.checkpoint(tempDir.getAbsolutePath()); JavaDStream<String> stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream<Integer> letterCount = stream.map(new Function<String, Integer>() { - @Override - public Integer call(String s) { - return s.length(); - } - }); + JavaDStream<Integer> letterCount = stream.map(String::length); JavaCheckpointTestUtils.attachTestOutputStream(letterCount); List<List<Integer>> initialResult = JavaTestUtils.runStreams(ssc, 1, 1); @@ -1822,7 +1547,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa public void testContextGetOrCreate() throws InterruptedException { ssc.stop(); - final SparkConf conf = new SparkConf() + SparkConf conf = new SparkConf() .setMaster("local[2]") .setAppName("test") .set("newContext", "true"); @@ -1835,13 +1560,10 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa // Function to create JavaStreamingContext without any output operations // (used to detect the new context) - final AtomicBoolean newContextCreated = new AtomicBoolean(false); - Function0<JavaStreamingContext> creatingFunc = new Function0<JavaStreamingContext>() { - @Override - public JavaStreamingContext call() { - newContextCreated.set(true); - return new JavaStreamingContext(conf, Seconds.apply(1)); - } + AtomicBoolean newContextCreated = new AtomicBoolean(false); + Function0<JavaStreamingContext> creatingFunc = () -> { + newContextCreated.set(true); + return new JavaStreamingContext(conf, Seconds.apply(1)); }; newContextCreated.set(false); @@ -1912,18 +1634,15 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa ssc.socketStream( "localhost", 12345, - new Function<InputStream, Iterable<String>>() { - @Override - public Iterable<String> call(InputStream in) throws IOException { - List<String> out = new ArrayList<>(); - try (BufferedReader reader = new BufferedReader( - new InputStreamReader(in, StandardCharsets.UTF_8))) { - for (String line; (line = reader.readLine()) != null;) { - out.add(line); - } + in -> { + List<String> out = new ArrayList<>(); + try (BufferedReader reader = new BufferedReader( + new InputStreamReader(in, StandardCharsets.UTF_8))) { + for (String line; (line = reader.readLine()) != null;) { + out.add(line); } - return out; } + return out; }, StorageLevel.MEMORY_ONLY()); } @@ -1952,21 +1671,10 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa LongWritable.class, Text.class, TextInputFormat.class, - new Function<Path, Boolean>() { - @Override - public Boolean call(Path v1) { - return Boolean.TRUE; - } - }, + v1 -> Boolean.TRUE, true); - JavaDStream<String> test = inputStream.map( - new Function<Tuple2<LongWritable, Text>, String>() { - @Override - public String call(Tuple2<LongWritable, Text> v1) { - return v1._2().toString(); - } - }); + JavaDStream<String> test = inputStream.map(v1 -> v1._2().toString()); JavaTestUtils.attachTestOutputStream(test); List<List<String>> result = JavaTestUtils.runStreams(ssc, 1, 1); |