aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test/java/org/apache
diff options
context:
space:
mode:
authorSean Owen <sowen@cloudera.com>2017-02-19 09:42:50 -0800
committerSean Owen <sowen@cloudera.com>2017-02-19 09:42:50 -0800
commit1487c9af20a333ead55955acf4c0aa323bea0d07 (patch)
tree5f47daa77e0f73da1e009cc3dcf0a5c0073246aa /streaming/src/test/java/org/apache
parentde14d35f77071932963a994fac5aec0e5df838a1 (diff)
downloadspark-1487c9af20a333ead55955acf4c0aa323bea0d07.tar.gz
spark-1487c9af20a333ead55955acf4c0aa323bea0d07.tar.bz2
spark-1487c9af20a333ead55955acf4c0aa323bea0d07.zip
[SPARK-19534][TESTS] Convert Java tests to use lambdas, Java 8 features
## What changes were proposed in this pull request? Convert tests to use Java 8 lambdas, and modest related fixes to surrounding code. ## How was this patch tested? Jenkins tests Author: Sean Owen <sowen@cloudera.com> Closes #16964 from srowen/SPARK-19534.
Diffstat (limited to 'streaming/src/test/java/org/apache')
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java81
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java24
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java10
3 files changed, 35 insertions, 80 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java
index 9b7701003d..cb8ed83e5a 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java
@@ -27,9 +27,6 @@ import java.util.Set;
import scala.Tuple2;
import com.google.common.collect.Sets;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.util.ManualClock;
import org.junit.Assert;
@@ -53,18 +50,14 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements
JavaPairDStream<String, Integer> wordsDstream = null;
Function4<Time, String, Optional<Integer>, State<Boolean>, Optional<Double>> mappingFunc =
- new Function4<Time, String, Optional<Integer>, State<Boolean>, Optional<Double>>() {
- @Override
- public Optional<Double> call(
- Time time, String word, Optional<Integer> one, State<Boolean> state) {
- // Use all State's methods here
- state.exists();
- state.get();
- state.isTimingOut();
- state.remove();
- state.update(true);
- return Optional.of(2.0);
- }
+ (time, word, one, state) -> {
+ // Use all State's methods here
+ state.exists();
+ state.get();
+ state.isTimingOut();
+ state.remove();
+ state.update(true);
+ return Optional.of(2.0);
};
JavaMapWithStateDStream<String, Integer, Boolean, Double> stateDstream =
@@ -78,17 +71,14 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements
stateDstream.stateSnapshots();
Function3<String, Optional<Integer>, State<Boolean>, Double> mappingFunc2 =
- new Function3<String, Optional<Integer>, State<Boolean>, Double>() {
- @Override
- public Double call(String key, Optional<Integer> one, State<Boolean> state) {
- // Use all State's methods here
- state.exists();
- state.get();
- state.isTimingOut();
- state.remove();
- state.update(true);
- return 2.0;
- }
+ (key, one, state) -> {
+ // Use all State's methods here
+ state.exists();
+ state.get();
+ state.isTimingOut();
+ state.remove();
+ state.update(true);
+ return 2.0;
};
JavaMapWithStateDStream<String, Integer, Boolean, Double> stateDstream2 =
@@ -136,13 +126,10 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements
);
Function3<String, Optional<Integer>, State<Integer>, Integer> mappingFunc =
- new Function3<String, Optional<Integer>, State<Integer>, Integer>() {
- @Override
- public Integer call(String key, Optional<Integer> value, State<Integer> state) {
- int sum = value.orElse(0) + (state.exists() ? state.get() : 0);
- state.update(sum);
- return sum;
- }
+ (key, value, state) -> {
+ int sum = value.orElse(0) + (state.exists() ? state.get() : 0);
+ state.update(sum);
+ return sum;
};
testOperation(
inputData,
@@ -159,29 +146,15 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements
int numBatches = expectedOutputs.size();
JavaDStream<K> inputStream = JavaTestUtils.attachTestInputStream(ssc, input, 2);
JavaMapWithStateDStream<K, Integer, S, T> mapWithStateDStream =
- JavaPairDStream.fromJavaDStream(inputStream.map(new Function<K, Tuple2<K, Integer>>() {
- @Override
- public Tuple2<K, Integer> call(K x) {
- return new Tuple2<>(x, 1);
- }
- })).mapWithState(mapWithStateSpec);
-
- final List<Set<T>> collectedOutputs =
+ JavaPairDStream.fromJavaDStream(inputStream.map(x -> new Tuple2<>(x, 1))).mapWithState(mapWithStateSpec);
+
+ List<Set<T>> collectedOutputs =
Collections.synchronizedList(new ArrayList<Set<T>>());
- mapWithStateDStream.foreachRDD(new VoidFunction<JavaRDD<T>>() {
- @Override
- public void call(JavaRDD<T> rdd) {
- collectedOutputs.add(Sets.newHashSet(rdd.collect()));
- }
- });
- final List<Set<Tuple2<K, S>>> collectedStateSnapshots =
+ mapWithStateDStream.foreachRDD(rdd -> collectedOutputs.add(Sets.newHashSet(rdd.collect())));
+ List<Set<Tuple2<K, S>>> collectedStateSnapshots =
Collections.synchronizedList(new ArrayList<Set<Tuple2<K, S>>>());
- mapWithStateDStream.stateSnapshots().foreachRDD(new VoidFunction<JavaPairRDD<K, S>>() {
- @Override
- public void call(JavaPairRDD<K, S> rdd) {
- collectedStateSnapshots.add(Sets.newHashSet(rdd.collect()));
- }
- });
+ mapWithStateDStream.stateSnapshots().foreachRDD(rdd ->
+ collectedStateSnapshots.add(Sets.newHashSet(rdd.collect())));
BatchCounter batchCounter = new BatchCounter(ssc.ssc());
ssc.start();
((ManualClock) ssc.ssc().scheduler().clock())
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
index 091ccbfd85..9156047244 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
@@ -58,24 +58,16 @@ public class JavaReceiverAPISuite implements Serializable {
TestServer server = new TestServer(0);
server.start();
- final AtomicLong dataCounter = new AtomicLong(0);
+ AtomicLong dataCounter = new AtomicLong(0);
try {
JavaStreamingContext ssc = new JavaStreamingContext("local[2]", "test", new Duration(200));
JavaReceiverInputDStream<String> input =
ssc.receiverStream(new JavaSocketReceiver("localhost", server.port()));
- JavaDStream<String> mapped = input.map(new Function<String, String>() {
- @Override
- public String call(String v1) {
- return v1 + ".";
- }
- });
- mapped.foreachRDD(new VoidFunction<JavaRDD<String>>() {
- @Override
- public void call(JavaRDD<String> rdd) {
- long count = rdd.count();
- dataCounter.addAndGet(count);
- }
+ JavaDStream<String> mapped = input.map((Function<String, String>) v1 -> v1 + ".");
+ mapped.foreachRDD((VoidFunction<JavaRDD<String>>) rdd -> {
+ long count = rdd.count();
+ dataCounter.addAndGet(count);
});
ssc.start();
@@ -110,11 +102,7 @@ public class JavaReceiverAPISuite implements Serializable {
@Override
public void onStart() {
- new Thread() {
- @Override public void run() {
- receive();
- }
- }.start();
+ new Thread(this::receive).start();
}
@Override
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java
index f02fa87f61..3f4e6ddb21 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java
@@ -23,7 +23,6 @@ import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
-import com.google.common.base.Function;
import com.google.common.collect.Iterators;
import org.apache.spark.SparkConf;
import org.apache.spark.network.util.JavaUtils;
@@ -81,12 +80,7 @@ public class JavaWriteAheadLogSuite extends WriteAheadLog {
@Override
public Iterator<ByteBuffer> readAll() {
- return Iterators.transform(records.iterator(), new Function<Record,ByteBuffer>() {
- @Override
- public ByteBuffer apply(Record input) {
- return input.buffer;
- }
- });
+ return Iterators.transform(records.iterator(), input -> input.buffer);
}
@Override
@@ -114,7 +108,7 @@ public class JavaWriteAheadLogSuite extends WriteAheadLog {
String data1 = "data1";
WriteAheadLogRecordHandle handle = wal.write(JavaUtils.stringToBytes(data1), 1234);
Assert.assertTrue(handle instanceof JavaWriteAheadLogSuiteHandle);
- Assert.assertEquals(JavaUtils.bytesToString(wal.read(handle)), data1);
+ Assert.assertEquals(data1, JavaUtils.bytesToString(wal.read(handle)));
wal.write(JavaUtils.stringToBytes("data2"), 1235);
wal.write(JavaUtils.stringToBytes("data3"), 1236);