diff options
author | Sean Owen <sowen@cloudera.com> | 2017-02-19 09:42:50 -0800 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2017-02-19 09:42:50 -0800 |
commit | 1487c9af20a333ead55955acf4c0aa323bea0d07 (patch) | |
tree | 5f47daa77e0f73da1e009cc3dcf0a5c0073246aa /streaming/src/test/java/org/apache | |
parent | de14d35f77071932963a994fac5aec0e5df838a1 (diff) | |
download | spark-1487c9af20a333ead55955acf4c0aa323bea0d07.tar.gz spark-1487c9af20a333ead55955acf4c0aa323bea0d07.tar.bz2 spark-1487c9af20a333ead55955acf4c0aa323bea0d07.zip |
[SPARK-19534][TESTS] Convert Java tests to use lambdas, Java 8 features
## What changes were proposed in this pull request?
Convert tests to use Java 8 lambdas, and modest related fixes to surrounding code.
## How was this patch tested?
Jenkins tests
Author: Sean Owen <sowen@cloudera.com>
Closes #16964 from srowen/SPARK-19534.
Diffstat (limited to 'streaming/src/test/java/org/apache')
3 files changed, 35 insertions, 80 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java index 9b7701003d..cb8ed83e5a 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java @@ -27,9 +27,6 @@ import java.util.Set; import scala.Tuple2; import com.google.common.collect.Sets; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.util.ManualClock; import org.junit.Assert; @@ -53,18 +50,14 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements JavaPairDStream<String, Integer> wordsDstream = null; Function4<Time, String, Optional<Integer>, State<Boolean>, Optional<Double>> mappingFunc = - new Function4<Time, String, Optional<Integer>, State<Boolean>, Optional<Double>>() { - @Override - public Optional<Double> call( - Time time, String word, Optional<Integer> one, State<Boolean> state) { - // Use all State's methods here - state.exists(); - state.get(); - state.isTimingOut(); - state.remove(); - state.update(true); - return Optional.of(2.0); - } + (time, word, one, state) -> { + // Use all State's methods here + state.exists(); + state.get(); + state.isTimingOut(); + state.remove(); + state.update(true); + return Optional.of(2.0); }; JavaMapWithStateDStream<String, Integer, Boolean, Double> stateDstream = @@ -78,17 +71,14 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements stateDstream.stateSnapshots(); Function3<String, Optional<Integer>, State<Boolean>, Double> mappingFunc2 = - new Function3<String, Optional<Integer>, State<Boolean>, Double>() { - @Override - public Double call(String key, Optional<Integer> one, State<Boolean> state) { - // Use all State's methods here - state.exists(); - state.get(); - state.isTimingOut(); - state.remove(); - state.update(true); - return 2.0; - } + (key, one, state) -> { + // Use all State's methods here + state.exists(); + state.get(); + state.isTimingOut(); + state.remove(); + state.update(true); + return 2.0; }; JavaMapWithStateDStream<String, Integer, Boolean, Double> stateDstream2 = @@ -136,13 +126,10 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements ); Function3<String, Optional<Integer>, State<Integer>, Integer> mappingFunc = - new Function3<String, Optional<Integer>, State<Integer>, Integer>() { - @Override - public Integer call(String key, Optional<Integer> value, State<Integer> state) { - int sum = value.orElse(0) + (state.exists() ? state.get() : 0); - state.update(sum); - return sum; - } + (key, value, state) -> { + int sum = value.orElse(0) + (state.exists() ? state.get() : 0); + state.update(sum); + return sum; }; testOperation( inputData, @@ -159,29 +146,15 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements int numBatches = expectedOutputs.size(); JavaDStream<K> inputStream = JavaTestUtils.attachTestInputStream(ssc, input, 2); JavaMapWithStateDStream<K, Integer, S, T> mapWithStateDStream = - JavaPairDStream.fromJavaDStream(inputStream.map(new Function<K, Tuple2<K, Integer>>() { - @Override - public Tuple2<K, Integer> call(K x) { - return new Tuple2<>(x, 1); - } - })).mapWithState(mapWithStateSpec); - - final List<Set<T>> collectedOutputs = + JavaPairDStream.fromJavaDStream(inputStream.map(x -> new Tuple2<>(x, 1))).mapWithState(mapWithStateSpec); + + List<Set<T>> collectedOutputs = Collections.synchronizedList(new ArrayList<Set<T>>()); - mapWithStateDStream.foreachRDD(new VoidFunction<JavaRDD<T>>() { - @Override - public void call(JavaRDD<T> rdd) { - collectedOutputs.add(Sets.newHashSet(rdd.collect())); - } - }); - final List<Set<Tuple2<K, S>>> collectedStateSnapshots = + mapWithStateDStream.foreachRDD(rdd -> collectedOutputs.add(Sets.newHashSet(rdd.collect()))); + List<Set<Tuple2<K, S>>> collectedStateSnapshots = Collections.synchronizedList(new ArrayList<Set<Tuple2<K, S>>>()); - mapWithStateDStream.stateSnapshots().foreachRDD(new VoidFunction<JavaPairRDD<K, S>>() { - @Override - public void call(JavaPairRDD<K, S> rdd) { - collectedStateSnapshots.add(Sets.newHashSet(rdd.collect())); - } - }); + mapWithStateDStream.stateSnapshots().foreachRDD(rdd -> + collectedStateSnapshots.add(Sets.newHashSet(rdd.collect()))); BatchCounter batchCounter = new BatchCounter(ssc.ssc()); ssc.start(); ((ManualClock) ssc.ssc().scheduler().clock()) diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java index 091ccbfd85..9156047244 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java @@ -58,24 +58,16 @@ public class JavaReceiverAPISuite implements Serializable { TestServer server = new TestServer(0); server.start(); - final AtomicLong dataCounter = new AtomicLong(0); + AtomicLong dataCounter = new AtomicLong(0); try { JavaStreamingContext ssc = new JavaStreamingContext("local[2]", "test", new Duration(200)); JavaReceiverInputDStream<String> input = ssc.receiverStream(new JavaSocketReceiver("localhost", server.port())); - JavaDStream<String> mapped = input.map(new Function<String, String>() { - @Override - public String call(String v1) { - return v1 + "."; - } - }); - mapped.foreachRDD(new VoidFunction<JavaRDD<String>>() { - @Override - public void call(JavaRDD<String> rdd) { - long count = rdd.count(); - dataCounter.addAndGet(count); - } + JavaDStream<String> mapped = input.map((Function<String, String>) v1 -> v1 + "."); + mapped.foreachRDD((VoidFunction<JavaRDD<String>>) rdd -> { + long count = rdd.count(); + dataCounter.addAndGet(count); }); ssc.start(); @@ -110,11 +102,7 @@ public class JavaReceiverAPISuite implements Serializable { @Override public void onStart() { - new Thread() { - @Override public void run() { - receive(); - } - }.start(); + new Thread(this::receive).start(); } @Override diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java index f02fa87f61..3f4e6ddb21 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java @@ -23,7 +23,6 @@ import java.util.Arrays; import java.util.Iterator; import java.util.List; -import com.google.common.base.Function; import com.google.common.collect.Iterators; import org.apache.spark.SparkConf; import org.apache.spark.network.util.JavaUtils; @@ -81,12 +80,7 @@ public class JavaWriteAheadLogSuite extends WriteAheadLog { @Override public Iterator<ByteBuffer> readAll() { - return Iterators.transform(records.iterator(), new Function<Record,ByteBuffer>() { - @Override - public ByteBuffer apply(Record input) { - return input.buffer; - } - }); + return Iterators.transform(records.iterator(), input -> input.buffer); } @Override @@ -114,7 +108,7 @@ public class JavaWriteAheadLogSuite extends WriteAheadLog { String data1 = "data1"; WriteAheadLogRecordHandle handle = wal.write(JavaUtils.stringToBytes(data1), 1234); Assert.assertTrue(handle instanceof JavaWriteAheadLogSuiteHandle); - Assert.assertEquals(JavaUtils.bytesToString(wal.read(handle)), data1); + Assert.assertEquals(data1, JavaUtils.bytesToString(wal.read(handle))); wal.write(JavaUtils.stringToBytes("data2"), 1235); wal.write(JavaUtils.stringToBytes("data3"), 1236); |