path: root/docs/streaming-programming-guide.md
author     Shixiong Zhu <shixiong@databricks.com>    2015-12-22 16:39:10 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>    2015-12-22 16:39:10 -0800
commit     20591afd790799327f99485c5a969ed7412eca45 (patch)
tree       dad9877404d7559a53aab11d9a01df342cd17498 /docs/streaming-programming-guide.md
parent     93db50d1c2ff97e6eb9200a995e4601f752968ae (diff)
[SPARK-12429][STREAMING][DOC] Add Accumulator and Broadcast example for Streaming
This PR adds Scala, Java and Python examples to show how to use Accumulator and Broadcast in Spark Streaming to support checkpointing.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #10385 from zsxwing/accumulator-broadcast-example.
Diffstat (limited to 'docs/streaming-programming-guide.md')
-rw-r--r--  docs/streaming-programming-guide.md  165
1 file changed, 165 insertions(+), 0 deletions(-)
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index ed6b28c282..3b071c7da5 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -1415,6 +1415,171 @@ Note that the connections in the pool should be lazily created on demand and tim
***
+## Accumulators and Broadcast Variables
+
+[Accumulators](programming-guide.html#accumulators) and [Broadcast variables](programming-guide.html#broadcast-variables) cannot be recovered from checkpoint in Spark Streaming. If you enable checkpointing and also use Accumulators or Broadcast variables, you will have to create lazily instantiated singleton instances for them so that they can be re-instantiated after the driver restarts on failure. This is shown in the following example.
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+
+object WordBlacklist {
+
+  @volatile private var instance: Broadcast[Seq[String]] = null
+
+  def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
+    if (instance == null) {
+      synchronized {
+        if (instance == null) {
+          val wordBlacklist = Seq("a", "b", "c")
+          instance = sc.broadcast(wordBlacklist)
+        }
+      }
+    }
+    instance
+  }
+}
+
+object DroppedWordsCounter {
+
+  @volatile private var instance: Accumulator[Long] = null
+
+  def getInstance(sc: SparkContext): Accumulator[Long] = {
+    if (instance == null) {
+      synchronized {
+        if (instance == null) {
+          instance = sc.accumulator(0L, "WordsInBlacklistCounter")
+        }
+      }
+    }
+    instance
+  }
+}
+
+wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
+  // Get or register the blacklist Broadcast
+  val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
+  // Get or register the droppedWordsCounter Accumulator
+  val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
+  // Use blacklist to drop words and use droppedWordsCounter to count them
+  val counts = rdd.filter { case (word, count) =>
+    if (blacklist.value.contains(word)) {
+      droppedWordsCounter += count
+      false
+    } else {
+      true
+    }
+  }.collect().mkString("[", ", ", "]")
+  val output = "Counts at time " + time + " " + counts
+})
+
+{% endhighlight %}
+
+See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala).
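+
+Note that the broadcast variable and the accumulator above are only re-created lazily, the first time `getInstance` is called after the driver has been restarted from checkpoint data. The following is a minimal sketch of how such an application is typically driven; `checkpointDirectory` and `createContext` are placeholder names, not part of the example above (see the [Checkpointing](#checkpointing) section for the full pattern).
+
+{% highlight scala %}
+// Sketch only: sparkConf and checkpointDirectory are assumed to be defined elsewhere.
+def createContext(): StreamingContext = {
+  val ssc = new StreamingContext(sparkConf, Seconds(1))
+  ssc.checkpoint(checkpointDirectory)
+  // ... create the DStreams and register the wordCounts.foreachRDD(...) shown above ...
+  ssc
+}
+
+// On the first run this calls createContext(); after a driver failure it rebuilds the
+// StreamingContext from the checkpoint, and the broadcast variable and accumulator are
+// then re-instantiated on demand by the singleton objects above.
+val ssc = StreamingContext.getOrCreate(checkpointDirectory, createContext _)
+ssc.start()
+ssc.awaitTermination()
+{% endhighlight %}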
+</div>
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+class JavaWordBlacklist {
+
+  private static volatile Broadcast<List<String>> instance = null;
+
+  public static Broadcast<List<String>> getInstance(JavaSparkContext jsc) {
+    if (instance == null) {
+      synchronized (JavaWordBlacklist.class) {
+        if (instance == null) {
+          List<String> wordBlacklist = Arrays.asList("a", "b", "c");
+          instance = jsc.broadcast(wordBlacklist);
+        }
+      }
+    }
+    return instance;
+  }
+}
+
+class JavaDroppedWordsCounter {
+
+  private static volatile Accumulator<Integer> instance = null;
+
+  public static Accumulator<Integer> getInstance(JavaSparkContext jsc) {
+    if (instance == null) {
+      synchronized (JavaDroppedWordsCounter.class) {
+        if (instance == null) {
+          instance = jsc.accumulator(0, "WordsInBlacklistCounter");
+        }
+      }
+    }
+    return instance;
+  }
+}
+
+wordCounts.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
+  @Override
+  public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws IOException {
+    // Get or register the blacklist Broadcast
+    final Broadcast<List<String>> blacklist = JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
+    // Get or register the droppedWordsCounter Accumulator
+    final Accumulator<Integer> droppedWordsCounter = JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
+    // Use blacklist to drop words and use droppedWordsCounter to count them
+    String counts = rdd.filter(new Function<Tuple2<String, Integer>, Boolean>() {
+      @Override
+      public Boolean call(Tuple2<String, Integer> wordCount) throws Exception {
+        if (blacklist.value().contains(wordCount._1())) {
+          droppedWordsCounter.add(wordCount._2());
+          return false;
+        } else {
+          return true;
+        }
+      }
+    }).collect().toString();
+    String output = "Counts at time " + time + " " + counts;
+    return null;
+  }
+});
+
+{% endhighlight %}
+
+See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java).
+</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+
+def getWordBlacklist(sparkContext):
+    if ('wordBlacklist' not in globals()):
+        globals()['wordBlacklist'] = sparkContext.broadcast(["a", "b", "c"])
+    return globals()['wordBlacklist']
+
+def getDroppedWordsCounter(sparkContext):
+    if ('droppedWordsCounter' not in globals()):
+        globals()['droppedWordsCounter'] = sparkContext.accumulator(0)
+    return globals()['droppedWordsCounter']
+
+def echo(time, rdd):
+    # Get or register the blacklist Broadcast
+    blacklist = getWordBlacklist(rdd.context)
+    # Get or register the droppedWordsCounter Accumulator
+    droppedWordsCounter = getDroppedWordsCounter(rdd.context)
+
+    # Use blacklist to drop words and use droppedWordsCounter to count them
+    def filterFunc(wordCount):
+        if wordCount[0] in blacklist.value:
+            droppedWordsCounter.add(wordCount[1])
+            return False
+        else:
+            return True
+
+    counts = "Counts at time %s %s" % (time, rdd.filter(filterFunc).collect())
+
+wordCounts.foreachRDD(echo)
+
+{% endhighlight %}
+
+See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/recoverable_network_wordcount.py).
+
+</div>
+</div>
+
+***
+
## DataFrame and SQL Operations
You can easily use [DataFrames and SQL](sql-programming-guide.html) operations on streaming data. You have to create a SQLContext using the SparkContext that the StreamingContext is using. Furthermore, this has to be done in such a way that it can be restarted on driver failures. This is done by creating a lazily instantiated singleton instance of SQLContext. This is shown in the following example. It modifies the earlier [word count example](#a-quick-example) to generate word counts using DataFrames and SQL. Each RDD is converted to a DataFrame, registered as a temporary table and then queried using SQL.