aboutsummaryrefslogtreecommitdiff
path: root/examples/src/main/python/streaming/recoverable_network_wordcount.py
diff options
context:
space:
mode:
authorShixiong Zhu <shixiong@databricks.com>2015-12-22 16:39:10 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2015-12-22 16:39:10 -0800
commit20591afd790799327f99485c5a969ed7412eca45 (patch)
treedad9877404d7559a53aab11d9a01df342cd17498 /examples/src/main/python/streaming/recoverable_network_wordcount.py
parent93db50d1c2ff97e6eb9200a995e4601f752968ae (diff)
downloadspark-20591afd790799327f99485c5a969ed7412eca45.tar.gz
spark-20591afd790799327f99485c5a969ed7412eca45.tar.bz2
spark-20591afd790799327f99485c5a969ed7412eca45.zip
[SPARK-12429][STREAMING][DOC] Add Accumulator and Broadcast example for Streaming
This PR adds Scala, Java and Python examples to show how to use Accumulator and Broadcast in Spark Streaming to support checkpointing. Author: Shixiong Zhu <shixiong@databricks.com> Closes #10385 from zsxwing/accumulator-broadcast-example.
Diffstat (limited to 'examples/src/main/python/streaming/recoverable_network_wordcount.py')
-rw-r--r--examples/src/main/python/streaming/recoverable_network_wordcount.py30
1 files changed, 29 insertions, 1 deletions
diff --git a/examples/src/main/python/streaming/recoverable_network_wordcount.py b/examples/src/main/python/streaming/recoverable_network_wordcount.py
index ac91f0a06b..52b2639cdf 100644
--- a/examples/src/main/python/streaming/recoverable_network_wordcount.py
+++ b/examples/src/main/python/streaming/recoverable_network_wordcount.py
@@ -44,6 +44,20 @@ from pyspark import SparkContext
from pyspark.streaming import StreamingContext
+# Get or register a Broadcast variable
+def getWordBlacklist(sparkContext):
+ if ('wordBlacklist' not in globals()):
+ globals()['wordBlacklist'] = sparkContext.broadcast(["a", "b", "c"])
+ return globals()['wordBlacklist']
+
+
+# Get or register an Accumulator
+def getDroppedWordsCounter(sparkContext):
+ if ('droppedWordsCounter' not in globals()):
+ globals()['droppedWordsCounter'] = sparkContext.accumulator(0)
+ return globals()['droppedWordsCounter']
+
+
def createContext(host, port, outputPath):
# If you do not see this printed, that means the StreamingContext has been loaded
# from the new checkpoint
@@ -60,8 +74,22 @@ def createContext(host, port, outputPath):
wordCounts = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
def echo(time, rdd):
- counts = "Counts at time %s %s" % (time, rdd.collect())
+ # Get or register the blacklist Broadcast
+ blacklist = getWordBlacklist(rdd.context)
+ # Get or register the droppedWordsCounter Accumulator
+ droppedWordsCounter = getDroppedWordsCounter(rdd.context)
+
+ # Use blacklist to drop words and use droppedWordsCounter to count them
+ def filterFunc(wordCount):
+ if wordCount[0] in blacklist.value:
+ droppedWordsCounter.add(wordCount[1])
+ return False
+ else:
+ return True
+
+ counts = "Counts at time %s %s" % (time, rdd.filter(filterFunc).collect())
print(counts)
+ print("Dropped %d word(s) totally" % droppedWordsCounter.value)
print("Appending to " + os.path.abspath(outputPath))
with open(outputPath, 'a') as f:
f.write(counts + "\n")