aboutsummaryrefslogtreecommitdiff
path: root/extras/java8-tests
diff options
context:
space:
mode:
authorShixiong Zhu <shixiong@databricks.com>2015-11-12 17:48:43 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2015-11-12 17:48:43 -0800
commit0f1d00a905614bb5eebf260566dbcb831158d445 (patch)
tree5b46386a0c742cd035549fa26c08da296010e86d /extras/java8-tests
parent41bbd2300472501d69ed46f0407d5ed7cbede4a8 (diff)
downloadspark-0f1d00a905614bb5eebf260566dbcb831158d445.tar.gz
spark-0f1d00a905614bb5eebf260566dbcb831158d445.tar.bz2
spark-0f1d00a905614bb5eebf260566dbcb831158d445.zip
[SPARK-11663][STREAMING] Add Java API for trackStateByKey
TODO - [x] Add Java API - [x] Add API tests - [x] Add a function test Author: Shixiong Zhu <shixiong@databricks.com> Closes #9636 from zsxwing/java-track.
Diffstat (limited to 'extras/java8-tests')
-rw-r--r--extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java43
1 files changed, 43 insertions, 0 deletions
diff --git a/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java b/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
index 73091cfe2c..163ae92c12 100644
--- a/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
+++ b/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
@@ -31,9 +31,12 @@ import org.junit.Test;
import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.Function4;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaTrackStateDStream;
/**
* Most of these tests replicate org.apache.spark.streaming.JavaAPISuite using java 8
@@ -831,4 +834,44 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
Assert.assertEquals(expected, result);
}
+ /**
+ * This test is only for testing the APIs. It's not necessary to run it.
+ */
+ public void testTrackStateByAPI() {
+ JavaPairRDD<String, Boolean> initialRDD = null;
+ JavaPairDStream<String, Integer> wordsDstream = null;
+
+ JavaTrackStateDStream<String, Integer, Boolean, Double> stateDstream =
+ wordsDstream.trackStateByKey(
+ StateSpec.<String, Integer, Boolean, Double> function((time, key, value, state) -> {
+ // Use all State's methods here
+ state.exists();
+ state.get();
+ state.isTimingOut();
+ state.remove();
+ state.update(true);
+ return Optional.of(2.0);
+ }).initialState(initialRDD)
+ .numPartitions(10)
+ .partitioner(new HashPartitioner(10))
+ .timeout(Durations.seconds(10)));
+
+ JavaPairDStream<String, Boolean> emittedRecords = stateDstream.stateSnapshots();
+
+ JavaTrackStateDStream<String, Integer, Boolean, Double> stateDstream2 =
+ wordsDstream.trackStateByKey(
+ StateSpec.<String, Integer, Boolean, Double>function((value, state) -> {
+ state.exists();
+ state.get();
+ state.isTimingOut();
+ state.remove();
+ state.update(true);
+ return 2.0;
+ }).initialState(initialRDD)
+ .numPartitions(10)
+ .partitioner(new HashPartitioner(10))
+ .timeout(Durations.seconds(10)));
+
+ JavaPairDStream<String, Boolean> emittedRecords2 = stateDstream2.stateSnapshots();
+ }
}