From 0f1d00a905614bb5eebf260566dbcb831158d445 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Thu, 12 Nov 2015 17:48:43 -0800 Subject: [SPARK-11663][STREAMING] Add Java API for trackStateByKey TODO - [x] Add Java API - [x] Add API tests - [x] Add a function test Author: Shixiong Zhu Closes #9636 from zsxwing/java-track. --- .../org/apache/spark/streaming/Java8APISuite.java | 43 ++++++++++++++++++++++ 1 file changed, 43 insertions(+) (limited to 'extras/java8-tests') diff --git a/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java b/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java index 73091cfe2c..163ae92c12 100644 --- a/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java +++ b/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java @@ -31,9 +31,12 @@ import org.junit.Test; import org.apache.spark.HashPartitioner; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.api.java.function.Function4; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; +import org.apache.spark.streaming.api.java.JavaTrackStateDStream; /** * Most of these tests replicate org.apache.spark.streaming.JavaAPISuite using java 8 @@ -831,4 +834,44 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ Assert.assertEquals(expected, result); } + /** + * This test is only for testing the APIs. It's not necessary to run it. + */ + public void testTrackStateByAPI() { + JavaPairRDD initialRDD = null; + JavaPairDStream wordsDstream = null; + + JavaTrackStateDStream stateDstream = + wordsDstream.trackStateByKey( + StateSpec. function((time, key, value, state) -> { + // Use all State's methods here + state.exists(); + state.get(); + state.isTimingOut(); + state.remove(); + state.update(true); + return Optional.of(2.0); + }).initialState(initialRDD) + .numPartitions(10) + .partitioner(new HashPartitioner(10)) + .timeout(Durations.seconds(10))); + + JavaPairDStream emittedRecords = stateDstream.stateSnapshots(); + + JavaTrackStateDStream stateDstream2 = + wordsDstream.trackStateByKey( + StateSpec.function((value, state) -> { + state.exists(); + state.get(); + state.isTimingOut(); + state.remove(); + state.update(true); + return 2.0; + }).initialState(initialRDD) + .numPartitions(10) + .partitioner(new HashPartitioner(10)) + .timeout(Durations.seconds(10))); + + JavaPairDStream emittedRecords2 = stateDstream2.stateSnapshots(); + } } -- cgit v1.2.3