diff options
author | WeichenXu <WeichenXu123@outlook.com> | 2016-05-18 11:48:46 +0100 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2016-05-18 11:48:46 +0100 |
commit | 2f9047b5eb969e0198b8a73e392642ca852ba786 (patch) | |
tree | 152fe58ada0fa73a5a5e151b4d0ce188c65be0b5 /examples | |
parent | 33814f887aea339c99e14ce7f14ca6fcc6875015 (diff) | |
download | spark-2f9047b5eb969e0198b8a73e392642ca852ba786.tar.gz spark-2f9047b5eb969e0198b8a73e392642ca852ba786.tar.bz2 spark-2f9047b5eb969e0198b8a73e392642ca852ba786.zip |
[SPARK-15322][MLLIB][CORE][SQL] update deprecate accumulator usage into accumulatorV2 in spark project
## What changes were proposed in this pull request?
I use Intellj-IDEA to search usage of deprecate SparkContext.accumulator in the whole spark project, and update the code.(except those test code for accumulator method itself)
## How was this patch tested?
Exisiting unit tests
Author: WeichenXu <WeichenXu123@outlook.com>
Closes #13112 from WeichenXu123/update_accuV2_in_mllib.
Diffstat (limited to 'examples')
-rw-r--r-- | examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala | 11 |
1 files changed, 6 insertions, 5 deletions
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala index 1bcd85e1d5..acbcb0c4b7 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala @@ -23,11 +23,12 @@ import java.nio.charset.Charset import com.google.common.io.Files -import org.apache.spark.{Accumulator, SparkConf, SparkContext} +import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.streaming.{Seconds, StreamingContext, Time} import org.apache.spark.util.IntParam +import org.apache.spark.util.LongAccumulator /** * Use this singleton to get or register a Broadcast variable. @@ -54,13 +55,13 @@ object WordBlacklist { */ object DroppedWordsCounter { - @volatile private var instance: Accumulator[Long] = null + @volatile private var instance: LongAccumulator = null - def getInstance(sc: SparkContext): Accumulator[Long] = { + def getInstance(sc: SparkContext): LongAccumulator = { if (instance == null) { synchronized { if (instance == null) { - instance = sc.accumulator(0L, "WordsInBlacklistCounter") + instance = sc.longAccumulator("WordsInBlacklistCounter") } } } @@ -124,7 +125,7 @@ object RecoverableNetworkWordCount { // Use blacklist to drop words and use droppedWordsCounter to count them val counts = rdd.filter { case (word, count) => if (blacklist.value.contains(word)) { - droppedWordsCounter += count + droppedWordsCounter.add(count) false } else { true |