author    WeichenXu <WeichenXu123@outlook.com>    2016-05-18 11:48:46 +0100
committer Sean Owen <sowen@cloudera.com>          2016-05-18 11:48:46 +0100
commit 2f9047b5eb969e0198b8a73e392642ca852ba786 (patch)
tree   152fe58ada0fa73a5a5e151b4d0ce188c65be0b5 /mllib
parent 33814f887aea339c99e14ce7f14ca6fcc6875015 (diff)
[SPARK-15322][MLLIB][CORE][SQL] Update deprecated accumulator usage to AccumulatorV2 in the Spark project
## What changes were proposed in this pull request?

I used IntelliJ IDEA to search for usages of the deprecated SparkContext.accumulator across the whole Spark project and updated the code (except the test code for the accumulator method itself).

## How was this patch tested?

Existing unit tests.

Author: WeichenXu <WeichenXu123@outlook.com>

Closes #13112 from WeichenXu123/update_accuV2_in_mllib.
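For context, a minimal sketch of the migration this patch performs: the deprecated `SparkContext.accumulator` / `+=` API is replaced by the AccumulatorV2 builders (`longAccumulator`, `doubleAccumulator`) and `.add()`. The app name, master, and counts below are illustrative, not taken from this commit.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.util.LongAccumulator

object AccumulatorMigrationDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("acc-demo").setMaster("local[2]"))

    // Deprecated since Spark 2.0:
    //   val acc = sc.accumulator(0L, "counter")   // returns Accumulator[Long]
    //   rdd.foreach(_ => acc += 1L)

    // AccumulatorV2 replacement, as applied throughout this patch:
    val acc: LongAccumulator = sc.longAccumulator("counter")
    sc.parallelize(1 to 100).foreach(_ => acc.add(1L))
    assert(acc.value == 100L)  // aggregated back on the driver

    sc.stop()
  }
}
```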
Diffstat (limited to 'mllib')
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/util/stopwatches.scala     | 7
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala | 4
-rw-r--r--  mllib/src/test/scala/org/apache/spark/ml/util/StopwatchSuite.scala  | 8
3 files changed, 10 insertions(+), 9 deletions(-)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/util/stopwatches.scala b/mllib/src/main/scala/org/apache/spark/ml/util/stopwatches.scala
index 8d4174124b..e79b1f3164 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/util/stopwatches.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/util/stopwatches.scala
@@ -19,7 +19,8 @@ package org.apache.spark.ml.util
import scala.collection.mutable
-import org.apache.spark.{Accumulator, SparkContext}
+import org.apache.spark.SparkContext
+import org.apache.spark.util.LongAccumulator
/**
* Abstract class for stopwatches.
@@ -102,12 +103,12 @@ private[spark] class DistributedStopwatch(
sc: SparkContext,
override val name: String) extends Stopwatch {
- private val elapsedTime: Accumulator[Long] = sc.accumulator(0L, s"DistributedStopwatch($name)")
+ private val elapsedTime: LongAccumulator = sc.longAccumulator(s"DistributedStopwatch($name)")
override def elapsed(): Long = elapsedTime.value
override protected def add(duration: Long): Unit = {
- elapsedTime += duration
+ elapsedTime.add(duration)
}
}
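To make the stopwatches change concrete, here is a hedged, self-contained reduction of the pattern: a named LongAccumulator aggregates durations recorded on executors back to the driver. The class name SimpleDistributedStopwatch is hypothetical; the real DistributedStopwatch extends the Stopwatch hierarchy shown above.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.util.LongAccumulator

// Hypothetical reduction of DistributedStopwatch: a named LongAccumulator
// lets code running on executors add elapsed durations, and the driver
// reads the aggregated total back through .value.
class SimpleDistributedStopwatch(sc: SparkContext, name: String) {
  private val elapsedTime: LongAccumulator =
    sc.longAccumulator(s"DistributedStopwatch($name)")

  // was: elapsedTime += duration (deprecated Accumulator[Long] syntax)
  def add(duration: Long): Unit = elapsedTime.add(duration)

  def elapsed(): Long = elapsedTime.value
}
```

Naming the accumulator also makes it visible in the Spark web UI, which is why the stopwatch name is embedded in the accumulator name.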
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
index 60f13d27d0..38728f2693 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
@@ -279,7 +279,7 @@ class KMeans private (
}
val activeCenters = activeRuns.map(r => centers(r)).toArray
- val costAccums = activeRuns.map(_ => sc.accumulator(0.0))
+ val costAccums = activeRuns.map(_ => sc.doubleAccumulator)
val bcActiveCenters = sc.broadcast(activeCenters)
@@ -296,7 +296,7 @@ class KMeans private (
points.foreach { point =>
(0 until runs).foreach { i =>
val (bestCenter, cost) = KMeans.findClosest(thisActiveCenters(i), point)
- costAccums(i) += cost
+ costAccums(i).add(cost)
val sum = sums(i)(bestCenter)
axpy(1.0, point.vector, sum)
counts(i)(bestCenter) += 1
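The KMeans hunks apply the same migration with DoubleAccumulator: one unnamed accumulator per active run sums the cost contributed by each point. A standalone sketch of that idiom follows; the runs count and sample costs are invented for illustration and are not the actual KMeans internals.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.util.DoubleAccumulator

object KMeansCostDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("cost-demo").setMaster("local[2]"))

    // One unnamed DoubleAccumulator per run, mirroring costAccums above.
    val runs = 3
    val costAccums: Array[DoubleAccumulator] =
      Array.fill(runs)(sc.doubleAccumulator)

    // Each point contributes its cost to every run's accumulator, the way
    // the KMeans inner loop adds findClosest's cost per run.
    sc.parallelize(Seq(1.0, 2.0, 3.0)).foreach { cost =>
      (0 until runs).foreach(i => costAccums(i).add(cost)) // was: costAccums(i) += cost
    }

    costAccums.foreach(acc => println(acc.value)) // prints 6.0 three times
    sc.stop()
  }
}
```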
diff --git a/mllib/src/test/scala/org/apache/spark/ml/util/StopwatchSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/util/StopwatchSuite.scala
index 9e6bc7193c..141249a427 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/util/StopwatchSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/util/StopwatchSuite.scala
@@ -60,9 +60,9 @@ class StopwatchSuite extends SparkFunSuite with MLlibTestSparkContext {
test("DistributedStopwatch on executors") {
val sw = new DistributedStopwatch(sc, "sw")
val rdd = sc.parallelize(0 until 4, 4)
- val acc = sc.accumulator(0L)
+ val acc = sc.longAccumulator
rdd.foreach { i =>
- acc += checkStopwatch(sw)
+ acc.add(checkStopwatch(sw))
}
assert(!sw.isRunning)
val elapsed = sw.elapsed()
@@ -88,12 +88,12 @@ class StopwatchSuite extends SparkFunSuite with MLlibTestSparkContext {
assert(sw.toString ===
s"{\n local: ${localElapsed}ms,\n spark: ${sparkElapsed}ms\n}")
val rdd = sc.parallelize(0 until 4, 4)
- val acc = sc.accumulator(0L)
+ val acc = sc.longAccumulator
rdd.foreach { i =>
sw("local").start()
val duration = checkStopwatch(sw("spark"))
sw("local").stop()
- acc += duration
+ acc.add(duration)
}
val localElapsed2 = sw("local").elapsed()
assert(localElapsed2 === localElapsed)
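Beyond the built-in long and double accumulators this patch adopts, AccumulatorV2 is also the extension point for custom aggregation. A minimal illustrative subclass, not part of this commit, tracking a running maximum:

```scala
import org.apache.spark.util.AccumulatorV2

// Illustrative custom accumulator (hypothetical): tracks the maximum Long
// seen across tasks. AccumulatorV2 subclasses must supply isZero, copy,
// reset, add, merge, and value.
class MaxAccumulator extends AccumulatorV2[Long, Long] {
  private var _max: Long = Long.MinValue

  override def isZero: Boolean = _max == Long.MinValue

  override def copy(): MaxAccumulator = {
    val acc = new MaxAccumulator
    acc._max = _max
    acc
  }

  override def reset(): Unit = { _max = Long.MinValue }

  override def add(v: Long): Unit = { _max = math.max(_max, v) }

  override def merge(other: AccumulatorV2[Long, Long]): Unit = {
    _max = math.max(_max, other.value)
  }

  override def value: Long = _max
}

// Usage sketch: register it with a SparkContext before use, e.g.
//   sc.register(new MaxAccumulator, "maxSeen")
```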