aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java')
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java81
1 files changed, 27 insertions, 54 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java
index 9b7701003d..cb8ed83e5a 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java
@@ -27,9 +27,6 @@ import java.util.Set;
import scala.Tuple2;
import com.google.common.collect.Sets;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.util.ManualClock;
import org.junit.Assert;
@@ -53,18 +50,14 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements
JavaPairDStream<String, Integer> wordsDstream = null;
Function4<Time, String, Optional<Integer>, State<Boolean>, Optional<Double>> mappingFunc =
- new Function4<Time, String, Optional<Integer>, State<Boolean>, Optional<Double>>() {
- @Override
- public Optional<Double> call(
- Time time, String word, Optional<Integer> one, State<Boolean> state) {
- // Use all State's methods here
- state.exists();
- state.get();
- state.isTimingOut();
- state.remove();
- state.update(true);
- return Optional.of(2.0);
- }
+ (time, word, one, state) -> {
+ // Use all State's methods here
+ state.exists();
+ state.get();
+ state.isTimingOut();
+ state.remove();
+ state.update(true);
+ return Optional.of(2.0);
};
JavaMapWithStateDStream<String, Integer, Boolean, Double> stateDstream =
@@ -78,17 +71,14 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements
stateDstream.stateSnapshots();
Function3<String, Optional<Integer>, State<Boolean>, Double> mappingFunc2 =
- new Function3<String, Optional<Integer>, State<Boolean>, Double>() {
- @Override
- public Double call(String key, Optional<Integer> one, State<Boolean> state) {
- // Use all State's methods here
- state.exists();
- state.get();
- state.isTimingOut();
- state.remove();
- state.update(true);
- return 2.0;
- }
+ (key, one, state) -> {
+ // Use all State's methods here
+ state.exists();
+ state.get();
+ state.isTimingOut();
+ state.remove();
+ state.update(true);
+ return 2.0;
};
JavaMapWithStateDStream<String, Integer, Boolean, Double> stateDstream2 =
@@ -136,13 +126,10 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements
);
Function3<String, Optional<Integer>, State<Integer>, Integer> mappingFunc =
- new Function3<String, Optional<Integer>, State<Integer>, Integer>() {
- @Override
- public Integer call(String key, Optional<Integer> value, State<Integer> state) {
- int sum = value.orElse(0) + (state.exists() ? state.get() : 0);
- state.update(sum);
- return sum;
- }
+ (key, value, state) -> {
+ int sum = value.orElse(0) + (state.exists() ? state.get() : 0);
+ state.update(sum);
+ return sum;
};
testOperation(
inputData,
@@ -159,29 +146,15 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements
int numBatches = expectedOutputs.size();
JavaDStream<K> inputStream = JavaTestUtils.attachTestInputStream(ssc, input, 2);
JavaMapWithStateDStream<K, Integer, S, T> mapWithStateDStream =
- JavaPairDStream.fromJavaDStream(inputStream.map(new Function<K, Tuple2<K, Integer>>() {
- @Override
- public Tuple2<K, Integer> call(K x) {
- return new Tuple2<>(x, 1);
- }
- })).mapWithState(mapWithStateSpec);
-
- final List<Set<T>> collectedOutputs =
+ JavaPairDStream.fromJavaDStream(inputStream.map(x -> new Tuple2<>(x, 1))).mapWithState(mapWithStateSpec);
+
+ List<Set<T>> collectedOutputs =
Collections.synchronizedList(new ArrayList<Set<T>>());
- mapWithStateDStream.foreachRDD(new VoidFunction<JavaRDD<T>>() {
- @Override
- public void call(JavaRDD<T> rdd) {
- collectedOutputs.add(Sets.newHashSet(rdd.collect()));
- }
- });
- final List<Set<Tuple2<K, S>>> collectedStateSnapshots =
+ mapWithStateDStream.foreachRDD(rdd -> collectedOutputs.add(Sets.newHashSet(rdd.collect())));
+ List<Set<Tuple2<K, S>>> collectedStateSnapshots =
Collections.synchronizedList(new ArrayList<Set<Tuple2<K, S>>>());
- mapWithStateDStream.stateSnapshots().foreachRDD(new VoidFunction<JavaPairRDD<K, S>>() {
- @Override
- public void call(JavaPairRDD<K, S> rdd) {
- collectedStateSnapshots.add(Sets.newHashSet(rdd.collect()));
- }
- });
+ mapWithStateDStream.stateSnapshots().foreachRDD(rdd ->
+ collectedStateSnapshots.add(Sets.newHashSet(rdd.collect())));
BatchCounter batchCounter = new BatchCounter(ssc.ssc());
ssc.start();
((ManualClock) ssc.ssc().scheduler().clock())