aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java')
-rw-r--r--streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java65
1 files changed, 38 insertions, 27 deletions
diff --git a/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java b/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java
index 9948a4074c..80513de4ee 100644
--- a/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java
+++ b/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java
@@ -20,10 +20,13 @@ package test.org.apache.spark.streaming;
import java.io.Serializable;
import java.util.*;
+import org.apache.spark.api.java.function.Function3;
+import org.apache.spark.api.java.function.Function4;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.JavaTestUtils;
import org.apache.spark.streaming.LocalJavaStreamingContext;
+import org.apache.spark.streaming.State;
import org.apache.spark.streaming.StateSpec;
import org.apache.spark.streaming.Time;
import scala.Tuple2;
@@ -142,8 +145,8 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
Arrays.asList(24));
JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream<Integer> reducedWindowed = stream.reduceByWindow((x, y) -> x + y,
- (x, y) -> x - y, new Duration(2000), new Duration(1000));
+ JavaDStream<Integer> reducedWindowed = stream.reduceByWindow(
+ (x, y) -> x + y, (x, y) -> x - y, new Duration(2000), new Duration(1000));
JavaTestUtils.attachTestOutputStream(reducedWindowed);
List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4);
@@ -850,36 +853,44 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
JavaPairRDD<String, Boolean> initialRDD = null;
JavaPairDStream<String, Integer> wordsDstream = null;
+ Function4<Time, String, Optional<Integer>, State<Boolean>, Optional<Double>> mapFn =
+ (time, key, value, state) -> {
+ // Use all State's methods here
+ state.exists();
+ state.get();
+ state.isTimingOut();
+ state.remove();
+ state.update(true);
+ return Optional.of(2.0);
+ };
+
JavaMapWithStateDStream<String, Integer, Boolean, Double> stateDstream =
- wordsDstream.mapWithState(
- StateSpec.<String, Integer, Boolean, Double>function((time, key, value, state) -> {
- // Use all State's methods here
- state.exists();
- state.get();
- state.isTimingOut();
- state.remove();
- state.update(true);
- return Optional.of(2.0);
- }).initialState(initialRDD)
- .numPartitions(10)
- .partitioner(new HashPartitioner(10))
- .timeout(Durations.seconds(10)));
+ wordsDstream.mapWithState(
+ StateSpec.function(mapFn)
+ .initialState(initialRDD)
+ .numPartitions(10)
+ .partitioner(new HashPartitioner(10))
+ .timeout(Durations.seconds(10)));
JavaPairDStream<String, Boolean> emittedRecords = stateDstream.stateSnapshots();
+ Function3<String, Optional<Integer>, State<Boolean>, Double> mapFn2 =
+ (key, value, state) -> {
+ state.exists();
+ state.get();
+ state.isTimingOut();
+ state.remove();
+ state.update(true);
+ return 2.0;
+ };
+
JavaMapWithStateDStream<String, Integer, Boolean, Double> stateDstream2 =
- wordsDstream.mapWithState(
- StateSpec.<String, Integer, Boolean, Double>function((key, value, state) -> {
- state.exists();
- state.get();
- state.isTimingOut();
- state.remove();
- state.update(true);
- return 2.0;
- }).initialState(initialRDD)
- .numPartitions(10)
- .partitioner(new HashPartitioner(10))
- .timeout(Durations.seconds(10)));
+ wordsDstream.mapWithState(
+ StateSpec.function(mapFn2)
+ .initialState(initialRDD)
+ .numPartitions(10)
+ .partitioner(new HashPartitioner(10))
+ .timeout(Durations.seconds(10)));
JavaPairDStream<String, Boolean> mappedDStream = stateDstream2.stateSnapshots();
}