aboutsummaryrefslogtreecommitdiff
path: root/examples
diff options
context:
space:
mode:
authorBryan Cutler <bjcutler@us.ibm.com>2015-12-10 14:21:15 -0800
committerDavies Liu <davies.liu@gmail.com>2015-12-10 14:21:15 -0800
commit6a6c1fc5c807ba4e8aba3e260537aa527ff5d46a (patch)
treea3b24c4b3c8682e6a4dfb659f600d7474636b9d3 /examples
parent4a46b8859d3314b5b45a67cdc5c81fecb6e9e78c (diff)
downloadspark-6a6c1fc5c807ba4e8aba3e260537aa527ff5d46a.tar.gz
spark-6a6c1fc5c807ba4e8aba3e260537aa527ff5d46a.tar.bz2
spark-6a6c1fc5c807ba4e8aba3e260537aa527ff5d46a.zip
[SPARK-11713] [PYSPARK] [STREAMING] Initial RDD updateStateByKey for PySpark
Adding ability to define an initial state RDD for use with updateStateByKey PySpark. Added unit test and changed stateful_network_wordcount example to use initial RDD. Author: Bryan Cutler <bjcutler@us.ibm.com> Closes #10082 from BryanCutler/initial-rdd-updateStateByKey-SPARK-11713.
Diffstat (limited to 'examples')
-rw-r--r--examples/src/main/python/streaming/stateful_network_wordcount.py5
1 files changed, 4 insertions, 1 deletions
diff --git a/examples/src/main/python/streaming/stateful_network_wordcount.py b/examples/src/main/python/streaming/stateful_network_wordcount.py
index 16ef646b7c..f8bbc659c2 100644
--- a/examples/src/main/python/streaming/stateful_network_wordcount.py
+++ b/examples/src/main/python/streaming/stateful_network_wordcount.py
@@ -44,13 +44,16 @@ if __name__ == "__main__":
ssc = StreamingContext(sc, 1)
ssc.checkpoint("checkpoint")
+ # RDD with initial state (key, value) pairs
+ initialStateRDD = sc.parallelize([(u'hello', 1), (u'world', 1)])
+
def updateFunc(new_values, last_sum):
return sum(new_values) + (last_sum or 0)
lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
running_counts = lines.flatMap(lambda line: line.split(" "))\
.map(lambda word: (word, 1))\
- .updateStateByKey(updateFunc)
+ .updateStateByKey(updateFunc, initialRDD=initialStateRDD)
running_counts.pprint()