From 1487c9af20a333ead55955acf4c0aa323bea0d07 Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Sun, 19 Feb 2017 09:42:50 -0800 Subject: [SPARK-19534][TESTS] Convert Java tests to use lambdas, Java 8 features ## What changes were proposed in this pull request? Convert tests to use Java 8 lambdas, and modest related fixes to surrounding code. ## How was this patch tested? Jenkins tests Author: Sean Owen Closes #16964 from srowen/SPARK-19534. --- .../spark/streaming/JavaMapWithStateSuite.java | 81 ++-- .../spark/streaming/JavaReceiverAPISuite.java | 24 +- .../spark/streaming/JavaWriteAheadLogSuite.java | 10 +- .../org/apache/spark/streaming/Java8APISuite.java | 21 +- .../org/apache/spark/streaming/JavaAPISuite.java | 526 +++++---------------- 5 files changed, 162 insertions(+), 500 deletions(-) (limited to 'streaming') diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java index 9b7701003d..cb8ed83e5a 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java @@ -27,9 +27,6 @@ import java.util.Set; import scala.Tuple2; import com.google.common.collect.Sets; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.util.ManualClock; import org.junit.Assert; @@ -53,18 +50,14 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements JavaPairDStream wordsDstream = null; Function4, State, Optional> mappingFunc = - new Function4, State, Optional>() { - @Override - public Optional call( - Time time, String word, Optional one, State state) { - // Use all State's methods here - state.exists(); - state.get(); - state.isTimingOut(); - state.remove(); - state.update(true); - return Optional.of(2.0); - } + (time, word, one, state) -> { + // Use all State's methods here + state.exists(); + state.get(); + state.isTimingOut(); + state.remove(); + state.update(true); + return Optional.of(2.0); }; JavaMapWithStateDStream stateDstream = @@ -78,17 +71,14 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements stateDstream.stateSnapshots(); Function3, State, Double> mappingFunc2 = - new Function3, State, Double>() { - @Override - public Double call(String key, Optional one, State state) { - // Use all State's methods here - state.exists(); - state.get(); - state.isTimingOut(); - state.remove(); - state.update(true); - return 2.0; - } + (key, one, state) -> { + // Use all State's methods here + state.exists(); + state.get(); + state.isTimingOut(); + state.remove(); + state.update(true); + return 2.0; }; JavaMapWithStateDStream stateDstream2 = @@ -136,13 +126,10 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements ); Function3, State, Integer> mappingFunc = - new Function3, State, Integer>() { - @Override - public Integer call(String key, Optional value, State state) { - int sum = value.orElse(0) + (state.exists() ? state.get() : 0); - state.update(sum); - return sum; - } + (key, value, state) -> { + int sum = value.orElse(0) + (state.exists() ? state.get() : 0); + state.update(sum); + return sum; }; testOperation( inputData, @@ -159,29 +146,15 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements int numBatches = expectedOutputs.size(); JavaDStream inputStream = JavaTestUtils.attachTestInputStream(ssc, input, 2); JavaMapWithStateDStream mapWithStateDStream = - JavaPairDStream.fromJavaDStream(inputStream.map(new Function>() { - @Override - public Tuple2 call(K x) { - return new Tuple2<>(x, 1); - } - })).mapWithState(mapWithStateSpec); - - final List> collectedOutputs = + JavaPairDStream.fromJavaDStream(inputStream.map(x -> new Tuple2<>(x, 1))).mapWithState(mapWithStateSpec); + + List> collectedOutputs = Collections.synchronizedList(new ArrayList>()); - mapWithStateDStream.foreachRDD(new VoidFunction>() { - @Override - public void call(JavaRDD rdd) { - collectedOutputs.add(Sets.newHashSet(rdd.collect())); - } - }); - final List>> collectedStateSnapshots = + mapWithStateDStream.foreachRDD(rdd -> collectedOutputs.add(Sets.newHashSet(rdd.collect()))); + List>> collectedStateSnapshots = Collections.synchronizedList(new ArrayList>>()); - mapWithStateDStream.stateSnapshots().foreachRDD(new VoidFunction>() { - @Override - public void call(JavaPairRDD rdd) { - collectedStateSnapshots.add(Sets.newHashSet(rdd.collect())); - } - }); + mapWithStateDStream.stateSnapshots().foreachRDD(rdd -> + collectedStateSnapshots.add(Sets.newHashSet(rdd.collect()))); BatchCounter batchCounter = new BatchCounter(ssc.ssc()); ssc.start(); ((ManualClock) ssc.ssc().scheduler().clock()) diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java index 091ccbfd85..9156047244 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java @@ -58,24 +58,16 @@ public class JavaReceiverAPISuite implements Serializable { TestServer server = new TestServer(0); server.start(); - final AtomicLong dataCounter = new AtomicLong(0); + AtomicLong dataCounter = new AtomicLong(0); try { JavaStreamingContext ssc = new JavaStreamingContext("local[2]", "test", new Duration(200)); JavaReceiverInputDStream input = ssc.receiverStream(new JavaSocketReceiver("localhost", server.port())); - JavaDStream mapped = input.map(new Function() { - @Override - public String call(String v1) { - return v1 + "."; - } - }); - mapped.foreachRDD(new VoidFunction>() { - @Override - public void call(JavaRDD rdd) { - long count = rdd.count(); - dataCounter.addAndGet(count); - } + JavaDStream mapped = input.map((Function) v1 -> v1 + "."); + mapped.foreachRDD((VoidFunction>) rdd -> { + long count = rdd.count(); + dataCounter.addAndGet(count); }); ssc.start(); @@ -110,11 +102,7 @@ public class JavaReceiverAPISuite implements Serializable { @Override public void onStart() { - new Thread() { - @Override public void run() { - receive(); - } - }.start(); + new Thread(this::receive).start(); } @Override diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java index f02fa87f61..3f4e6ddb21 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java @@ -23,7 +23,6 @@ import java.util.Arrays; import java.util.Iterator; import java.util.List; -import com.google.common.base.Function; import com.google.common.collect.Iterators; import org.apache.spark.SparkConf; import org.apache.spark.network.util.JavaUtils; @@ -81,12 +80,7 @@ public class JavaWriteAheadLogSuite extends WriteAheadLog { @Override public Iterator readAll() { - return Iterators.transform(records.iterator(), new Function() { - @Override - public ByteBuffer apply(Record input) { - return input.buffer; - } - }); + return Iterators.transform(records.iterator(), input -> input.buffer); } @Override @@ -114,7 +108,7 @@ public class JavaWriteAheadLogSuite extends WriteAheadLog { String data1 = "data1"; WriteAheadLogRecordHandle handle = wal.write(JavaUtils.stringToBytes(data1), 1234); Assert.assertTrue(handle instanceof JavaWriteAheadLogSuiteHandle); - Assert.assertEquals(JavaUtils.bytesToString(wal.read(handle)), data1); + Assert.assertEquals(data1, JavaUtils.bytesToString(wal.read(handle))); wal.write(JavaUtils.stringToBytes("data2"), 1235); wal.write(JavaUtils.stringToBytes("data3"), 1236); diff --git a/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java b/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java index 646cb97066..9948a4074c 100644 --- a/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java +++ b/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java @@ -28,7 +28,6 @@ import org.apache.spark.streaming.StateSpec; import org.apache.spark.streaming.Time; import scala.Tuple2; -import com.google.common.collect.Lists; import com.google.common.collect.Sets; import org.junit.Assert; import org.junit.Test; @@ -101,7 +100,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ while (in.hasNext()) { out = out + in.next().toUpperCase(); } - return Lists.newArrayList(out).iterator(); + return Arrays.asList(out).iterator(); }); JavaTestUtils.attachTestOutputStream(mapped); List> result = JavaTestUtils.runStreams(ssc, 2, 2); @@ -240,7 +239,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ JavaTestUtils.attachTestOutputStream(joined); List>>> result = JavaTestUtils.runStreams(ssc, 2, 2); - List>>> unorderedResult = Lists.newArrayList(); + List>>> unorderedResult = new ArrayList<>(); for (List>> res : result) { unorderedResult.add(Sets.newHashSet(res)); } @@ -315,7 +314,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ JavaPairDStream pairStream1 = JavaPairDStream.fromJavaDStream( JavaTestUtils.attachTestInputStream(ssc, pairStream1input, 1)); - List> listOfDStreams1 = Arrays.>asList(stream1, stream2); + List> listOfDStreams1 = Arrays.asList(stream1, stream2); // This is just to test whether this transform to JavaStream compiles JavaDStream transformed1 = ssc.transform( @@ -325,7 +324,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ }); List> listOfDStreams2 = - Arrays.>asList(stream1, stream2, pairStream1.toJavaDStream()); + Arrays.asList(stream1, stream2, pairStream1.toJavaDStream()); JavaPairDStream> transformed2 = ssc.transformToPair( listOfDStreams2, (List> listOfRDDs, Time time) -> { @@ -358,7 +357,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaDStream flatMapped = stream.flatMap( - s -> Lists.newArrayList(s.split("(?!^)")).iterator()); + s -> Arrays.asList(s.split("(?!^)")).iterator()); JavaTestUtils.attachTestOutputStream(flatMapped); List> result = JavaTestUtils.runStreams(ssc, 3, 3); @@ -401,7 +400,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream flatMapped = stream.flatMapToPair(s -> { - List> out = Lists.newArrayList(); + List> out = new ArrayList<>(); for (String letter : s.split("(?!^)")) { out.add(new Tuple2<>(s.length(), letter)); } @@ -420,7 +419,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ */ public static > void assertOrderInvariantEquals( List> expected, List> actual) { - expected.forEach(list -> Collections.sort(list)); + expected.forEach(Collections::sort); List> sortedActual = new ArrayList<>(); actual.forEach(list -> { List sortedList = new ArrayList<>(list); @@ -491,7 +490,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ JavaDStream> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream reversed = pairStream.mapToPair(x -> x.swap()); + JavaPairDStream reversed = pairStream.mapToPair(Tuple2::swap); JavaTestUtils.attachTestOutputStream(reversed); List>> result = JavaTestUtils.runStreams(ssc, 2, 2); @@ -543,7 +542,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ JavaDStream> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaDStream reversed = pairStream.map(in -> in._2()); + JavaDStream reversed = pairStream.map(Tuple2::_2); JavaTestUtils.attachTestOutputStream(reversed); List>> result = JavaTestUtils.runStreams(ssc, 2, 2); @@ -629,7 +628,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ ssc, inputData, 1); JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream combined = pairStream.combineByKey(i -> i, + JavaPairDStream combined = pairStream.combineByKey(i -> i, (x, y) -> x + y, (x, y) -> x + y, new HashPartitioner(2)); JavaTestUtils.attachTestOutputStream(combined); diff --git a/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java index 8d24104d78..b966cbdca0 100644 --- a/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java @@ -33,7 +33,6 @@ import org.apache.spark.streaming.Time; import scala.Tuple2; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; @@ -123,12 +122,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Arrays.asList(9,4)); JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream letterCount = stream.map(new Function() { - @Override - public Integer call(String s) { - return s.length(); - } - }); + JavaDStream letterCount = stream.map(String::length); JavaTestUtils.attachTestOutputStream(letterCount); List> result = JavaTestUtils.runStreams(ssc, 2, 2); @@ -194,12 +188,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Arrays.asList("yankees")); JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream filtered = stream.filter(new Function() { - @Override - public Boolean call(String s) { - return s.contains("a"); - } - }); + JavaDStream filtered = stream.filter(s -> s.contains("a")); JavaTestUtils.attachTestOutputStream(filtered); List> result = JavaTestUtils.runStreams(ssc, 2, 2); @@ -276,17 +265,13 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Arrays.asList("YANKEESRED SOX")); JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream mapped = stream.mapPartitions( - new FlatMapFunction, String>() { - @Override - public Iterator call(Iterator in) { - StringBuilder out = new StringBuilder(); - while (in.hasNext()) { - out.append(in.next().toUpperCase(Locale.ENGLISH)); - } - return Arrays.asList(out.toString()).iterator(); - } - }); + JavaDStream mapped = stream.mapPartitions(in -> { + StringBuilder out = new StringBuilder(); + while (in.hasNext()) { + out.append(in.next().toUpperCase(Locale.ENGLISH)); + } + return Arrays.asList(out.toString()).iterator(); + }); JavaTestUtils.attachTestOutputStream(mapped); List> result = JavaTestUtils.runStreams(ssc, 2, 2); @@ -416,18 +401,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Arrays.asList(9,10,11)); JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream transformed = stream.transform( - new Function, JavaRDD>() { - @Override - public JavaRDD call(JavaRDD in) { - return in.map(new Function() { - @Override - public Integer call(Integer i) { - return i + 2; - } - }); - } - }); + JavaDStream transformed = stream.transform(in -> in.map(i -> i + 2)); JavaTestUtils.attachTestOutputStream(transformed); List> result = JavaTestUtils.runStreams(ssc, 3, 3); @@ -448,71 +422,21 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream( JavaTestUtils.attachTestInputStream(ssc, pairInputData, 1)); - stream.transform( - new Function, JavaRDD>() { - @Override - public JavaRDD call(JavaRDD in) { - return null; - } - } - ); + stream.transform(in -> null); - stream.transform( - new Function2, Time, JavaRDD>() { - @Override public JavaRDD call(JavaRDD in, Time time) { - return null; - } - } - ); + stream.transform((in, time) -> null); - stream.transformToPair( - new Function, JavaPairRDD>() { - @Override public JavaPairRDD call(JavaRDD in) { - return null; - } - } - ); + stream.transformToPair(in -> null); - stream.transformToPair( - new Function2, Time, JavaPairRDD>() { - @Override public JavaPairRDD call(JavaRDD in, Time time) { - return null; - } - } - ); + stream.transformToPair((in, time) -> null); - pairStream.transform( - new Function, JavaRDD>() { - @Override public JavaRDD call(JavaPairRDD in) { - return null; - } - } - ); + pairStream.transform(in -> null); - pairStream.transform( - new Function2, Time, JavaRDD>() { - @Override public JavaRDD call(JavaPairRDD in, Time time) { - return null; - } - } - ); + pairStream.transform((in, time) -> null); - pairStream.transformToPair( - new Function, JavaPairRDD>() { - @Override public JavaPairRDD call(JavaPairRDD in) { - return null; - } - } - ); + pairStream.transformToPair(in -> null); - pairStream.transformToPair( - new Function2, Time, JavaPairRDD>() { - @Override public JavaPairRDD call(JavaPairRDD in, - Time time) { - return null; - } - } - ); + pairStream.transformToPair((in, time) -> null); } @@ -558,19 +482,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaPairDStream> joined = pairStream1.transformWithToPair( pairStream2, - new Function3< - JavaPairRDD, - JavaPairRDD, - Time, - JavaPairRDD>>() { - @Override - public JavaPairRDD> call( - JavaPairRDD rdd1, - JavaPairRDD rdd2, - Time time) { - return rdd1.join(rdd2); - } - } + (rdd1, rdd2, time) -> rdd1.join(rdd2) ); JavaTestUtils.attachTestOutputStream(joined); @@ -603,100 +515,21 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaPairDStream pairStream2 = JavaPairDStream.fromJavaDStream( JavaTestUtils.attachTestInputStream(ssc, pairInputData2, 1)); - stream1.transformWith( - stream2, - new Function3, JavaRDD, Time, JavaRDD>() { - @Override - public JavaRDD call(JavaRDD rdd1, JavaRDD rdd2, Time time) { - return null; - } - } - ); + stream1.transformWith(stream2, (rdd1, rdd2, time) -> null); - stream1.transformWith( - pairStream1, - new Function3, JavaPairRDD, Time, JavaRDD>() { - @Override - public JavaRDD call(JavaRDD rdd1, JavaPairRDD rdd2, - Time time) { - return null; - } - } - ); + stream1.transformWith(pairStream1, (rdd1, rdd2, time) -> null); - stream1.transformWithToPair( - stream2, - new Function3, JavaRDD, Time, JavaPairRDD>() { - @Override - public JavaPairRDD call(JavaRDD rdd1, JavaRDD rdd2, - Time time) { - return null; - } - } - ); + stream1.transformWithToPair(stream2, (rdd1, rdd2, time) -> null); - stream1.transformWithToPair( - pairStream1, - new Function3, JavaPairRDD, Time, - JavaPairRDD>() { - @Override - public JavaPairRDD call(JavaRDD rdd1, - JavaPairRDD rdd2, - Time time) { - return null; - } - } - ); + stream1.transformWithToPair(pairStream1, (rdd1, rdd2, time) -> null); - pairStream1.transformWith( - stream2, - new Function3, JavaRDD, Time, JavaRDD>() { - @Override - public JavaRDD call(JavaPairRDD rdd1, JavaRDD rdd2, - Time time) { - return null; - } - } - ); + pairStream1.transformWith(stream2, (rdd1, rdd2, time) -> null); - pairStream1.transformWith( - pairStream1, - new Function3, JavaPairRDD, Time, - JavaRDD>() { - @Override - public JavaRDD call(JavaPairRDD rdd1, - JavaPairRDD rdd2, - Time time) { - return null; - } - } - ); + pairStream1.transformWith(pairStream1, (rdd1, rdd2, time) -> null); - pairStream1.transformWithToPair( - stream2, - new Function3, JavaRDD, Time, - JavaPairRDD>() { - @Override - public JavaPairRDD call(JavaPairRDD rdd1, - JavaRDD rdd2, - Time time) { - return null; - } - } - ); + pairStream1.transformWithToPair(stream2, (rdd1, rdd2, time) -> null); - pairStream1.transformWithToPair( - pairStream2, - new Function3, JavaPairRDD, Time, - JavaPairRDD>() { - @Override - public JavaPairRDD call(JavaPairRDD rdd1, - JavaPairRDD rdd2, - Time time) { - return null; - } - } - ); + pairStream1.transformWithToPair(pairStream2, (rdd1, rdd2, time) -> null); } @SuppressWarnings("unchecked") @@ -727,44 +560,32 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaPairDStream pairStream1 = JavaPairDStream.fromJavaDStream( JavaTestUtils.attachTestInputStream(ssc, pairStream1input, 1)); - List> listOfDStreams1 = Arrays.>asList(stream1, stream2); + List> listOfDStreams1 = Arrays.asList(stream1, stream2); // This is just to test whether this transform to JavaStream compiles ssc.transform( listOfDStreams1, - new Function2>, Time, JavaRDD>() { - @Override - public JavaRDD call(List> listOfRDDs, Time time) { - Assert.assertEquals(2, listOfRDDs.size()); - return null; - } + (listOfRDDs, time) -> { + Assert.assertEquals(2, listOfRDDs.size()); + return null; } ); List> listOfDStreams2 = - Arrays.>asList(stream1, stream2, pairStream1.toJavaDStream()); + Arrays.asList(stream1, stream2, pairStream1.toJavaDStream()); JavaPairDStream> transformed2 = ssc.transformToPair( listOfDStreams2, - new Function2>, Time, JavaPairRDD>>() { - @Override - public JavaPairRDD> call(List> listOfRDDs, - Time time) { - Assert.assertEquals(3, listOfRDDs.size()); - JavaRDD rdd1 = (JavaRDD)listOfRDDs.get(0); - JavaRDD rdd2 = (JavaRDD)listOfRDDs.get(1); - JavaRDD> rdd3 = - (JavaRDD>)listOfRDDs.get(2); - JavaPairRDD prdd3 = JavaPairRDD.fromJavaRDD(rdd3); - PairFunction mapToTuple = - new PairFunction() { - @Override - public Tuple2 call(Integer i) { - return new Tuple2<>(i, i); - } - }; - return rdd1.union(rdd2).mapToPair(mapToTuple).join(prdd3); - } + (listOfRDDs, time) -> { + Assert.assertEquals(3, listOfRDDs.size()); + JavaRDD rdd1 = (JavaRDD)listOfRDDs.get(0); + JavaRDD rdd2 = (JavaRDD)listOfRDDs.get(1); + JavaRDD> rdd3 = + (JavaRDD>)listOfRDDs.get(2); + JavaPairRDD prdd3 = JavaPairRDD.fromJavaRDD(rdd3); + PairFunction mapToTuple = + (PairFunction) i -> new Tuple2<>(i, i); + return rdd1.union(rdd2).mapToPair(mapToTuple).join(prdd3); } ); JavaTestUtils.attachTestOutputStream(transformed2); @@ -787,12 +608,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Arrays.asList("a","t","h","l","e","t","i","c","s")); JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream flatMapped = stream.flatMap(new FlatMapFunction() { - @Override - public Iterator call(String x) { - return Arrays.asList(x.split("(?!^)")).iterator(); - } - }); + JavaDStream flatMapped = stream.flatMap(x -> Arrays.asList(x.split("(?!^)")).iterator()); JavaTestUtils.attachTestOutputStream(flatMapped); List> result = JavaTestUtils.runStreams(ssc, 3, 3); @@ -811,25 +627,13 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaTestUtils.attachTestOutputStream(stream.count()); // dummy output - stream.foreachRDD(new VoidFunction>() { - @Override - public void call(JavaRDD rdd) { - accumRdd.add(1); - rdd.foreach(new VoidFunction() { - @Override - public void call(Integer i) { - accumEle.add(1); - } - }); - } + stream.foreachRDD(rdd -> { + accumRdd.add(1); + rdd.foreach(i -> accumEle.add(1)); }); // This is a test to make sure foreachRDD(VoidFunction2) can be called from Java - stream.foreachRDD(new VoidFunction2, Time>() { - @Override - public void call(JavaRDD rdd, Time time) { - } - }); + stream.foreachRDD((rdd, time) -> {}); JavaTestUtils.runStreams(ssc, 2, 2); @@ -873,16 +677,12 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa new Tuple2<>(9, "s"))); JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream flatMapped = stream.flatMapToPair( - new PairFlatMapFunction() { - @Override - public Iterator> call(String in) { - List> out = new ArrayList<>(); - for (String letter: in.split("(?!^)")) { - out.add(new Tuple2<>(in.length(), letter)); - } - return out.iterator(); + JavaPairDStream flatMapped = stream.flatMapToPair(in -> { + List> out = new ArrayList<>(); + for (String letter : in.split("(?!^)")) { + out.add(new Tuple2<>(in.length(), letter)); } + return out.iterator(); }); JavaTestUtils.attachTestOutputStream(flatMapped); List>> result = JavaTestUtils.runStreams(ssc, 3, 3); @@ -949,21 +749,10 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Arrays.asList(new Tuple2<>("yankees", 7))); JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream pairStream = stream.mapToPair( - new PairFunction() { - @Override - public Tuple2 call(String in) { - return new Tuple2<>(in, in.length()); - } - }); + JavaPairDStream pairStream = + stream.mapToPair(in -> new Tuple2<>(in, in.length())); - JavaPairDStream filtered = pairStream.filter( - new Function, Boolean>() { - @Override - public Boolean call(Tuple2 in) { - return in._1().contains("a"); - } - }); + JavaPairDStream filtered = pairStream.filter(in -> in._1().contains("a")); JavaTestUtils.attachTestOutputStream(filtered); List>> result = JavaTestUtils.runStreams(ssc, 2, 2); @@ -1014,13 +803,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaDStream> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream reversed = pairStream.mapToPair( - new PairFunction, Integer, String>() { - @Override - public Tuple2 call(Tuple2 in) { - return in.swap(); - } - }); + JavaPairDStream reversed = pairStream.mapToPair(Tuple2::swap); JavaTestUtils.attachTestOutputStream(reversed); List>> result = JavaTestUtils.runStreams(ssc, 2, 2); @@ -1048,18 +831,14 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaDStream> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream reversed = pairStream.mapPartitionsToPair( - new PairFlatMapFunction>, Integer, String>() { - @Override - public Iterator> call(Iterator> in) { - List> out = new LinkedList<>(); - while (in.hasNext()) { - Tuple2 next = in.next(); - out.add(next.swap()); - } - return out.iterator(); - } - }); + JavaPairDStream reversed = pairStream.mapPartitionsToPair(in -> { + List> out = new LinkedList<>(); + while (in.hasNext()) { + Tuple2 next = in.next(); + out.add(next.swap()); + } + return out.iterator(); + }); JavaTestUtils.attachTestOutputStream(reversed); List>> result = JavaTestUtils.runStreams(ssc, 2, 2); @@ -1079,13 +858,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaDStream> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaDStream reversed = pairStream.map( - new Function, Integer>() { - @Override - public Integer call(Tuple2 in) { - return in._2(); - } - }); + JavaDStream reversed = pairStream.map(in -> in._2()); JavaTestUtils.attachTestOutputStream(reversed); List> result = JavaTestUtils.runStreams(ssc, 2, 2); @@ -1119,17 +892,13 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaDStream> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream flatMapped = pairStream.flatMapToPair( - new PairFlatMapFunction, Integer, String>() { - @Override - public Iterator> call(Tuple2 in) { - List> out = new LinkedList<>(); - for (Character s : in._1().toCharArray()) { - out.add(new Tuple2<>(in._2(), s.toString())); - } - return out.iterator(); - } - }); + JavaPairDStream flatMapped = pairStream.flatMapToPair(in -> { + List> out = new LinkedList<>(); + for (Character s : in._1().toCharArray()) { + out.add(new Tuple2<>(in._2(), s.toString())); + } + return out.iterator(); + }); JavaTestUtils.attachTestOutputStream(flatMapped); List>> result = JavaTestUtils.runStreams(ssc, 2, 2); @@ -1216,12 +985,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); JavaPairDStream combined = pairStream.combineByKey( - new Function() { - @Override - public Integer call(Integer i) { - return i; - } - }, new IntegerSum(), new IntegerSum(), new HashPartitioner(2)); + i -> i, new IntegerSum(), new IntegerSum(), new HashPartitioner(2)); JavaTestUtils.attachTestOutputStream(combined); List>> result = JavaTestUtils.runStreams(ssc, 2, 2); @@ -1345,20 +1109,16 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream updated = pairStream.updateStateByKey( - new Function2, Optional, Optional>() { - @Override - public Optional call(List values, Optional state) { - int out = 0; - if (state.isPresent()) { - out += state.get(); - } - for (Integer v : values) { - out += v; - } - return Optional.of(out); - } - }); + JavaPairDStream updated = pairStream.updateStateByKey((values, state) -> { + int out = 0; + if (state.isPresent()) { + out += state.get(); + } + for (Integer v : values) { + out += v; + } + return Optional.of(out); + }); JavaTestUtils.attachTestOutputStream(updated); List>> result = JavaTestUtils.runStreams(ssc, 3, 3); @@ -1389,20 +1149,16 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream updated = pairStream.updateStateByKey( - new Function2, Optional, Optional>() { - @Override - public Optional call(List values, Optional state) { - int out = 0; - if (state.isPresent()) { - out += state.get(); - } - for (Integer v : values) { - out += v; - } - return Optional.of(out); - } - }, new HashPartitioner(1), initialRDD); + JavaPairDStream updated = pairStream.updateStateByKey((values, state) -> { + int out = 0; + if (state.isPresent()) { + out += state.get(); + } + for (Integer v : values) { + out += v; + } + return Optional.of(out); + }, new HashPartitioner(1), initialRDD); JavaTestUtils.attachTestOutputStream(updated); List>> result = JavaTestUtils.runStreams(ssc, 3, 3); @@ -1500,13 +1256,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa ssc, inputData, 1); JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream sorted = pairStream.transformToPair( - new Function, JavaPairRDD>() { - @Override - public JavaPairRDD call(JavaPairRDD in) { - return in.sortByKey(); - } - }); + JavaPairDStream sorted = pairStream.transformToPair(in -> in.sortByKey()); JavaTestUtils.attachTestOutputStream(sorted); List>> result = JavaTestUtils.runStreams(ssc, 2, 2); @@ -1537,18 +1287,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa ssc, inputData, 1); JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaDStream firstParts = pairStream.transform( - new Function, JavaRDD>() { - @Override - public JavaRDD call(JavaPairRDD in) { - return in.map(new Function, Integer>() { - @Override - public Integer call(Tuple2 in2) { - return in2._1(); - } - }); - } - }); + JavaDStream firstParts = pairStream.transform(in -> in.map(in2 -> in2._1())); JavaTestUtils.attachTestOutputStream(firstParts); List> result = JavaTestUtils.runStreams(ssc, 2, 2); @@ -1575,12 +1314,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa ssc, inputData, 1); JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream mapped = pairStream.mapValues(new Function() { - @Override - public String call(String s) { - return s.toUpperCase(Locale.ENGLISH); - } - }); + JavaPairDStream mapped = pairStream.mapValues(s -> s.toUpperCase(Locale.ENGLISH)); JavaTestUtils.attachTestOutputStream(mapped); List>> result = JavaTestUtils.runStreams(ssc, 2, 2); @@ -1616,16 +1350,12 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream flatMapped = pairStream.flatMapValues( - new Function>() { - @Override - public Iterable call(String in) { - List out = new ArrayList<>(); - out.add(in + "1"); - out.add(in + "2"); - return out; - } - }); + JavaPairDStream flatMapped = pairStream.flatMapValues(in -> { + List out = new ArrayList<>(); + out.add(in + "1"); + out.add(in + "2"); + return out; + }); JavaTestUtils.attachTestOutputStream(flatMapped); List>> result = JavaTestUtils.runStreams(ssc, 2, 2); @@ -1795,12 +1525,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa ssc.checkpoint(tempDir.getAbsolutePath()); JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream letterCount = stream.map(new Function() { - @Override - public Integer call(String s) { - return s.length(); - } - }); + JavaDStream letterCount = stream.map(String::length); JavaCheckpointTestUtils.attachTestOutputStream(letterCount); List> initialResult = JavaTestUtils.runStreams(ssc, 1, 1); @@ -1822,7 +1547,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa public void testContextGetOrCreate() throws InterruptedException { ssc.stop(); - final SparkConf conf = new SparkConf() + SparkConf conf = new SparkConf() .setMaster("local[2]") .setAppName("test") .set("newContext", "true"); @@ -1835,13 +1560,10 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa // Function to create JavaStreamingContext without any output operations // (used to detect the new context) - final AtomicBoolean newContextCreated = new AtomicBoolean(false); - Function0 creatingFunc = new Function0() { - @Override - public JavaStreamingContext call() { - newContextCreated.set(true); - return new JavaStreamingContext(conf, Seconds.apply(1)); - } + AtomicBoolean newContextCreated = new AtomicBoolean(false); + Function0 creatingFunc = () -> { + newContextCreated.set(true); + return new JavaStreamingContext(conf, Seconds.apply(1)); }; newContextCreated.set(false); @@ -1912,18 +1634,15 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa ssc.socketStream( "localhost", 12345, - new Function>() { - @Override - public Iterable call(InputStream in) throws IOException { - List out = new ArrayList<>(); - try (BufferedReader reader = new BufferedReader( - new InputStreamReader(in, StandardCharsets.UTF_8))) { - for (String line; (line = reader.readLine()) != null;) { - out.add(line); - } + in -> { + List out = new ArrayList<>(); + try (BufferedReader reader = new BufferedReader( + new InputStreamReader(in, StandardCharsets.UTF_8))) { + for (String line; (line = reader.readLine()) != null;) { + out.add(line); } - return out; } + return out; }, StorageLevel.MEMORY_ONLY()); } @@ -1952,21 +1671,10 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa LongWritable.class, Text.class, TextInputFormat.class, - new Function() { - @Override - public Boolean call(Path v1) { - return Boolean.TRUE; - } - }, + v1 -> Boolean.TRUE, true); - JavaDStream test = inputStream.map( - new Function, String>() { - @Override - public String call(Tuple2 v1) { - return v1._2().toString(); - } - }); + JavaDStream test = inputStream.map(v1 -> v1._2().toString()); JavaTestUtils.attachTestOutputStream(test); List> result = JavaTestUtils.runStreams(ssc, 1, 1); -- cgit v1.2.3