diff options
Diffstat (limited to 'core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala')
-rw-r--r-- | core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala | 24 |
1 files changed, 12 insertions, 12 deletions
diff --git a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala index b074b95424..e4474bb813 100644 --- a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark +import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import org.apache.spark.executor.TaskMetrics @@ -29,7 +30,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext { override def afterEach(): Unit = { try { - Accumulators.clear() + AccumulatorContext.clear() } finally { super.afterEach() } @@ -37,9 +38,8 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext { test("internal accumulators in TaskContext") { val taskContext = TaskContext.empty() - val accumUpdates = taskContext.taskMetrics.accumulatorUpdates() + val accumUpdates = taskContext.taskMetrics.accumulators() assert(accumUpdates.size > 0) - assert(accumUpdates.forall(_.internal)) val testAccum = taskContext.taskMetrics.testAccum.get assert(accumUpdates.exists(_.id == testAccum.id)) } @@ -51,7 +51,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext { sc.addSparkListener(listener) // Have each task add 1 to the internal accumulator val rdd = sc.parallelize(1 to 100, numPartitions).mapPartitions { iter => - TaskContext.get().taskMetrics().testAccum.get += 1 + TaskContext.get().taskMetrics().testAccum.get.add(1) iter } // Register asserts in job completion callback to avoid flakiness @@ -87,17 +87,17 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext { val rdd = sc.parallelize(1 to 100, numPartitions) .map { i => (i, i) } .mapPartitions { iter => - TaskContext.get().taskMetrics().testAccum.get += 1 + TaskContext.get().taskMetrics().testAccum.get.add(1) iter } .reduceByKey { case (x, y) => x + y } .mapPartitions { iter => - TaskContext.get().taskMetrics().testAccum.get += 10 + TaskContext.get().taskMetrics().testAccum.get.add(10) iter } .repartition(numPartitions * 2) .mapPartitions { iter => - TaskContext.get().taskMetrics().testAccum.get += 100 + TaskContext.get().taskMetrics().testAccum.get.add(100) iter } // Register asserts in job completion callback to avoid flakiness @@ -127,7 +127,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext { // This should retry both stages in the scheduler. Note that we only want to fail the // first stage attempt because we want the stage to eventually succeed. val x = sc.parallelize(1 to 100, numPartitions) - .mapPartitions { iter => TaskContext.get().taskMetrics().testAccum.get += 1; iter } + .mapPartitions { iter => TaskContext.get().taskMetrics().testAccum.get.add(1); iter } .groupBy(identity) val sid = x.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle.shuffleId val rdd = x.mapPartitionsWithIndex { case (i, iter) => @@ -183,18 +183,18 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext { private val myCleaner = new SaveAccumContextCleaner(this) override def cleaner: Option[ContextCleaner] = Some(myCleaner) } - assert(Accumulators.originals.isEmpty) + assert(AccumulatorContext.originals.isEmpty) sc.parallelize(1 to 100).map { i => (i, i) }.reduceByKey { _ + _ }.count() val numInternalAccums = TaskMetrics.empty.internalAccums.length // We ran 2 stages, so we should have 2 sets of internal accumulators, 1 for each stage - assert(Accumulators.originals.size === numInternalAccums * 2) + assert(AccumulatorContext.originals.size === numInternalAccums * 2) val accumsRegistered = sc.cleaner match { case Some(cleaner: SaveAccumContextCleaner) => cleaner.accumsRegisteredForCleanup case _ => Seq.empty[Long] } // Make sure the same set of accumulators is registered for cleanup assert(accumsRegistered.size === numInternalAccums * 2) - assert(accumsRegistered.toSet === Accumulators.originals.keys.toSet) + assert(accumsRegistered.toSet === AccumulatorContext.originals.keySet().asScala) } /** @@ -212,7 +212,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext { private class SaveAccumContextCleaner(sc: SparkContext) extends ContextCleaner(sc) { private val accumsRegistered = new ArrayBuffer[Long] - override def registerAccumulatorForCleanup(a: Accumulable[_, _]): Unit = { + override def registerAccumulatorForCleanup(a: NewAccumulator[_, _]): Unit = { accumsRegistered += a.id super.registerAccumulatorForCleanup(a) } |