From bd2cd4f53d1ca10f4896bd39b0e180d4929867a2 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Wed, 9 Dec 2015 20:47:15 -0800 Subject: [SPARK-12244][SPARK-12245][STREAMING] Rename trackStateByKey to mapWithState and change tracking function signature SPARK-12244: Based on feedback from early users and personal experience attempting to explain it, the name trackStateByKey had two problems: (1) "trackState" is a completely new term which really does not give any intuition on what the operation is, and (2) the resultant data stream of objects returned by the function is referred to in the docs as the "emitted" data, for lack of a better term. "mapWithState" makes sense because the API is like a mapping function of the form (Key, Value) => T with State as an additional parameter. The resultant data stream is the "mapped data". So both problems are solved. SPARK-12245: From initial experiences, not having the key in the function makes it hard to return mapped data, because the whole information of the record is not available. Basically the user is restricted to doing something like mapValues() instead of map(). So this change adds the key as a parameter. Author: Tathagata Das Closes #10224 from tdas/rename. 
--- .../java/org/apache/spark/streaming/Java8APISuite.java | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) (limited to 'extras') diff --git a/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java b/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java index 4eee97bc89..89e0c7fdf7 100644 --- a/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java +++ b/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java @@ -32,12 +32,10 @@ import org.apache.spark.Accumulator; import org.apache.spark.HashPartitioner; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.function.Function2; -import org.apache.spark.api.java.function.Function4; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; -import org.apache.spark.streaming.api.java.JavaTrackStateDStream; +import org.apache.spark.streaming.api.java.JavaMapWithStateDStream; /** * Most of these tests replicate org.apache.spark.streaming.JavaAPISuite using java 8 @@ -863,12 +861,12 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ /** * This test is only for testing the APIs. It's not necessary to run it. */ - public void testTrackStateByAPI() { + public void testMapWithStateAPI() { JavaPairRDD initialRDD = null; JavaPairDStream wordsDstream = null; - JavaTrackStateDStream stateDstream = - wordsDstream.trackStateByKey( + JavaMapWithStateDStream stateDstream = + wordsDstream.mapWithState( StateSpec. 
function((time, key, value, state) -> { // Use all State's methods here state.exists(); @@ -884,9 +882,9 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ JavaPairDStream emittedRecords = stateDstream.stateSnapshots(); - JavaTrackStateDStream stateDstream2 = - wordsDstream.trackStateByKey( - StateSpec.function((value, state) -> { + JavaMapWithStateDStream stateDstream2 = + wordsDstream.mapWithState( + StateSpec.function((key, value, state) -> { state.exists(); state.get(); state.isTimingOut(); @@ -898,6 +896,6 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ .partitioner(new HashPartitioner(10)) .timeout(Durations.seconds(10))); - JavaPairDStream emittedRecords2 = stateDstream2.stateSnapshots(); + JavaPairDStream mappedDStream = stateDstream2.stateSnapshots(); } } -- cgit v1.2.3