diff options
Diffstat (limited to 'streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java')
-rw-r--r-- | streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java | 65 |
1 files changed, 38 insertions, 27 deletions
diff --git a/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java b/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java index 9948a4074c..80513de4ee 100644 --- a/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java +++ b/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java @@ -20,10 +20,13 @@ package test.org.apache.spark.streaming; import java.io.Serializable; import java.util.*; +import org.apache.spark.api.java.function.Function3; +import org.apache.spark.api.java.function.Function4; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.JavaTestUtils; import org.apache.spark.streaming.LocalJavaStreamingContext; +import org.apache.spark.streaming.State; import org.apache.spark.streaming.StateSpec; import org.apache.spark.streaming.Time; import scala.Tuple2; @@ -142,8 +145,8 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ Arrays.asList(24)); JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream<Integer> reducedWindowed = stream.reduceByWindow((x, y) -> x + y, - (x, y) -> x - y, new Duration(2000), new Duration(1000)); + JavaDStream<Integer> reducedWindowed = stream.reduceByWindow( + (x, y) -> x + y, (x, y) -> x - y, new Duration(2000), new Duration(1000)); JavaTestUtils.attachTestOutputStream(reducedWindowed); List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4); @@ -850,36 +853,44 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ JavaPairRDD<String, Boolean> initialRDD = null; JavaPairDStream<String, Integer> wordsDstream = null; + Function4<Time, String, Optional<Integer>, State<Boolean>, Optional<Double>> mapFn = + (time, key, value, state) -> { + // Use all State's methods here + state.exists(); + state.get(); + state.isTimingOut(); + state.remove(); + state.update(true); + return Optional.of(2.0); + }; + JavaMapWithStateDStream<String, Integer, Boolean, Double> stateDstream = - wordsDstream.mapWithState( - StateSpec.<String, Integer, Boolean, Double>function((time, key, value, state) -> { - // Use all State's methods here - state.exists(); - state.get(); - state.isTimingOut(); - state.remove(); - state.update(true); - return Optional.of(2.0); - }).initialState(initialRDD) - .numPartitions(10) - .partitioner(new HashPartitioner(10)) - .timeout(Durations.seconds(10))); + wordsDstream.mapWithState( + StateSpec.function(mapFn) + .initialState(initialRDD) + .numPartitions(10) + .partitioner(new HashPartitioner(10)) + .timeout(Durations.seconds(10))); JavaPairDStream<String, Boolean> emittedRecords = stateDstream.stateSnapshots(); + Function3<String, Optional<Integer>, State<Boolean>, Double> mapFn2 = + (key, value, state) -> { + state.exists(); + state.get(); + state.isTimingOut(); + state.remove(); + state.update(true); + return 2.0; + }; + JavaMapWithStateDStream<String, Integer, Boolean, Double> stateDstream2 = - wordsDstream.mapWithState( - StateSpec.<String, Integer, Boolean, Double>function((key, value, state) -> { - state.exists(); - state.get(); - state.isTimingOut(); - state.remove(); - state.update(true); - return 2.0; - }).initialState(initialRDD) - .numPartitions(10) - .partitioner(new HashPartitioner(10)) - .timeout(Durations.seconds(10))); + wordsDstream.mapWithState( + StateSpec.function(mapFn2) + .initialState(initialRDD) + .numPartitions(10) + .partitioner(new HashPartitioner(10)) + .timeout(Durations.seconds(10))); JavaPairDStream<String, Boolean> mappedDStream = stateDstream2.stateSnapshots(); } |