aboutsummaryrefslogtreecommitdiff
path: root/examples
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2015-11-10 23:16:18 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2015-11-10 23:16:18 -0800
commit99f5f988612b3093d73d9ce98819767e822fcbff (patch)
treee69164304d215ade5b9c4e847e31ee295c61e62e /examples
parentbd70244b3cda62cc447fd4cc343d4eb5ddaec893 (diff)
downloadspark-99f5f988612b3093d73d9ce98819767e822fcbff.tar.gz
spark-99f5f988612b3093d73d9ce98819767e822fcbff.tar.bz2
spark-99f5f988612b3093d73d9ce98819767e822fcbff.zip
[SPARK-11290][STREAMING] Basic implementation of trackStateByKey
Current updateStateByKey provides stateful processing in Spark Streaming. It allows the user to maintain per-key state and manage that state using an updateFunction. The updateFunction is called for each key, and it uses new data and existing state of the key, to generate an updated state. However, based on community feedback, we have learnt the following lessons. * Need for more optimized state management that does not scan every key * Need to make it easier to implement common use cases - (a) timeout of idle data, (b) returning items other than state The high level idea that of this PR * Introduce a new API trackStateByKey that, allows the user to update per-key state, and emit arbitrary records. The new API is necessary as this will have significantly different semantics than the existing updateStateByKey API. This API will have direct support for timeouts. * Internally, the system will keep the state data as a map/list within the partitions of the state RDDs. The new data RDDs will be partitioned appropriately, and for all the key-value data, it will lookup the map/list in the state RDD partition and create a new list/map of updated state data. The new state RDD partition will be created based on the update data and if necessary, with old data. Here is the detailed design doc. Please take a look and provide feedback as comments. https://docs.google.com/document/d/1NoALLyd83zGs1hNGMm0Pc5YOVgiPpMHugGMk6COqxxE/edit#heading=h.ph3w0clkd4em This is still WIP. Major things left to be done. - [x] Implement basic functionality of state tracking, with initial RDD and timeouts - [x] Unit tests for state tracking - [x] Unit tests for initial RDD and timeout - [ ] Unit tests for TrackStateRDD - [x] state creating, updating, removing - [ ] emitting - [ ] checkpointing - [x] Misc unit tests for State, TrackStateSpec, etc. - [x] Update docs and experimental tags Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #9256 from tdas/trackStateByKey.
Diffstat (limited to 'examples')
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala25
1 files changed, 10 insertions, 15 deletions
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
index 02ba1c2eed..be2ae0b473 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
@@ -44,18 +44,6 @@ object StatefulNetworkWordCount {
StreamingExamples.setStreamingLogLevels()
- val updateFunc = (values: Seq[Int], state: Option[Int]) => {
- val currentCount = values.sum
-
- val previousCount = state.getOrElse(0)
-
- Some(currentCount + previousCount)
- }
-
- val newUpdateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => {
- iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s)))
- }
-
val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount")
// Create the context with a 1 second batch size
val ssc = new StreamingContext(sparkConf, Seconds(1))
@@ -71,9 +59,16 @@ object StatefulNetworkWordCount {
val wordDstream = words.map(x => (x, 1))
// Update the cumulative count using updateStateByKey
- // This will give a Dstream made of state (which is the cumulative count of the words)
- val stateDstream = wordDstream.updateStateByKey[Int](newUpdateFunc,
- new HashPartitioner (ssc.sparkContext.defaultParallelism), true, initialRDD)
+ // This will give a DStream made of state (which is the cumulative count of the words)
+ val trackStateFunc = (batchTime: Time, word: String, one: Option[Int], state: State[Int]) => {
+ val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
+ val output = (word, sum)
+ state.update(sum)
+ Some(output)
+ }
+
+ val stateDstream = wordDstream.trackStateByKey(
+ StateSpec.function(trackStateFunc).initialState(initialRDD))
stateDstream.print()
ssc.start()
ssc.awaitTermination()