author:    Tathagata Das <tathagata.das1565@gmail.com>  2015-12-09 20:47:15 -0800
committer: Shixiong Zhu <shixiong@databricks.com>  2015-12-09 20:47:15 -0800
commit:    bd2cd4f53d1ca10f4896bd39b0e180d4929867a2 (patch)
tree:      308b2c7239d67191f95bd5673ab98f916b40bd58 /examples
parent:    2166c2a75083c2262e071a652dd52b1a33348b6e (diff)
[SPARK-12244][SPARK-12245][STREAMING] Rename trackStateByKey to mapWithState and change tracking function signature
SPARK-12244: Based on feedback from early users and on personal experience attempting to explain it, the name trackStateByKey had two problems. "trackState" is a completely new term that gives no intuition about what the operation does, and the resultant data stream of objects returned by the function is referred to in the docs as the "emitted" data, for lack of a better term. "mapWithState" makes sense because the API is like a mapping function of the form (Key, Value) => T with State as an additional parameter; the resultant data stream is then simply the "mapped data". So both problems are solved.

SPARK-12245: From initial experience, not having the key in the function makes it hard to return mapped output, since the full information of the record is not available; the user is essentially restricted to doing a mapValues() instead of a map(). So the key is added as a parameter.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #10224 from tdas/rename.
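The signature change is easiest to see spelled out. Below is a minimal sketch against the Spark 1.6 streaming API (the word-count function body mirrors the examples in the diff; names like mappingFunc are illustrative):

// Old: the trackStateByKey function took the batch time and returned an
// Option of the mapped value:
//   (batchTime: Time, key: K, value: Option[V], state: State[S]) => Option[T]
// New: the mapWithState function takes the key and returns the mapped value directly:
//   (key: K, value: Option[V], state: State[S]) => T
import org.apache.spark.streaming.{State, StateSpec}

val mappingFunc = (word: String, count: Option[Int], state: State[Int]) => {
  val sum = count.getOrElse(0) + state.getOption.getOrElse(0)
  state.update(sum)   // persist the running total for this key
  (word, sum)         // the "mapped" record emitted downstream
}
val spec = StateSpec.function(mappingFunc)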
Diffstat (limited to 'examples')
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java | 16
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala  | 12
2 files changed, 14 insertions(+), 14 deletions(-)
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
index c400e4237a..14997c64d5 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
@@ -65,7 +65,7 @@ public class JavaStatefulNetworkWordCount {
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
ssc.checkpoint(".");
- // Initial RDD input to trackStateByKey
+ // Initial state RDD input to mapWithState
@SuppressWarnings("unchecked")
List<Tuple2<String, Integer>> tuples = Arrays.asList(new Tuple2<String, Integer>("hello", 1),
new Tuple2<String, Integer>("world", 1));
@@ -90,21 +90,21 @@ public class JavaStatefulNetworkWordCount {
});
// Update the cumulative count function
- final Function4<Time, String, Optional<Integer>, State<Integer>, Optional<Tuple2<String, Integer>>> trackStateFunc =
- new Function4<Time, String, Optional<Integer>, State<Integer>, Optional<Tuple2<String, Integer>>>() {
+ final Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc =
+ new Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>>() {
@Override
- public Optional<Tuple2<String, Integer>> call(Time time, String word, Optional<Integer> one, State<Integer> state) {
+ public Tuple2<String, Integer> call(String word, Optional<Integer> one, State<Integer> state) {
int sum = one.or(0) + (state.exists() ? state.get() : 0);
Tuple2<String, Integer> output = new Tuple2<String, Integer>(word, sum);
state.update(sum);
- return Optional.of(output);
+ return output;
}
};
- // This will give a Dstream made of state (which is the cumulative count of the words)
- JavaTrackStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDstream =
- wordsDstream.trackStateByKey(StateSpec.function(trackStateFunc).initialState(initialRDD));
+ // DStream of cumulative counts that get updated in every batch
+ JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDstream =
+ wordsDstream.mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD));
stateDstream.print();
ssc.start();
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
index a4f847f118..2dce1820d9 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
@@ -49,7 +49,7 @@ object StatefulNetworkWordCount {
val ssc = new StreamingContext(sparkConf, Seconds(1))
ssc.checkpoint(".")
- // Initial RDD input to trackStateByKey
+ // Initial state RDD for mapWithState operation
val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))
// Create a ReceiverInputDStream on target ip:port and count the
@@ -58,17 +58,17 @@ object StatefulNetworkWordCount {
val words = lines.flatMap(_.split(" "))
val wordDstream = words.map(x => (x, 1))
- // Update the cumulative count using updateStateByKey
+ // Update the cumulative count using mapWithState
// This will give a DStream made of state (which is the cumulative count of the words)
- val trackStateFunc = (batchTime: Time, word: String, one: Option[Int], state: State[Int]) => {
+ val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
val output = (word, sum)
state.update(sum)
- Some(output)
+ output
}
- val stateDstream = wordDstream.trackStateByKey(
- StateSpec.function(trackStateFunc).initialState(initialRDD))
+ val stateDstream = wordDstream.mapWithState(
+ StateSpec.function(mappingFunc).initialState(initialRDD))
stateDstream.print()
ssc.start()
ssc.awaitTermination()
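For reference, a minimal self-contained program assembling the pieces from the Scala diff above (Spark 1.6 API; the localhost:9999 socket source is an assumption mirroring the example's usual Netcat setup):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}

object MapWithStateSketch {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("MapWithStateSketch")
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    ssc.checkpoint(".")  // mapWithState requires a checkpoint directory

    // Assumed source: lines of text from a socket, split into (word, 1) pairs
    val words = ssc.socketTextStream("localhost", 9999)
      .flatMap(_.split(" "))
      .map(word => (word, 1))

    // Same mapping function as in the example: a running count per word
    val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
      val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
      state.update(sum)
      (word, sum)
    }

    words.mapWithState(StateSpec.function(mappingFunc)).print()
    ssc.start()
    ssc.awaitTermination()
  }
}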