aboutsummaryrefslogtreecommitdiff
path: root/docs/streaming-programming-guide.md
diff options
context:
space:
mode:
Diffstat (limited to 'docs/streaming-programming-guide.md')
-rw-r--r--docs/streaming-programming-guide.md12
1 files changed, 6 insertions, 6 deletions
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index 6550fcc052..78ae6a7407 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -1395,13 +1395,13 @@ object WordBlacklist {
object DroppedWordsCounter {
- @volatile private var instance: Accumulator[Long] = null
+ @volatile private var instance: LongAccumulator = null
- def getInstance(sc: SparkContext): Accumulator[Long] = {
+ def getInstance(sc: SparkContext): LongAccumulator = {
if (instance == null) {
synchronized {
if (instance == null) {
- instance = sc.accumulator(0L, "WordsInBlacklistCounter")
+ instance = sc.longAccumulator("WordsInBlacklistCounter")
}
}
}
@@ -1409,7 +1409,7 @@ object DroppedWordsCounter {
}
}
-wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
+wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
// Get or register the blacklist Broadcast
val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
// Get or register the droppedWordsCounter Accumulator
@@ -1417,12 +1417,12 @@ wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
// Use blacklist to drop words and use droppedWordsCounter to count them
val counts = rdd.filter { case (word, count) =>
if (blacklist.value.contains(word)) {
- droppedWordsCounter += count
+ droppedWordsCounter.add(count)
false
} else {
true
}
- }.collect()
+ }.collect().mkString("[", ", ", "]")
val output = "Counts at time " + time + " " + counts
})