author     Shixiong Zhu <shixiong@databricks.com>  2015-12-22 16:39:10 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>  2015-12-22 16:39:10 -0800
commit     20591afd790799327f99485c5a969ed7412eca45 (patch)
tree       dad9877404d7559a53aab11d9a01df342cd17498 /examples
parent     93db50d1c2ff97e6eb9200a995e4601f752968ae (diff)
[SPARK-12429][STREAMING][DOC] Add Accumulator and Broadcast example for Streaming
This PR adds Scala, Java and Python examples to show how to use Accumulator and Broadcast in Spark Streaming to support checkpointing.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #10385 from zsxwing/accumulator-broadcast-example.
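For context, the recoverability these examples support hinges on creating the StreamingContext through StreamingContext.getOrCreate, so that a restarted driver rebuilds the context from the checkpoint while the foreachRDD closures re-obtain the Broadcast and Accumulator through the lazy singletons added in the patch. That wiring sits outside the hunks below, so the following is only a minimal Scala sketch; the checkpoint path, host, and port are placeholder values, not taken from the patch.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object RecoverableWordCountSketch {
  // Placeholder value; the real example takes this from a command-line argument.
  val checkpointDirectory = "/tmp/checkpoint"

  def createContext(): StreamingContext = {
    // Runs only when no checkpoint exists yet.
    val sparkConf = new SparkConf().setAppName("RecoverableNetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    ssc.checkpoint(checkpointDirectory)
    // Assumed host/port for illustration only.
    val lines = ssc.socketTextStream("localhost", 9999)
    val wordCounts = lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)
    // The full example replaces print() with the foreachRDD shown in the diff,
    // which fetches the Broadcast/Accumulator via the WordBlacklist and
    // DroppedWordsCounter singletons.
    wordCounts.print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Recover from the checkpoint if it exists, otherwise create a fresh context.
    val ssc = StreamingContext.getOrCreate(checkpointDirectory, () => createContext())
    ssc.start()
    ssc.awaitTermination()
  }
}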
Diffstat (limited to 'examples')
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java  71
-rw-r--r--  examples/src/main/python/streaming/recoverable_network_wordcount.py  30
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala  66
3 files changed, 157 insertions(+), 10 deletions(-)
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
index bceda97f05..90d473703e 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
@@ -21,17 +21,22 @@ import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Arrays;
+import java.util.List;
import java.util.regex.Pattern;
import scala.Tuple2;
import com.google.common.collect.Lists;
import com.google.common.io.Files;
+import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;
@@ -41,7 +46,48 @@ import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
/**
- * Counts words in text encoded with UTF8 received from the network every second.
+ * Use this singleton to get or register a Broadcast variable.
+ */
+class JavaWordBlacklist {
+
+ private static volatile Broadcast<List<String>> instance = null;
+
+ public static Broadcast<List<String>> getInstance(JavaSparkContext jsc) {
+ if (instance == null) {
+ synchronized (JavaWordBlacklist.class) {
+ if (instance == null) {
+ List<String> wordBlacklist = Arrays.asList("a", "b", "c");
+ instance = jsc.broadcast(wordBlacklist);
+ }
+ }
+ }
+ return instance;
+ }
+}
+
+/**
+ * Use this singleton to get or register an Accumulator.
+ */
+class JavaDroppedWordsCounter {
+
+ private static volatile Accumulator<Integer> instance = null;
+
+ public static Accumulator<Integer> getInstance(JavaSparkContext jsc) {
+ if (instance == null) {
+ synchronized (JavaDroppedWordsCounter.class) {
+ if (instance == null) {
+ instance = jsc.accumulator(0, "WordsInBlacklistCounter");
+ }
+ }
+ }
+ return instance;
+ }
+}
+
+/**
+ * Counts words in text encoded with UTF8 received from the network every second. This example also
+ * shows how to use lazily instantiated singleton instances for Accumulator and Broadcast so that
+ * they can be registered on driver failures.
*
* Usage: JavaRecoverableNetworkWordCount <hostname> <port> <checkpoint-directory> <output-file>
* <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive
@@ -111,10 +157,27 @@ public final class JavaRecoverableNetworkWordCount {
wordCounts.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
@Override
public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws IOException {
- String counts = "Counts at time " + time + " " + rdd.collect();
- System.out.println(counts);
+ // Get or register the blacklist Broadcast
+ final Broadcast<List<String>> blacklist = JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
+ // Get or register the droppedWordsCounter Accumulator
+ final Accumulator<Integer> droppedWordsCounter = JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
+ // Use blacklist to drop words and use droppedWordsCounter to count them
+ String counts = rdd.filter(new Function<Tuple2<String, Integer>, Boolean>() {
+ @Override
+ public Boolean call(Tuple2<String, Integer> wordCount) throws Exception {
+ if (blacklist.value().contains(wordCount._1())) {
+ droppedWordsCounter.add(wordCount._2());
+ return false;
+ } else {
+ return true;
+ }
+ }
+ }).collect().toString();
+ String output = "Counts at time " + time + " " + counts;
+ System.out.println(output);
+ System.out.println("Dropped " + droppedWordsCounter.value() + " word(s) totally");
System.out.println("Appending to " + outputFile.getAbsolutePath());
- Files.append(counts + "\n", outputFile, Charset.defaultCharset());
+ Files.append(output + "\n", outputFile, Charset.defaultCharset());
return null;
}
});
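The Java hunk above reads droppedWordsCounter.value() on the driver right after collect(); that ordering matters because accumulator updates made on executors are only guaranteed to be reflected on the driver once the action that triggered them has completed. The following is a minimal, non-streaming Scala sketch of the same filter-and-count pattern; the local master, the sample word counts, and the object name are made up for illustration and are not part of the patch.

import org.apache.spark.{SparkConf, SparkContext}

object BlacklistFilterSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("BlacklistFilterSketch").setMaster("local[2]"))

    // Broadcast the blacklist once instead of shipping it with every task.
    val blacklist = sc.broadcast(Seq("a", "b", "c"))
    // Executors add to the accumulator; only the driver reads its value.
    val droppedWordsCounter = sc.accumulator(0L, "WordsInBlacklistCounter")

    val wordCounts = sc.parallelize(Seq(("a", 3), ("spark", 2), ("b", 1)))
    val kept = wordCounts.filter { case (word, count) =>
      if (blacklist.value.contains(word)) {
        droppedWordsCounter += count
        false
      } else {
        true
      }
    }.collect()

    // Read the accumulator only after the action (collect) has run.
    println(kept.mkString("[", ", ", "]"))                             // [(spark,2)]
    println("Dropped " + droppedWordsCounter.value + " word(s) in total")  // Dropped 4 word(s) in total
    sc.stop()
  }
}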
diff --git a/examples/src/main/python/streaming/recoverable_network_wordcount.py b/examples/src/main/python/streaming/recoverable_network_wordcount.py
index ac91f0a06b..52b2639cdf 100644
--- a/examples/src/main/python/streaming/recoverable_network_wordcount.py
+++ b/examples/src/main/python/streaming/recoverable_network_wordcount.py
@@ -44,6 +44,20 @@ from pyspark import SparkContext
from pyspark.streaming import StreamingContext
+# Get or register a Broadcast variable
+def getWordBlacklist(sparkContext):
+ if ('wordBlacklist' not in globals()):
+ globals()['wordBlacklist'] = sparkContext.broadcast(["a", "b", "c"])
+ return globals()['wordBlacklist']
+
+
+# Get or register an Accumulator
+def getDroppedWordsCounter(sparkContext):
+ if ('droppedWordsCounter' not in globals()):
+ globals()['droppedWordsCounter'] = sparkContext.accumulator(0)
+ return globals()['droppedWordsCounter']
+
+
def createContext(host, port, outputPath):
# If you do not see this printed, that means the StreamingContext has been loaded
# from the new checkpoint
@@ -60,8 +74,22 @@ def createContext(host, port, outputPath):
wordCounts = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
def echo(time, rdd):
- counts = "Counts at time %s %s" % (time, rdd.collect())
+ # Get or register the blacklist Broadcast
+ blacklist = getWordBlacklist(rdd.context)
+ # Get or register the droppedWordsCounter Accumulator
+ droppedWordsCounter = getDroppedWordsCounter(rdd.context)
+
+ # Use blacklist to drop words and use droppedWordsCounter to count them
+ def filterFunc(wordCount):
+ if wordCount[0] in blacklist.value:
+ droppedWordsCounter.add(wordCount[1])
+ return False
+ else:
+ return True
+
+ counts = "Counts at time %s %s" % (time, rdd.filter(filterFunc).collect())
print(counts)
+ print("Dropped %d word(s) totally" % droppedWordsCounter.value)
print("Appending to " + os.path.abspath(outputPath))
with open(outputPath, 'a') as f:
f.write(counts + "\n")
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
index 9916882e4f..38d4fd11f9 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
@@ -23,13 +23,55 @@ import java.nio.charset.Charset
import com.google.common.io.Files
-import org.apache.spark.SparkConf
+import org.apache.spark.{Accumulator, SparkConf, SparkContext}
+import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Time, Seconds, StreamingContext}
import org.apache.spark.util.IntParam
/**
- * Counts words in text encoded with UTF8 received from the network every second.
+ * Use this singleton to get or register a Broadcast variable.
+ */
+object WordBlacklist {
+
+ @volatile private var instance: Broadcast[Seq[String]] = null
+
+ def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
+ if (instance == null) {
+ synchronized {
+ if (instance == null) {
+ val wordBlacklist = Seq("a", "b", "c")
+ instance = sc.broadcast(wordBlacklist)
+ }
+ }
+ }
+ instance
+ }
+}
+
+/**
+ * Use this singleton to get or register an Accumulator.
+ */
+object DroppedWordsCounter {
+
+ @volatile private var instance: Accumulator[Long] = null
+
+ def getInstance(sc: SparkContext): Accumulator[Long] = {
+ if (instance == null) {
+ synchronized {
+ if (instance == null) {
+ instance = sc.accumulator(0L, "WordsInBlacklistCounter")
+ }
+ }
+ }
+ instance
+ }
+}
+
+/**
+ * Counts words in text encoded with UTF8 received from the network every second. This example also
+ * shows how to use lazily instantiated singleton instances for Accumulator and Broadcast so that
+ * they can be registered on driver failures.
*
* Usage: RecoverableNetworkWordCount <hostname> <port> <checkpoint-directory> <output-file>
* <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive
@@ -75,10 +117,24 @@ object RecoverableNetworkWordCount {
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
- val counts = "Counts at time " + time + " " + rdd.collect().mkString("[", ", ", "]")
- println(counts)
+ // Get or register the blacklist Broadcast
+ val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
+ // Get or register the droppedWordsCounter Accumulator
+ val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
+ // Use blacklist to drop words and use droppedWordsCounter to count them
+ val counts = rdd.filter { case (word, count) =>
+ if (blacklist.value.contains(word)) {
+ droppedWordsCounter += count
+ false
+ } else {
+ true
+ }
+ }.collect().mkString("[", ", ", "]")
+ val output = "Counts at time " + time + " " + counts
+ println(output)
+ println("Dropped " + droppedWordsCounter.value + " word(s) totally")
println("Appending to " + outputFile.getAbsolutePath)
- Files.append(counts + "\n", outputFile, Charset.defaultCharset())
+ Files.append(output + "\n", outputFile, Charset.defaultCharset())
})
ssc
}