aboutsummaryrefslogtreecommitdiff
path: root/extras
diff options
context:
space:
mode:
Diffstat (limited to 'extras')
-rw-r--r--extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java18
1 files changed, 8 insertions, 10 deletions
diff --git a/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java b/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
index 4eee97bc89..89e0c7fdf7 100644
--- a/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
+++ b/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
@@ -32,12 +32,10 @@ import org.apache.spark.Accumulator;
import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.Function4;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
-import org.apache.spark.streaming.api.java.JavaTrackStateDStream;
+import org.apache.spark.streaming.api.java.JavaMapWithStateDStream;
/**
* Most of these tests replicate org.apache.spark.streaming.JavaAPISuite using java 8
@@ -863,12 +861,12 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
/**
* This test is only for testing the APIs. It's not necessary to run it.
*/
- public void testTrackStateByAPI() {
+ public void testMapWithStateAPI() {
JavaPairRDD<String, Boolean> initialRDD = null;
JavaPairDStream<String, Integer> wordsDstream = null;
- JavaTrackStateDStream<String, Integer, Boolean, Double> stateDstream =
- wordsDstream.trackStateByKey(
+ JavaMapWithStateDStream<String, Integer, Boolean, Double> stateDstream =
+ wordsDstream.mapWithState(
StateSpec.<String, Integer, Boolean, Double> function((time, key, value, state) -> {
// Use all State's methods here
state.exists();
@@ -884,9 +882,9 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
JavaPairDStream<String, Boolean> emittedRecords = stateDstream.stateSnapshots();
- JavaTrackStateDStream<String, Integer, Boolean, Double> stateDstream2 =
- wordsDstream.trackStateByKey(
- StateSpec.<String, Integer, Boolean, Double>function((value, state) -> {
+ JavaMapWithStateDStream<String, Integer, Boolean, Double> stateDstream2 =
+ wordsDstream.mapWithState(
+ StateSpec.<String, Integer, Boolean, Double>function((key, value, state) -> {
state.exists();
state.get();
state.isTimingOut();
@@ -898,6 +896,6 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
.partitioner(new HashPartitioner(10))
.timeout(Durations.seconds(10)));
- JavaPairDStream<String, Boolean> emittedRecords2 = stateDstream2.stateSnapshots();
+ JavaPairDStream<String, Boolean> mappedDStream = stateDstream2.stateSnapshots();
}
}