diff options
author | Shixiong Zhu <shixiong@databricks.com> | 2015-12-22 16:39:10 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2015-12-22 16:39:10 -0800 |
commit | 20591afd790799327f99485c5a969ed7412eca45 (patch) | |
tree | dad9877404d7559a53aab11d9a01df342cd17498 /docs/streaming-programming-guide.md | |
parent | 93db50d1c2ff97e6eb9200a995e4601f752968ae (diff) | |
download | spark-20591afd790799327f99485c5a969ed7412eca45.tar.gz spark-20591afd790799327f99485c5a969ed7412eca45.tar.bz2 spark-20591afd790799327f99485c5a969ed7412eca45.zip |
[SPARK-12429][STREAMING][DOC] Add Accumulator and Broadcast example for Streaming
This PR adds Scala, Java and Python examples to show how to use Accumulator and Broadcast in Spark Streaming to support checkpointing.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #10385 from zsxwing/accumulator-broadcast-example.
Diffstat (limited to 'docs/streaming-programming-guide.md')
-rw-r--r-- | docs/streaming-programming-guide.md | 165 |
1 files changed, 165 insertions, 0 deletions
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index ed6b28c282..3b071c7da5 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -1415,6 +1415,171 @@ Note that the connections in the pool should be lazily created on demand and tim *** +## Accumulators and Broadcast Variables + +[Accumulators](programming-guide.html#accumulators) and [Broadcast variables](programming-guide.html#broadcast-variables) cannot be recovered from checkpoint in Spark Streaming. If you enable checkpointing and use [Accumulators](programming-guide.html#accumulators) or [Broadcast variables](programming-guide.html#broadcast-variables) as well, you'll have to create lazily instantiated singleton instances for [Accumulators](programming-guide.html#accumulators) and [Broadcast variables](programming-guide.html#broadcast-variables) so that they can be re-instantiated after the driver restarts on failure. This is shown in the following example. + +<div class="codetabs"> +<div data-lang="scala" markdown="1"> +{% highlight scala %} + +object WordBlacklist { + + @volatile private var instance: Broadcast[Seq[String]] = null + + def getInstance(sc: SparkContext): Broadcast[Seq[String]] = { + if (instance == null) { + synchronized { + if (instance == null) { + val wordBlacklist = Seq("a", "b", "c") + instance = sc.broadcast(wordBlacklist) + } + } + } + instance + } +} + +object DroppedWordsCounter { + + @volatile private var instance: Accumulator[Long] = null + + def getInstance(sc: SparkContext): Accumulator[Long] = { + if (instance == null) { + synchronized { + if (instance == null) { + instance = sc.accumulator(0L, "WordsInBlacklistCounter") + } + } + } + instance + } +} + +wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => { + // Get or register the blacklist Broadcast + val blacklist = WordBlacklist.getInstance(rdd.sparkContext) + // Get or register the droppedWordsCounter Accumulator + val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext) + // Use blacklist to drop words and use droppedWordsCounter to count them + val counts = rdd.filter { case (word, count) => + if (blacklist.value.contains(word)) { + droppedWordsCounter += count + false + } else { + true + } + }.collect() + val output = "Counts at time " + time + " " + counts +}) + +{% endhighlight %} + +See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala). +</div> +<div data-lang="java" markdown="1"> +{% highlight java %} + +class JavaWordBlacklist { + + private static volatile Broadcast<List<String>> instance = null; + + public static Broadcast<List<String>> getInstance(JavaSparkContext jsc) { + if (instance == null) { + synchronized (JavaWordBlacklist.class) { + if (instance == null) { + List<String> wordBlacklist = Arrays.asList("a", "b", "c"); + instance = jsc.broadcast(wordBlacklist); + } + } + } + return instance; + } +} + +class JavaDroppedWordsCounter { + + private static volatile Accumulator<Integer> instance = null; + + public static Accumulator<Integer> getInstance(JavaSparkContext jsc) { + if (instance == null) { + synchronized (JavaDroppedWordsCounter.class) { + if (instance == null) { + instance = jsc.accumulator(0, "WordsInBlacklistCounter"); + } + } + } + return instance; + } +} + +wordCounts.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, Void>() { + @Override + public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws IOException { + // Get or register the blacklist Broadcast + final Broadcast<List<String>> blacklist = JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context())); + // Get or register the droppedWordsCounter Accumulator + final Accumulator<Integer> droppedWordsCounter = JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context())); + // Use blacklist to drop words and use droppedWordsCounter to count them + String counts = rdd.filter(new Function<Tuple2<String, Integer>, Boolean>() { + @Override + public Boolean call(Tuple2<String, Integer> wordCount) throws Exception { + if (blacklist.value().contains(wordCount._1())) { + droppedWordsCounter.add(wordCount._2()); + return false; + } else { + return true; + } + } + }).collect().toString(); + String output = "Counts at time " + time + " " + counts; + } +} + +{% endhighlight %} + +See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java). +</div> +<div data-lang="python" markdown="1"> +{% highlight python %} + +def getWordBlacklist(sparkContext): + if ('wordBlacklist' not in globals()): + globals()['wordBlacklist'] = sparkContext.broadcast(["a", "b", "c"]) + return globals()['wordBlacklist'] + +def getDroppedWordsCounter(sparkContext): + if ('droppedWordsCounter' not in globals()): + globals()['droppedWordsCounter'] = sparkContext.accumulator(0) + return globals()['droppedWordsCounter'] + +def echo(time, rdd): + # Get or register the blacklist Broadcast + blacklist = getWordBlacklist(rdd.context) + # Get or register the droppedWordsCounter Accumulator + droppedWordsCounter = getDroppedWordsCounter(rdd.context) + + # Use blacklist to drop words and use droppedWordsCounter to count them + def filterFunc(wordCount): + if wordCount[0] in blacklist.value: + droppedWordsCounter.add(wordCount[1]) + False + else: + True + + counts = "Counts at time %s %s" % (time, rdd.filter(filterFunc).collect()) + +wordCounts.foreachRDD(echo) + +{% endhighlight %} + +See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/recoverable_network_wordcount.py). + +</div> +</div> + +*** + ## DataFrame and SQL Operations You can easily use [DataFrames and SQL](sql-programming-guide.html) operations on streaming data. You have to create a SQLContext using the SparkContext that the StreamingContext is using. Furthermore this has to done such that it can be restarted on driver failures. This is done by creating a lazily instantiated singleton instance of SQLContext. This is shown in the following example. It modifies the earlier [word count example](#a-quick-example) to generate word counts using DataFrames and SQL. Each RDD is converted to a DataFrame, registered as a temporary table and then queried using SQL. |