author     Shixiong Zhu <shixiong@databricks.com>        2015-12-22 16:39:10 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>   2015-12-22 16:39:10 -0800
commit     20591afd790799327f99485c5a969ed7412eca45 (patch)
tree       dad9877404d7559a53aab11d9a01df342cd17498 /docs
parent     93db50d1c2ff97e6eb9200a995e4601f752968ae (diff)
[SPARK-12429][STREAMING][DOC] Add Accumulator and Broadcast example for Streaming
This PR adds Scala, Java and Python examples to show how to use Accumulator and Broadcast in Spark Streaming to support checkpointing.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #10385 from zsxwing/accumulator-broadcast-example.
Diffstat (limited to 'docs')
-rw-r--r--  docs/programming-guide.md              6
-rw-r--r--  docs/streaming-programming-guide.md  165
2 files changed, 168 insertions, 3 deletions
diff --git a/docs/programming-guide.md b/docs/programming-guide.md
index c5e2a1cd7b..bad25e63e8 100644
--- a/docs/programming-guide.md
+++ b/docs/programming-guide.md
@@ -806,7 +806,7 @@ However, in `cluster` mode, what happens is more complicated, and the above may
What is happening here is that the variables within the closure sent to each executor are now copies and thus, when **counter** is referenced within the `foreach` function, it's no longer the **counter** on the driver node. There is still a **counter** in the memory of the driver node but this is no longer visible to the executors! The executors only see the copy from the serialized closure. Thus, the final value of **counter** will still be zero since all operations on **counter** were referencing the value within the serialized closure.
-To ensure well-defined behavior in these sorts of scenarios one should use an [`Accumulator`](#AccumLink). Accumulators in Spark are used specifically to provide a mechanism for safely updating a variable when execution is split up across worker nodes in a cluster. The Accumulators section of this guide discusses these in more detail.
+To ensure well-defined behavior in these sorts of scenarios one should use an [`Accumulator`](#accumulators). Accumulators in Spark are used specifically to provide a mechanism for safely updating a variable when execution is split up across worker nodes in a cluster. The Accumulators section of this guide discusses these in more detail.
In general, closures - constructs like loops or locally defined methods - should not be used to mutate some global state. Spark does not define or guarantee the behavior of mutations to objects referenced from outside of closures. Some code that does this may work in local mode, but that's just by accident and such code will not behave as expected in distributed mode. Use an Accumulator instead if some global aggregation is needed.
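
For reference, here is a minimal Scala sketch of the accumulator-based alternative described above; `sc` and `data` are assumed to be an existing `SparkContext` and an `RDD[Int]`, and the accumulator name is illustrative:

{% highlight scala %}
// Create a named accumulator on the driver instead of capturing a local variable.
val accum = sc.accumulator(0, "Element Sum")
// Executors add to their local copies; Spark merges the updates back on the driver.
data.foreach(x => accum += x)
// Reading .value on the driver after the action returns the aggregated total.
println(accum.value)
{% endhighlight %}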
@@ -1091,7 +1091,7 @@ for details.
</tr>
<tr>
<td> <b>foreach</b>(<i>func</i>) </td>
- <td> Run a function <i>func</i> on each element of the dataset. This is usually done for side effects such as updating an <a href="#AccumLink">Accumulator</a> or interacting with external storage systems.
+ <td> Run a function <i>func</i> on each element of the dataset. This is usually done for side effects such as updating an <a href="#accumulators">Accumulator</a> or interacting with external storage systems.
<br /><b>Note</b>: modifying variables other than Accumulators outside of the <code>foreach()</code> may result in undefined behavior. See <a href="#ClosuresLink">Understanding closures </a> for more details.</td>
</tr>
</table>
@@ -1338,7 +1338,7 @@ run on the cluster so that `v` is not shipped to the nodes more than once. In ad
`v` should not be modified after it is broadcast in order to ensure that all nodes get the same
value of the broadcast variable (e.g. if the variable is shipped to a new node later).
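
For reference, a minimal Scala sketch of the read-only broadcast usage described above; `sc` is assumed to be an existing `SparkContext` and the lookup data is illustrative:

{% highlight scala %}
// Ship a read-only lookup table to the executors once.
val broadcastVar = sc.broadcast(Map("a" -> 1, "b" -> 2, "c" -> 3))
// Executors read it through .value; the broadcast value itself is never modified.
val mapped = sc.parallelize(Seq("a", "b", "d")).map(w => broadcastVar.value.getOrElse(w, 0))
{% endhighlight %}

Because each executor caches the deserialized value, broadcasting large lookup data is preferable to shipping it inside every task closure.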
-## Accumulators <a name="AccumLink"></a>
+## Accumulators
Accumulators are variables that are only "added" to through an associative operation and can
therefore be efficiently supported in parallel. They can be used to implement counters (as in
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index ed6b28c282..3b071c7da5 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -1415,6 +1415,171 @@ Note that the connections in the pool should be lazily created on demand and tim
***
+## Accumulators and Broadcast Variables
+
+[Accumulators](programming-guide.html#accumulators) and [Broadcast variables](programming-guide.html#broadcast-variables) cannot be recovered from checkpoint in Spark Streaming. If you enable checkpointing and use Accumulators or Broadcast variables as well, you'll have to create lazily instantiated singleton instances for them so that they can be re-instantiated after the driver restarts on failure. This is shown in the following example.
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+
+object WordBlacklist {
+
+  @volatile private var instance: Broadcast[Seq[String]] = null
+
+  def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
+    if (instance == null) {
+      synchronized {
+        if (instance == null) {
+          val wordBlacklist = Seq("a", "b", "c")
+          instance = sc.broadcast(wordBlacklist)
+        }
+      }
+    }
+    instance
+  }
+}
+
+object DroppedWordsCounter {
+
+  @volatile private var instance: Accumulator[Long] = null
+
+  def getInstance(sc: SparkContext): Accumulator[Long] = {
+    if (instance == null) {
+      synchronized {
+        if (instance == null) {
+          instance = sc.accumulator(0L, "WordsInBlacklistCounter")
+        }
+      }
+    }
+    instance
+  }
+}
+
+wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
+  // Get or register the blacklist Broadcast
+  val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
+  // Get or register the droppedWordsCounter Accumulator
+  val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
+  // Use blacklist to drop words and use droppedWordsCounter to count them
+  val counts = rdd.filter { case (word, count) =>
+    if (blacklist.value.contains(word)) {
+      droppedWordsCounter += count
+      false
+    } else {
+      true
+    }
+  }.collect()
+  val output = "Counts at time " + time + " " + counts
+})
+
+{% endhighlight %}
+
+See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala).
+</div>
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+class JavaWordBlacklist {
+
+  private static volatile Broadcast<List<String>> instance = null;
+
+  public static Broadcast<List<String>> getInstance(JavaSparkContext jsc) {
+    if (instance == null) {
+      synchronized (JavaWordBlacklist.class) {
+        if (instance == null) {
+          List<String> wordBlacklist = Arrays.asList("a", "b", "c");
+          instance = jsc.broadcast(wordBlacklist);
+        }
+      }
+    }
+    return instance;
+  }
+}
+
+class JavaDroppedWordsCounter {
+
+  private static volatile Accumulator<Integer> instance = null;
+
+  public static Accumulator<Integer> getInstance(JavaSparkContext jsc) {
+    if (instance == null) {
+      synchronized (JavaDroppedWordsCounter.class) {
+        if (instance == null) {
+          instance = jsc.accumulator(0, "WordsInBlacklistCounter");
+        }
+      }
+    }
+    return instance;
+  }
+}
+
+wordCounts.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
+  @Override
+  public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws IOException {
+    // Get or register the blacklist Broadcast
+    final Broadcast<List<String>> blacklist = JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
+    // Get or register the droppedWordsCounter Accumulator
+    final Accumulator<Integer> droppedWordsCounter = JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
+    // Use blacklist to drop words and use droppedWordsCounter to count them
+    String counts = rdd.filter(new Function<Tuple2<String, Integer>, Boolean>() {
+      @Override
+      public Boolean call(Tuple2<String, Integer> wordCount) throws Exception {
+        if (blacklist.value().contains(wordCount._1())) {
+          droppedWordsCounter.add(wordCount._2());
+          return false;
+        } else {
+          return true;
+        }
+      }
+    }).collect().toString();
+    String output = "Counts at time " + time + " " + counts;
+    return null;
+  }
+});
+
+{% endhighlight %}
+
+See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java).
+</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+
+def getWordBlacklist(sparkContext):
+    if ('wordBlacklist' not in globals()):
+        globals()['wordBlacklist'] = sparkContext.broadcast(["a", "b", "c"])
+    return globals()['wordBlacklist']
+
+def getDroppedWordsCounter(sparkContext):
+    if ('droppedWordsCounter' not in globals()):
+        globals()['droppedWordsCounter'] = sparkContext.accumulator(0)
+    return globals()['droppedWordsCounter']
+
+def echo(time, rdd):
+    # Get or register the blacklist Broadcast
+    blacklist = getWordBlacklist(rdd.context)
+    # Get or register the droppedWordsCounter Accumulator
+    droppedWordsCounter = getDroppedWordsCounter(rdd.context)
+
+    # Use blacklist to drop words and use droppedWordsCounter to count them
+    def filterFunc(wordCount):
+        if wordCount[0] in blacklist.value:
+            droppedWordsCounter.add(wordCount[1])
+            return False
+        else:
+            return True
+
+    counts = "Counts at time %s %s" % (time, rdd.filter(filterFunc).collect())
+
+wordCounts.foreachRDD(echo)
+
+{% endhighlight %}
+
+See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/recoverable_network_wordcount.py).
+
+</div>
+</div>
+
+***
+
## DataFrame and SQL Operations
You can easily use [DataFrames and SQL](sql-programming-guide.html) operations on streaming data. You have to create a SQLContext using the SparkContext that the StreamingContext is using. Furthermore, this has to be done such that it can be restarted on driver failures. This is done by creating a lazily instantiated singleton instance of SQLContext. This is shown in the following example. It modifies the earlier [word count example](#a-quick-example) to generate word counts using DataFrames and SQL. Each RDD is converted to a DataFrame, registered as a temporary table and then queried using SQL.
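
For reference, a minimal Scala sketch of the lazily instantiated SQLContext pattern this paragraph describes, assuming `words` is a `DStream[String]` as in the earlier word count example (the table name is illustrative):

{% highlight scala %}
import org.apache.spark.sql.SQLContext

words.foreachRDD { rdd =>
  // Get or create the singleton SQLContext from the RDD's SparkContext,
  // so it is re-created if the driver restarts from a checkpoint.
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  import sqlContext.implicits._

  // Convert RDD[String] to DataFrame, register it as a temporary table and query it with SQL.
  val wordsDataFrame = rdd.toDF("word")
  wordsDataFrame.registerTempTable("words")
  sqlContext.sql("select word, count(*) as total from words group by word").show()
}
{% endhighlight %}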