diff options
author | Bryan Cutler <bjcutler@us.ibm.com> | 2015-12-10 14:21:15 -0800 |
---|---|---|
committer | Davies Liu <davies.liu@gmail.com> | 2015-12-10 14:21:15 -0800 |
commit | 6a6c1fc5c807ba4e8aba3e260537aa527ff5d46a (patch) | |
tree | a3b24c4b3c8682e6a4dfb659f600d7474636b9d3 /examples/src/main/python | |
parent | 4a46b8859d3314b5b45a67cdc5c81fecb6e9e78c (diff) | |
download | spark-6a6c1fc5c807ba4e8aba3e260537aa527ff5d46a.tar.gz spark-6a6c1fc5c807ba4e8aba3e260537aa527ff5d46a.tar.bz2 spark-6a6c1fc5c807ba4e8aba3e260537aa527ff5d46a.zip |
[SPARK-11713] [PYSPARK] [STREAMING] Initial RDD updateStateByKey for PySpark
Adding ability to define an initial state RDD for use with updateStateByKey PySpark. Added unit test and changed stateful_network_wordcount example to use initial RDD.
Author: Bryan Cutler <bjcutler@us.ibm.com>
Closes #10082 from BryanCutler/initial-rdd-updateStateByKey-SPARK-11713.
Diffstat (limited to 'examples/src/main/python')
-rw-r--r-- | examples/src/main/python/streaming/stateful_network_wordcount.py | 5 |
1 files changed, 4 insertions, 1 deletions
diff --git a/examples/src/main/python/streaming/stateful_network_wordcount.py b/examples/src/main/python/streaming/stateful_network_wordcount.py index 16ef646b7c..f8bbc659c2 100644 --- a/examples/src/main/python/streaming/stateful_network_wordcount.py +++ b/examples/src/main/python/streaming/stateful_network_wordcount.py @@ -44,13 +44,16 @@ if __name__ == "__main__": ssc = StreamingContext(sc, 1) ssc.checkpoint("checkpoint") + # RDD with initial state (key, value) pairs + initialStateRDD = sc.parallelize([(u'hello', 1), (u'world', 1)]) + def updateFunc(new_values, last_sum): return sum(new_values) + (last_sum or 0) lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2])) running_counts = lines.flatMap(lambda line: line.split(" "))\ .map(lambda word: (word, 1))\ - .updateStateByKey(updateFunc) + .updateStateByKey(updateFunc, initialRDD=initialStateRDD) running_counts.pprint() |