diff options
Diffstat (limited to 'streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java')
-rw-r--r-- | streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java | 81 |
1 files changed, 27 insertions, 54 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java index 9b7701003d..cb8ed83e5a 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java @@ -27,9 +27,6 @@ import java.util.Set; import scala.Tuple2; import com.google.common.collect.Sets; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.util.ManualClock; import org.junit.Assert; @@ -53,18 +50,14 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements JavaPairDStream<String, Integer> wordsDstream = null; Function4<Time, String, Optional<Integer>, State<Boolean>, Optional<Double>> mappingFunc = - new Function4<Time, String, Optional<Integer>, State<Boolean>, Optional<Double>>() { - @Override - public Optional<Double> call( - Time time, String word, Optional<Integer> one, State<Boolean> state) { - // Use all State's methods here - state.exists(); - state.get(); - state.isTimingOut(); - state.remove(); - state.update(true); - return Optional.of(2.0); - } + (time, word, one, state) -> { + // Use all State's methods here + state.exists(); + state.get(); + state.isTimingOut(); + state.remove(); + state.update(true); + return Optional.of(2.0); }; JavaMapWithStateDStream<String, Integer, Boolean, Double> stateDstream = @@ -78,17 +71,14 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements stateDstream.stateSnapshots(); Function3<String, Optional<Integer>, State<Boolean>, Double> mappingFunc2 = - new Function3<String, Optional<Integer>, State<Boolean>, Double>() { - @Override - public Double call(String key, Optional<Integer> one, State<Boolean> state) { - // Use all State's methods here - state.exists(); - state.get(); - state.isTimingOut(); - state.remove(); - state.update(true); - return 2.0; - } + (key, one, state) -> { + // Use all State's methods here + state.exists(); + state.get(); + state.isTimingOut(); + state.remove(); + state.update(true); + return 2.0; }; JavaMapWithStateDStream<String, Integer, Boolean, Double> stateDstream2 = @@ -136,13 +126,10 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements ); Function3<String, Optional<Integer>, State<Integer>, Integer> mappingFunc = - new Function3<String, Optional<Integer>, State<Integer>, Integer>() { - @Override - public Integer call(String key, Optional<Integer> value, State<Integer> state) { - int sum = value.orElse(0) + (state.exists() ? state.get() : 0); - state.update(sum); - return sum; - } + (key, value, state) -> { + int sum = value.orElse(0) + (state.exists() ? state.get() : 0); + state.update(sum); + return sum; }; testOperation( inputData, @@ -159,29 +146,15 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements int numBatches = expectedOutputs.size(); JavaDStream<K> inputStream = JavaTestUtils.attachTestInputStream(ssc, input, 2); JavaMapWithStateDStream<K, Integer, S, T> mapWithStateDStream = - JavaPairDStream.fromJavaDStream(inputStream.map(new Function<K, Tuple2<K, Integer>>() { - @Override - public Tuple2<K, Integer> call(K x) { - return new Tuple2<>(x, 1); - } - })).mapWithState(mapWithStateSpec); - - final List<Set<T>> collectedOutputs = + JavaPairDStream.fromJavaDStream(inputStream.map(x -> new Tuple2<>(x, 1))).mapWithState(mapWithStateSpec); + + List<Set<T>> collectedOutputs = Collections.synchronizedList(new ArrayList<Set<T>>()); - mapWithStateDStream.foreachRDD(new VoidFunction<JavaRDD<T>>() { - @Override - public void call(JavaRDD<T> rdd) { - collectedOutputs.add(Sets.newHashSet(rdd.collect())); - } - }); - final List<Set<Tuple2<K, S>>> collectedStateSnapshots = + mapWithStateDStream.foreachRDD(rdd -> collectedOutputs.add(Sets.newHashSet(rdd.collect()))); + List<Set<Tuple2<K, S>>> collectedStateSnapshots = Collections.synchronizedList(new ArrayList<Set<Tuple2<K, S>>>()); - mapWithStateDStream.stateSnapshots().foreachRDD(new VoidFunction<JavaPairRDD<K, S>>() { - @Override - public void call(JavaPairRDD<K, S> rdd) { - collectedStateSnapshots.add(Sets.newHashSet(rdd.collect())); - } - }); + mapWithStateDStream.stateSnapshots().foreachRDD(rdd -> + collectedStateSnapshots.add(Sets.newHashSet(rdd.collect()))); BatchCounter batchCounter = new BatchCounter(ssc.ssc()); ssc.start(); ((ManualClock) ssc.ssc().scheduler().clock()) |