diff options
author | Shixiong Zhu <shixiong@databricks.com> | 2015-11-12 17:48:43 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2015-11-12 17:48:43 -0800 |
commit | 0f1d00a905614bb5eebf260566dbcb831158d445 (patch) | |
tree | 5b46386a0c742cd035549fa26c08da296010e86d /extras/java8-tests | |
parent | 41bbd2300472501d69ed46f0407d5ed7cbede4a8 (diff) | |
download | spark-0f1d00a905614bb5eebf260566dbcb831158d445.tar.gz spark-0f1d00a905614bb5eebf260566dbcb831158d445.tar.bz2 spark-0f1d00a905614bb5eebf260566dbcb831158d445.zip |
[SPARK-11663][STREAMING] Add Java API for trackStateByKey
TODO
- [x] Add Java API
- [x] Add API tests
- [x] Add a function test
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #9636 from zsxwing/java-track.
Diffstat (limited to 'extras/java8-tests')
-rw-r--r-- | extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java | 43 |
1 files changed, 43 insertions, 0 deletions
diff --git a/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java b/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java index 73091cfe2c..163ae92c12 100644 --- a/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java +++ b/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java @@ -31,9 +31,12 @@ import org.junit.Test; import org.apache.spark.HashPartitioner; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.api.java.function.Function4; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; +import org.apache.spark.streaming.api.java.JavaTrackStateDStream; /** * Most of these tests replicate org.apache.spark.streaming.JavaAPISuite using java 8 @@ -831,4 +834,44 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ Assert.assertEquals(expected, result); } + /** + * This test is only for testing the APIs. It's not necessary to run it. + */ + public void testTrackStateByAPI() { + JavaPairRDD<String, Boolean> initialRDD = null; + JavaPairDStream<String, Integer> wordsDstream = null; + + JavaTrackStateDStream<String, Integer, Boolean, Double> stateDstream = + wordsDstream.trackStateByKey( + StateSpec.<String, Integer, Boolean, Double> function((time, key, value, state) -> { + // Use all State's methods here + state.exists(); + state.get(); + state.isTimingOut(); + state.remove(); + state.update(true); + return Optional.of(2.0); + }).initialState(initialRDD) + .numPartitions(10) + .partitioner(new HashPartitioner(10)) + .timeout(Durations.seconds(10))); + + JavaPairDStream<String, Boolean> emittedRecords = stateDstream.stateSnapshots(); + + JavaTrackStateDStream<String, Integer, Boolean, Double> stateDstream2 = + wordsDstream.trackStateByKey( + StateSpec.<String, Integer, Boolean, Double>function((value, state) -> { + state.exists(); + state.get(); + state.isTimingOut(); + state.remove(); + state.update(true); + return 2.0; + }).initialState(initialRDD) + .numPartitions(10) + .partitioner(new HashPartitioner(10)) + .timeout(Durations.seconds(10))); + + JavaPairDStream<String, Boolean> emittedRecords2 = stateDstream2.stateSnapshots(); + } } |