aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test/java/org/apache
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2015-12-09 20:47:15 -0800
committerShixiong Zhu <shixiong@databricks.com>2015-12-09 20:47:15 -0800
commitbd2cd4f53d1ca10f4896bd39b0e180d4929867a2 (patch)
tree308b2c7239d67191f95bd5673ab98f916b40bd58 /streaming/src/test/java/org/apache
parent2166c2a75083c2262e071a652dd52b1a33348b6e (diff)
downloadspark-bd2cd4f53d1ca10f4896bd39b0e180d4929867a2.tar.gz
spark-bd2cd4f53d1ca10f4896bd39b0e180d4929867a2.tar.bz2
spark-bd2cd4f53d1ca10f4896bd39b0e180d4929867a2.zip
[SPARK-12244][SPARK-12245][STREAMING] Rename trackStateByKey to mapWithState and change tracking function signature
SPARK-12244: Based on feedback from early users and personal experience attempting to explain it, the name trackStateByKey had two problem. "trackState" is a completely new term which really does not give any intuition on what the operation is the resultant data stream of objects returned by the function is called in docs as the "emitted" data for the lack of a better. "mapWithState" makes sense because the API is like a mapping function like (Key, Value) => T with State as an additional parameter. The resultant data stream is "mapped data". So both problems are solved. SPARK-12245: From initial experiences, not having the key in the function makes it hard to return mapped stuff, as the whole information of the records is not there. Basically the user is restricted to doing something like mapValue() instead of map(). So adding the key as a parameter. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #10224 from tdas/rename.
Diffstat (limited to 'streaming/src/test/java/org/apache')
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java (renamed from streaming/src/test/java/org/apache/spark/streaming/JavaTrackStateByKeySuite.java)48
1 files changed, 24 insertions, 24 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaTrackStateByKeySuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java
index 89d0bb7b61..bc4bc2eb42 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaTrackStateByKeySuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java
@@ -37,12 +37,12 @@ import org.junit.Test;
import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.Function3;
import org.apache.spark.api.java.function.Function4;
import org.apache.spark.streaming.api.java.JavaPairDStream;
-import org.apache.spark.streaming.api.java.JavaTrackStateDStream;
+import org.apache.spark.streaming.api.java.JavaMapWithStateDStream;
-public class JavaTrackStateByKeySuite extends LocalJavaStreamingContext implements Serializable {
+public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements Serializable {
/**
* This test is only for testing the APIs. It's not necessary to run it.
@@ -52,7 +52,7 @@ public class JavaTrackStateByKeySuite extends LocalJavaStreamingContext implemen
JavaPairDStream<String, Integer> wordsDstream = null;
final Function4<Time, String, Optional<Integer>, State<Boolean>, Optional<Double>>
- trackStateFunc =
+ mappingFunc =
new Function4<Time, String, Optional<Integer>, State<Boolean>, Optional<Double>>() {
@Override
@@ -68,21 +68,21 @@ public class JavaTrackStateByKeySuite extends LocalJavaStreamingContext implemen
}
};
- JavaTrackStateDStream<String, Integer, Boolean, Double> stateDstream =
- wordsDstream.trackStateByKey(
- StateSpec.function(trackStateFunc)
+ JavaMapWithStateDStream<String, Integer, Boolean, Double> stateDstream =
+ wordsDstream.mapWithState(
+ StateSpec.function(mappingFunc)
.initialState(initialRDD)
.numPartitions(10)
.partitioner(new HashPartitioner(10))
.timeout(Durations.seconds(10)));
- JavaPairDStream<String, Boolean> emittedRecords = stateDstream.stateSnapshots();
+ JavaPairDStream<String, Boolean> stateSnapshots = stateDstream.stateSnapshots();
- final Function2<Optional<Integer>, State<Boolean>, Double> trackStateFunc2 =
- new Function2<Optional<Integer>, State<Boolean>, Double>() {
+ final Function3<String, Optional<Integer>, State<Boolean>, Double> mappingFunc2 =
+ new Function3<String, Optional<Integer>, State<Boolean>, Double>() {
@Override
- public Double call(Optional<Integer> one, State<Boolean> state) {
+ public Double call(String key, Optional<Integer> one, State<Boolean> state) {
// Use all State's methods here
state.exists();
state.get();
@@ -93,15 +93,15 @@ public class JavaTrackStateByKeySuite extends LocalJavaStreamingContext implemen
}
};
- JavaTrackStateDStream<String, Integer, Boolean, Double> stateDstream2 =
- wordsDstream.trackStateByKey(
- StateSpec.<String, Integer, Boolean, Double>function(trackStateFunc2)
+ JavaMapWithStateDStream<String, Integer, Boolean, Double> stateDstream2 =
+ wordsDstream.mapWithState(
+ StateSpec.<String, Integer, Boolean, Double>function(mappingFunc2)
.initialState(initialRDD)
.numPartitions(10)
.partitioner(new HashPartitioner(10))
.timeout(Durations.seconds(10)));
- JavaPairDStream<String, Boolean> emittedRecords2 = stateDstream2.stateSnapshots();
+ JavaPairDStream<String, Boolean> stateSnapshots2 = stateDstream2.stateSnapshots();
}
@Test
@@ -148,11 +148,11 @@ public class JavaTrackStateByKeySuite extends LocalJavaStreamingContext implemen
new Tuple2<String, Integer>("c", 1))
);
- Function2<Optional<Integer>, State<Integer>, Integer> trackStateFunc =
- new Function2<Optional<Integer>, State<Integer>, Integer>() {
+ Function3<String, Optional<Integer>, State<Integer>, Integer> mappingFunc =
+ new Function3<String, Optional<Integer>, State<Integer>, Integer>() {
@Override
- public Integer call(Optional<Integer> value, State<Integer> state) throws Exception {
+ public Integer call(String key, Optional<Integer> value, State<Integer> state) throws Exception {
int sum = value.or(0) + (state.exists() ? state.get() : 0);
state.update(sum);
return sum;
@@ -160,29 +160,29 @@ public class JavaTrackStateByKeySuite extends LocalJavaStreamingContext implemen
};
testOperation(
inputData,
- StateSpec.<String, Integer, Integer, Integer>function(trackStateFunc),
+ StateSpec.<String, Integer, Integer, Integer>function(mappingFunc),
outputData,
stateData);
}
private <K, S, T> void testOperation(
List<List<K>> input,
- StateSpec<K, Integer, S, T> trackStateSpec,
+ StateSpec<K, Integer, S, T> mapWithStateSpec,
List<Set<T>> expectedOutputs,
List<Set<Tuple2<K, S>>> expectedStateSnapshots) {
int numBatches = expectedOutputs.size();
JavaDStream<K> inputStream = JavaTestUtils.attachTestInputStream(ssc, input, 2);
- JavaTrackStateDStream<K, Integer, S, T> trackeStateStream =
+ JavaMapWithStateDStream<K, Integer, S, T> mapWithStateDStream =
JavaPairDStream.fromJavaDStream(inputStream.map(new Function<K, Tuple2<K, Integer>>() {
@Override
public Tuple2<K, Integer> call(K x) throws Exception {
return new Tuple2<K, Integer>(x, 1);
}
- })).trackStateByKey(trackStateSpec);
+ })).mapWithState(mapWithStateSpec);
final List<Set<T>> collectedOutputs =
Collections.synchronizedList(Lists.<Set<T>>newArrayList());
- trackeStateStream.foreachRDD(new Function<JavaRDD<T>, Void>() {
+ mapWithStateDStream.foreachRDD(new Function<JavaRDD<T>, Void>() {
@Override
public Void call(JavaRDD<T> rdd) throws Exception {
collectedOutputs.add(Sets.newHashSet(rdd.collect()));
@@ -191,7 +191,7 @@ public class JavaTrackStateByKeySuite extends LocalJavaStreamingContext implemen
});
final List<Set<Tuple2<K, S>>> collectedStateSnapshots =
Collections.synchronizedList(Lists.<Set<Tuple2<K, S>>>newArrayList());
- trackeStateStream.stateSnapshots().foreachRDD(new Function<JavaPairRDD<K, S>, Void>() {
+ mapWithStateDStream.stateSnapshots().foreachRDD(new Function<JavaPairRDD<K, S>, Void>() {
@Override
public Void call(JavaPairRDD<K, S> rdd) throws Exception {
collectedStateSnapshots.add(Sets.newHashSet(rdd.collect()));