From 2f9047b5eb969e0198b8a73e392642ca852ba786 Mon Sep 17 00:00:00 2001 From: WeichenXu Date: Wed, 18 May 2016 11:48:46 +0100 Subject: [SPARK-15322][MLLIB][CORE][SQL] update deprecate accumulator usage into accumulatorV2 in spark project ## What changes were proposed in this pull request? I use Intellj-IDEA to search usage of deprecate SparkContext.accumulator in the whole spark project, and update the code.(except those test code for accumulator method itself) ## How was this patch tested? Exisiting unit tests Author: WeichenXu Closes #13112 from WeichenXu123/update_accuV2_in_mllib. --- mllib/src/main/scala/org/apache/spark/ml/util/stopwatches.scala | 7 ++++--- .../src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala | 4 ++-- .../src/test/scala/org/apache/spark/ml/util/StopwatchSuite.scala | 8 ++++---- 3 files changed, 10 insertions(+), 9 deletions(-) (limited to 'mllib') diff --git a/mllib/src/main/scala/org/apache/spark/ml/util/stopwatches.scala b/mllib/src/main/scala/org/apache/spark/ml/util/stopwatches.scala index 8d4174124b..e79b1f3164 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/util/stopwatches.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/util/stopwatches.scala @@ -19,7 +19,8 @@ package org.apache.spark.ml.util import scala.collection.mutable -import org.apache.spark.{Accumulator, SparkContext} +import org.apache.spark.SparkContext +import org.apache.spark.util.LongAccumulator; /** * Abstract class for stopwatches. @@ -102,12 +103,12 @@ private[spark] class DistributedStopwatch( sc: SparkContext, override val name: String) extends Stopwatch { - private val elapsedTime: Accumulator[Long] = sc.accumulator(0L, s"DistributedStopwatch($name)") + private val elapsedTime: LongAccumulator = sc.longAccumulator(s"DistributedStopwatch($name)") override def elapsed(): Long = elapsedTime.value override protected def add(duration: Long): Unit = { - elapsedTime += duration + elapsedTime.add(duration) } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala index 60f13d27d0..38728f2693 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala @@ -279,7 +279,7 @@ class KMeans private ( } val activeCenters = activeRuns.map(r => centers(r)).toArray - val costAccums = activeRuns.map(_ => sc.accumulator(0.0)) + val costAccums = activeRuns.map(_ => sc.doubleAccumulator) val bcActiveCenters = sc.broadcast(activeCenters) @@ -296,7 +296,7 @@ class KMeans private ( points.foreach { point => (0 until runs).foreach { i => val (bestCenter, cost) = KMeans.findClosest(thisActiveCenters(i), point) - costAccums(i) += cost + costAccums(i).add(cost) val sum = sums(i)(bestCenter) axpy(1.0, point.vector, sum) counts(i)(bestCenter) += 1 diff --git a/mllib/src/test/scala/org/apache/spark/ml/util/StopwatchSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/util/StopwatchSuite.scala index 9e6bc7193c..141249a427 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/util/StopwatchSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/util/StopwatchSuite.scala @@ -60,9 +60,9 @@ class StopwatchSuite extends SparkFunSuite with MLlibTestSparkContext { test("DistributedStopwatch on executors") { val sw = new DistributedStopwatch(sc, "sw") val rdd = sc.parallelize(0 until 4, 4) - val acc = sc.accumulator(0L) + val acc = sc.longAccumulator rdd.foreach { i => - acc += checkStopwatch(sw) + acc.add(checkStopwatch(sw)) } assert(!sw.isRunning) val elapsed = sw.elapsed() @@ -88,12 +88,12 @@ class StopwatchSuite extends SparkFunSuite with MLlibTestSparkContext { assert(sw.toString === s"{\n local: ${localElapsed}ms,\n spark: ${sparkElapsed}ms\n}") val rdd = sc.parallelize(0 until 4, 4) - val acc = sc.accumulator(0L) + val acc = sc.longAccumulator rdd.foreach { i => sw("local").start() val duration = checkStopwatch(sw("spark")) sw("local").stop() - acc += duration + acc.add(duration) } val localElapsed2 = sw("local").elapsed() assert(localElapsed2 === localElapsed) -- cgit v1.2.3