aboutsummaryrefslogtreecommitdiff
path: root/examples
diff options
context:
space:
mode:
Diffstat (limited to 'examples')
-rw-r--r--examples/src/main/python/streaming/stateful_network_wordcount.py5
1 files changed, 4 insertions, 1 deletions
diff --git a/examples/src/main/python/streaming/stateful_network_wordcount.py b/examples/src/main/python/streaming/stateful_network_wordcount.py
index 16ef646b7c..f8bbc659c2 100644
--- a/examples/src/main/python/streaming/stateful_network_wordcount.py
+++ b/examples/src/main/python/streaming/stateful_network_wordcount.py
@@ -44,13 +44,16 @@ if __name__ == "__main__":
ssc = StreamingContext(sc, 1)
ssc.checkpoint("checkpoint")
+ # RDD with initial state (key, value) pairs
+ initialStateRDD = sc.parallelize([(u'hello', 1), (u'world', 1)])
+
def updateFunc(new_values, last_sum):
return sum(new_values) + (last_sum or 0)
lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
running_counts = lines.flatMap(lambda line: line.split(" "))\
.map(lambda word: (word, 1))\
- .updateStateByKey(updateFunc)
+ .updateStateByKey(updateFunc, initialRDD=initialStateRDD)
running_counts.pprint()