aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--docs/streaming-programming-guide.md12
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala3
2 files changed, 7 insertions, 8 deletions
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index 6550fcc052..78ae6a7407 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -1395,13 +1395,13 @@ object WordBlacklist {
object DroppedWordsCounter {
- @volatile private var instance: Accumulator[Long] = null
+ @volatile private var instance: LongAccumulator = null
- def getInstance(sc: SparkContext): Accumulator[Long] = {
+ def getInstance(sc: SparkContext): LongAccumulator = {
if (instance == null) {
synchronized {
if (instance == null) {
- instance = sc.accumulator(0L, "WordsInBlacklistCounter")
+ instance = sc.longAccumulator("WordsInBlacklistCounter")
}
}
}
@@ -1409,7 +1409,7 @@ object DroppedWordsCounter {
}
}
-wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
+wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
// Get or register the blacklist Broadcast
val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
// Get or register the droppedWordsCounter Accumulator
@@ -1417,12 +1417,12 @@ wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
// Use blacklist to drop words and use droppedWordsCounter to count them
val counts = rdd.filter { case (word, count) =>
if (blacklist.value.contains(word)) {
- droppedWordsCounter += count
+ droppedWordsCounter.add(count)
false
} else {
true
}
- }.collect()
+ }.collect().mkString("[", ", ", "]")
val output = "Counts at time " + time + " " + counts
})
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
index acbcb0c4b7..49c0427321 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
@@ -27,8 +27,7 @@ import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
-import org.apache.spark.util.IntParam
-import org.apache.spark.util.LongAccumulator
+import org.apache.spark.util.{IntParam, LongAccumulator}
/**
* Use this singleton to get or register a Broadcast variable.