aboutsummaryrefslogtreecommitdiff
path: root/examples
diff options
context:
space:
mode:
authorWeichenXu <WeichenXu123@outlook.com>2016-05-18 11:48:46 +0100
committerSean Owen <sowen@cloudera.com>2016-05-18 11:48:46 +0100
commit2f9047b5eb969e0198b8a73e392642ca852ba786 (patch)
tree152fe58ada0fa73a5a5e151b4d0ce188c65be0b5 /examples
parent33814f887aea339c99e14ce7f14ca6fcc6875015 (diff)
downloadspark-2f9047b5eb969e0198b8a73e392642ca852ba786.tar.gz
spark-2f9047b5eb969e0198b8a73e392642ca852ba786.tar.bz2
spark-2f9047b5eb969e0198b8a73e392642ca852ba786.zip
[SPARK-15322][MLLIB][CORE][SQL] update deprecate accumulator usage into accumulatorV2 in spark project
## What changes were proposed in this pull request? I use Intellj-IDEA to search usage of deprecate SparkContext.accumulator in the whole spark project, and update the code.(except those test code for accumulator method itself) ## How was this patch tested? Exisiting unit tests Author: WeichenXu <WeichenXu123@outlook.com> Closes #13112 from WeichenXu123/update_accuV2_in_mllib.
Diffstat (limited to 'examples')
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala11
1 files changed, 6 insertions, 5 deletions
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
index 1bcd85e1d5..acbcb0c4b7 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
@@ -23,11 +23,12 @@ import java.nio.charset.Charset
import com.google.common.io.Files
-import org.apache.spark.{Accumulator, SparkConf, SparkContext}
+import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.util.IntParam
+import org.apache.spark.util.LongAccumulator
/**
* Use this singleton to get or register a Broadcast variable.
@@ -54,13 +55,13 @@ object WordBlacklist {
*/
object DroppedWordsCounter {
- @volatile private var instance: Accumulator[Long] = null
+ @volatile private var instance: LongAccumulator = null
- def getInstance(sc: SparkContext): Accumulator[Long] = {
+ def getInstance(sc: SparkContext): LongAccumulator = {
if (instance == null) {
synchronized {
if (instance == null) {
- instance = sc.accumulator(0L, "WordsInBlacklistCounter")
+ instance = sc.longAccumulator("WordsInBlacklistCounter")
}
}
}
@@ -124,7 +125,7 @@ object RecoverableNetworkWordCount {
// Use blacklist to drop words and use droppedWordsCounter to count them
val counts = rdd.filter { case (word, count) =>
if (blacklist.value.contains(word)) {
- droppedWordsCounter += count
+ droppedWordsCounter.add(count)
false
} else {
true