diff options
author | Shixiong Zhu <shixiong@databricks.com> | 2015-12-22 16:39:10 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2015-12-22 16:39:10 -0800 |
commit | 20591afd790799327f99485c5a969ed7412eca45 (patch) | |
tree | dad9877404d7559a53aab11d9a01df342cd17498 /examples/src/main/python/streaming/recoverable_network_wordcount.py | |
parent | 93db50d1c2ff97e6eb9200a995e4601f752968ae (diff) | |
download | spark-20591afd790799327f99485c5a969ed7412eca45.tar.gz spark-20591afd790799327f99485c5a969ed7412eca45.tar.bz2 spark-20591afd790799327f99485c5a969ed7412eca45.zip |
[SPARK-12429][STREAMING][DOC] Add Accumulator and Broadcast example for Streaming
This PR adds Scala, Java and Python examples to show how to use Accumulator and Broadcast in Spark Streaming to support checkpointing.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #10385 from zsxwing/accumulator-broadcast-example.
Diffstat (limited to 'examples/src/main/python/streaming/recoverable_network_wordcount.py')
-rw-r--r-- | examples/src/main/python/streaming/recoverable_network_wordcount.py | 30 |
1 files changed, 29 insertions, 1 deletions
diff --git a/examples/src/main/python/streaming/recoverable_network_wordcount.py b/examples/src/main/python/streaming/recoverable_network_wordcount.py index ac91f0a06b..52b2639cdf 100644 --- a/examples/src/main/python/streaming/recoverable_network_wordcount.py +++ b/examples/src/main/python/streaming/recoverable_network_wordcount.py @@ -44,6 +44,20 @@ from pyspark import SparkContext from pyspark.streaming import StreamingContext +# Get or register a Broadcast variable +def getWordBlacklist(sparkContext): + if ('wordBlacklist' not in globals()): + globals()['wordBlacklist'] = sparkContext.broadcast(["a", "b", "c"]) + return globals()['wordBlacklist'] + + +# Get or register an Accumulator +def getDroppedWordsCounter(sparkContext): + if ('droppedWordsCounter' not in globals()): + globals()['droppedWordsCounter'] = sparkContext.accumulator(0) + return globals()['droppedWordsCounter'] + + def createContext(host, port, outputPath): # If you do not see this printed, that means the StreamingContext has been loaded # from the new checkpoint @@ -60,8 +74,22 @@ def createContext(host, port, outputPath): wordCounts = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y) def echo(time, rdd): - counts = "Counts at time %s %s" % (time, rdd.collect()) + # Get or register the blacklist Broadcast + blacklist = getWordBlacklist(rdd.context) + # Get or register the droppedWordsCounter Accumulator + droppedWordsCounter = getDroppedWordsCounter(rdd.context) + + # Use blacklist to drop words and use droppedWordsCounter to count them + def filterFunc(wordCount): + if wordCount[0] in blacklist.value: + droppedWordsCounter.add(wordCount[1]) + False + else: + True + + counts = "Counts at time %s %s" % (time, rdd.filter(filterFunc).collect()) print(counts) + print("Dropped %d word(s) totally" % droppedWordsCounter.value) print("Appending to " + os.path.abspath(outputPath)) with open(outputPath, 'a') as f: f.write(counts + "\n") |