aboutsummaryrefslogtreecommitdiff
path: root/docs/streaming-programming-guide.md
diff options
context:
space:
mode:
authorLiwei Lin <lwlin7@gmail.com>2016-06-02 11:07:15 -0500
committerSean Owen <sowen@cloudera.com>2016-06-02 11:07:15 -0500
commita0eec8e8ffd5a43cae67aa0f5dbcf7ca19a4f3aa (patch)
treeade45af40121ff673c5f44c1632e55e851815ee8 /docs/streaming-programming-guide.md
parent5eea332307cbed5fc44427959f070afc16a12c02 (diff)
downloadspark-a0eec8e8ffd5a43cae67aa0f5dbcf7ca19a4f3aa.tar.gz
spark-a0eec8e8ffd5a43cae67aa0f5dbcf7ca19a4f3aa.tar.bz2
spark-a0eec8e8ffd5a43cae67aa0f5dbcf7ca19a4f3aa.zip
[SPARK-15208][WIP][CORE][STREAMING][DOCS] Update Spark examples with AccumulatorV2
## What changes were proposed in this pull request? The patch updates the codes & docs in the example module as well as the related doc module: - [ ] [docs] `streaming-programming-guide.md` - [x] scala code part - [ ] java code part - [ ] python code part - [x] [examples] `RecoverableNetworkWordCount.scala` - [ ] [examples] `JavaRecoverableNetworkWordCount.java` - [ ] [examples] `recoverable_network_wordcount.py` ## How was this patch tested? Ran the examples and verified results manually. Author: Liwei Lin <lwlin7@gmail.com> Closes #12981 from lw-lin/accumulatorV2-examples.
Diffstat (limited to 'docs/streaming-programming-guide.md')
-rw-r--r--docs/streaming-programming-guide.md12
1 files changed, 6 insertions, 6 deletions
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index 6550fcc052..78ae6a7407 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -1395,13 +1395,13 @@ object WordBlacklist {
object DroppedWordsCounter {
- @volatile private var instance: Accumulator[Long] = null
+ @volatile private var instance: LongAccumulator = null
- def getInstance(sc: SparkContext): Accumulator[Long] = {
+ def getInstance(sc: SparkContext): LongAccumulator = {
if (instance == null) {
synchronized {
if (instance == null) {
- instance = sc.accumulator(0L, "WordsInBlacklistCounter")
+ instance = sc.longAccumulator("WordsInBlacklistCounter")
}
}
}
@@ -1409,7 +1409,7 @@ object DroppedWordsCounter {
}
}
-wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
+wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
// Get or register the blacklist Broadcast
val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
// Get or register the droppedWordsCounter Accumulator
@@ -1417,12 +1417,12 @@ wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
// Use blacklist to drop words and use droppedWordsCounter to count them
val counts = rdd.filter { case (word, count) =>
if (blacklist.value.contains(word)) {
- droppedWordsCounter += count
+ droppedWordsCounter.add(count)
false
} else {
true
}
- }.collect()
+ }.collect().mkString("[", ", ", "]")
val output = "Counts at time " + time + " " + counts
})