diff options
Diffstat (limited to 'extras/java8-tests')
-rw-r--r-- | extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java | 43 |
1 files changed, 43 insertions, 0 deletions
diff --git a/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java b/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java index 73091cfe2c..163ae92c12 100644 --- a/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java +++ b/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java @@ -31,9 +31,12 @@ import org.junit.Test; import org.apache.spark.HashPartitioner; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.api.java.function.Function4; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; +import org.apache.spark.streaming.api.java.JavaTrackStateDStream; /** * Most of these tests replicate org.apache.spark.streaming.JavaAPISuite using java 8 @@ -831,4 +834,44 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ Assert.assertEquals(expected, result); } + /** + * This test is only for testing the APIs. It's not necessary to run it. + */ + public void testTrackStateByAPI() { + JavaPairRDD<String, Boolean> initialRDD = null; + JavaPairDStream<String, Integer> wordsDstream = null; + + JavaTrackStateDStream<String, Integer, Boolean, Double> stateDstream = + wordsDstream.trackStateByKey( + StateSpec.<String, Integer, Boolean, Double> function((time, key, value, state) -> { + // Use all State's methods here + state.exists(); + state.get(); + state.isTimingOut(); + state.remove(); + state.update(true); + return Optional.of(2.0); + }).initialState(initialRDD) + .numPartitions(10) + .partitioner(new HashPartitioner(10)) + .timeout(Durations.seconds(10))); + + JavaPairDStream<String, Boolean> emittedRecords = stateDstream.stateSnapshots(); + + JavaTrackStateDStream<String, Integer, Boolean, Double> stateDstream2 = + wordsDstream.trackStateByKey( + StateSpec.<String, Integer, Boolean, Double>function((value, state) -> { + state.exists(); + state.get(); + state.isTimingOut(); + state.remove(); + state.update(true); + return 2.0; + }).initialState(initialRDD) + .numPartitions(10) + .partitioner(new HashPartitioner(10)) + .timeout(Durations.seconds(10))); + + JavaPairDStream<String, Boolean> emittedRecords2 = stateDstream2.stateSnapshots(); + } } |