diff options
author | Liwei Lin <lwlin7@gmail.com> | 2016-06-02 11:07:15 -0500 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2016-06-02 11:07:15 -0500 |
commit | a0eec8e8ffd5a43cae67aa0f5dbcf7ca19a4f3aa (patch) | |
tree | ade45af40121ff673c5f44c1632e55e851815ee8 /docs/streaming-programming-guide.md | |
parent | 5eea332307cbed5fc44427959f070afc16a12c02 (diff) | |
download | spark-a0eec8e8ffd5a43cae67aa0f5dbcf7ca19a4f3aa.tar.gz spark-a0eec8e8ffd5a43cae67aa0f5dbcf7ca19a4f3aa.tar.bz2 spark-a0eec8e8ffd5a43cae67aa0f5dbcf7ca19a4f3aa.zip |
[SPARK-15208][WIP][CORE][STREAMING][DOCS] Update Spark examples with AccumulatorV2
## What changes were proposed in this pull request?
The patch updates the codes & docs in the example module as well as the related doc module:
- [ ] [docs] `streaming-programming-guide.md`
- [x] scala code part
- [ ] java code part
- [ ] python code part
- [x] [examples] `RecoverableNetworkWordCount.scala`
- [ ] [examples] `JavaRecoverableNetworkWordCount.java`
- [ ] [examples] `recoverable_network_wordcount.py`
## How was this patch tested?
Ran the examples and verified results manually.
Author: Liwei Lin <lwlin7@gmail.com>
Closes #12981 from lw-lin/accumulatorV2-examples.
Diffstat (limited to 'docs/streaming-programming-guide.md')
-rw-r--r-- | docs/streaming-programming-guide.md | 12 |
1 files changed, 6 insertions, 6 deletions
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index 6550fcc052..78ae6a7407 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -1395,13 +1395,13 @@ object WordBlacklist { object DroppedWordsCounter { - @volatile private var instance: Accumulator[Long] = null + @volatile private var instance: LongAccumulator = null - def getInstance(sc: SparkContext): Accumulator[Long] = { + def getInstance(sc: SparkContext): LongAccumulator = { if (instance == null) { synchronized { if (instance == null) { - instance = sc.accumulator(0L, "WordsInBlacklistCounter") + instance = sc.longAccumulator("WordsInBlacklistCounter") } } } @@ -1409,7 +1409,7 @@ object DroppedWordsCounter { } } -wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => { +wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) => // Get or register the blacklist Broadcast val blacklist = WordBlacklist.getInstance(rdd.sparkContext) // Get or register the droppedWordsCounter Accumulator @@ -1417,12 +1417,12 @@ wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => { // Use blacklist to drop words and use droppedWordsCounter to count them val counts = rdd.filter { case (word, count) => if (blacklist.value.contains(word)) { - droppedWordsCounter += count + droppedWordsCounter.add(count) false } else { true } - }.collect() + }.collect().mkString("[", ", ", "]") val output = "Counts at time " + time + " " + counts }) |