diff options
Diffstat (limited to 'streaming/src/test/java/test/org/apache')
-rw-r--r-- | streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java | 65 |
-rw-r--r-- | streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java | 7 |
2 files changed, 42 insertions, 30 deletions
diff --git a/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java b/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java index 9948a4074c..80513de4ee 100644 --- a/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java +++ b/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java @@ -20,10 +20,13 @@ package test.org.apache.spark.streaming; import java.io.Serializable; import java.util.*; +import org.apache.spark.api.java.function.Function3; +import org.apache.spark.api.java.function.Function4; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.JavaTestUtils; import org.apache.spark.streaming.LocalJavaStreamingContext; +import org.apache.spark.streaming.State; import org.apache.spark.streaming.StateSpec; import org.apache.spark.streaming.Time; import scala.Tuple2; @@ -142,8 +145,8 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ Arrays.asList(24)); JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream<Integer> reducedWindowed = stream.reduceByWindow((x, y) -> x + y, - (x, y) -> x - y, new Duration(2000), new Duration(1000)); + JavaDStream<Integer> reducedWindowed = stream.reduceByWindow( + (x, y) -> x + y, (x, y) -> x - y, new Duration(2000), new Duration(1000)); JavaTestUtils.attachTestOutputStream(reducedWindowed); List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4); @@ -850,36 +853,44 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ JavaPairRDD<String, Boolean> initialRDD = null; JavaPairDStream<String, Integer> wordsDstream = null; + Function4<Time, String, Optional<Integer>, State<Boolean>, Optional<Double>> mapFn = + (time, key, value, state) -> { + // Use all State's methods here + state.exists(); + state.get(); + state.isTimingOut(); + state.remove(); + state.update(true); + 
return Optional.of(2.0); + }; + JavaMapWithStateDStream<String, Integer, Boolean, Double> stateDstream = - wordsDstream.mapWithState( - StateSpec.<String, Integer, Boolean, Double>function((time, key, value, state) -> { - // Use all State's methods here - state.exists(); - state.get(); - state.isTimingOut(); - state.remove(); - state.update(true); - return Optional.of(2.0); - }).initialState(initialRDD) - .numPartitions(10) - .partitioner(new HashPartitioner(10)) - .timeout(Durations.seconds(10))); + wordsDstream.mapWithState( + StateSpec.function(mapFn) + .initialState(initialRDD) + .numPartitions(10) + .partitioner(new HashPartitioner(10)) + .timeout(Durations.seconds(10))); JavaPairDStream<String, Boolean> emittedRecords = stateDstream.stateSnapshots(); + Function3<String, Optional<Integer>, State<Boolean>, Double> mapFn2 = + (key, value, state) -> { + state.exists(); + state.get(); + state.isTimingOut(); + state.remove(); + state.update(true); + return 2.0; + }; + JavaMapWithStateDStream<String, Integer, Boolean, Double> stateDstream2 = - wordsDstream.mapWithState( - StateSpec.<String, Integer, Boolean, Double>function((key, value, state) -> { - state.exists(); - state.get(); - state.isTimingOut(); - state.remove(); - state.update(true); - return 2.0; - }).initialState(initialRDD) - .numPartitions(10) - .partitioner(new HashPartitioner(10)) - .timeout(Durations.seconds(10))); + wordsDstream.mapWithState( + StateSpec.function(mapFn2) + .initialState(initialRDD) + .numPartitions(10) + .partitioner(new HashPartitioner(10)) + .timeout(Durations.seconds(10))); JavaPairDStream<String, Boolean> mappedDStream = stateDstream2.stateSnapshots(); } diff --git a/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java index b966cbdca0..96f8d9593d 100644 --- a/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java +++ 
b/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java @@ -29,7 +29,6 @@ import org.apache.spark.streaming.LocalJavaStreamingContext; import org.apache.spark.streaming.Seconds; import org.apache.spark.streaming.StreamingContextState; import org.apache.spark.streaming.StreamingContextSuite; -import org.apache.spark.streaming.Time; import scala.Tuple2; import org.apache.hadoop.conf.Configuration; @@ -608,7 +607,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Arrays.asList("a","t","h","l","e","t","i","c","s")); JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream<String> flatMapped = stream.flatMap(x -> Arrays.asList(x.split("(?!^)")).iterator()); + JavaDStream<String> flatMapped = + stream.flatMap(x -> Arrays.asList(x.split("(?!^)")).iterator()); JavaTestUtils.attachTestOutputStream(flatMapped); List<List<String>> result = JavaTestUtils.runStreams(ssc, 3, 3); @@ -1314,7 +1314,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa ssc, inputData, 1); JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream<String, String> mapped = pairStream.mapValues(s -> s.toUpperCase(Locale.ENGLISH)); + JavaPairDStream<String, String> mapped = + pairStream.mapValues(s -> s.toUpperCase(Locale.ENGLISH)); JavaTestUtils.attachTestOutputStream(mapped); List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); |