aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test/java/test/org/apache
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/src/test/java/test/org/apache')
-rw-r--r--streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java65
-rw-r--r--streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java7
2 files changed, 42 insertions, 30 deletions
diff --git a/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java b/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java
index 9948a4074c..80513de4ee 100644
--- a/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java
+++ b/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java
@@ -20,10 +20,13 @@ package test.org.apache.spark.streaming;
import java.io.Serializable;
import java.util.*;
+import org.apache.spark.api.java.function.Function3;
+import org.apache.spark.api.java.function.Function4;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.JavaTestUtils;
import org.apache.spark.streaming.LocalJavaStreamingContext;
+import org.apache.spark.streaming.State;
import org.apache.spark.streaming.StateSpec;
import org.apache.spark.streaming.Time;
import scala.Tuple2;
@@ -142,8 +145,8 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
Arrays.asList(24));
JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream<Integer> reducedWindowed = stream.reduceByWindow((x, y) -> x + y,
- (x, y) -> x - y, new Duration(2000), new Duration(1000));
+ JavaDStream<Integer> reducedWindowed = stream.reduceByWindow(
+ (x, y) -> x + y, (x, y) -> x - y, new Duration(2000), new Duration(1000));
JavaTestUtils.attachTestOutputStream(reducedWindowed);
List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4);
@@ -850,36 +853,44 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
JavaPairRDD<String, Boolean> initialRDD = null;
JavaPairDStream<String, Integer> wordsDstream = null;
+ Function4<Time, String, Optional<Integer>, State<Boolean>, Optional<Double>> mapFn =
+ (time, key, value, state) -> {
+ // Use all State's methods here
+ state.exists();
+ state.get();
+ state.isTimingOut();
+ state.remove();
+ state.update(true);
+ return Optional.of(2.0);
+ };
+
JavaMapWithStateDStream<String, Integer, Boolean, Double> stateDstream =
- wordsDstream.mapWithState(
- StateSpec.<String, Integer, Boolean, Double>function((time, key, value, state) -> {
- // Use all State's methods here
- state.exists();
- state.get();
- state.isTimingOut();
- state.remove();
- state.update(true);
- return Optional.of(2.0);
- }).initialState(initialRDD)
- .numPartitions(10)
- .partitioner(new HashPartitioner(10))
- .timeout(Durations.seconds(10)));
+ wordsDstream.mapWithState(
+ StateSpec.function(mapFn)
+ .initialState(initialRDD)
+ .numPartitions(10)
+ .partitioner(new HashPartitioner(10))
+ .timeout(Durations.seconds(10)));
JavaPairDStream<String, Boolean> emittedRecords = stateDstream.stateSnapshots();
+ Function3<String, Optional<Integer>, State<Boolean>, Double> mapFn2 =
+ (key, value, state) -> {
+ state.exists();
+ state.get();
+ state.isTimingOut();
+ state.remove();
+ state.update(true);
+ return 2.0;
+ };
+
JavaMapWithStateDStream<String, Integer, Boolean, Double> stateDstream2 =
- wordsDstream.mapWithState(
- StateSpec.<String, Integer, Boolean, Double>function((key, value, state) -> {
- state.exists();
- state.get();
- state.isTimingOut();
- state.remove();
- state.update(true);
- return 2.0;
- }).initialState(initialRDD)
- .numPartitions(10)
- .partitioner(new HashPartitioner(10))
- .timeout(Durations.seconds(10)));
+ wordsDstream.mapWithState(
+ StateSpec.function(mapFn2)
+ .initialState(initialRDD)
+ .numPartitions(10)
+ .partitioner(new HashPartitioner(10))
+ .timeout(Durations.seconds(10)));
JavaPairDStream<String, Boolean> mappedDStream = stateDstream2.stateSnapshots();
}
diff --git a/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java
index b966cbdca0..96f8d9593d 100644
--- a/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java
@@ -29,7 +29,6 @@ import org.apache.spark.streaming.LocalJavaStreamingContext;
import org.apache.spark.streaming.Seconds;
import org.apache.spark.streaming.StreamingContextState;
import org.apache.spark.streaming.StreamingContextSuite;
-import org.apache.spark.streaming.Time;
import scala.Tuple2;
import org.apache.hadoop.conf.Configuration;
@@ -608,7 +607,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Arrays.asList("a","t","h","l","e","t","i","c","s"));
JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream<String> flatMapped = stream.flatMap(x -> Arrays.asList(x.split("(?!^)")).iterator());
+ JavaDStream<String> flatMapped =
+ stream.flatMap(x -> Arrays.asList(x.split("(?!^)")).iterator());
JavaTestUtils.attachTestOutputStream(flatMapped);
List<List<String>> result = JavaTestUtils.runStreams(ssc, 3, 3);
@@ -1314,7 +1314,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
ssc, inputData, 1);
JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
- JavaPairDStream<String, String> mapped = pairStream.mapValues(s -> s.toUpperCase(Locale.ENGLISH));
+ JavaPairDStream<String, String> mapped =
+ pairStream.mapValues(s -> s.toUpperCase(Locale.ENGLISH));
JavaTestUtils.attachTestOutputStream(mapped);
List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);