Diffstat (limited to 'core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala')
-rw-r--r--  core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala | 24
1 file changed, 12 insertions(+), 12 deletions(-)
diff --git a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
index b074b95424..e4474bb813 100644
--- a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark
+import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.executor.TaskMetrics
@@ -29,7 +30,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
override def afterEach(): Unit = {
try {
- Accumulators.clear()
+ AccumulatorContext.clear()
} finally {
super.afterEach()
}
@@ -37,9 +38,8 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
test("internal accumulators in TaskContext") {
val taskContext = TaskContext.empty()
- val accumUpdates = taskContext.taskMetrics.accumulatorUpdates()
+ val accumUpdates = taskContext.taskMetrics.accumulators()
assert(accumUpdates.size > 0)
- assert(accumUpdates.forall(_.internal))
val testAccum = taskContext.taskMetrics.testAccum.get
assert(accumUpdates.exists(_.id == testAccum.id))
}
@@ -51,7 +51,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
sc.addSparkListener(listener)
// Have each task add 1 to the internal accumulator
val rdd = sc.parallelize(1 to 100, numPartitions).mapPartitions { iter =>
- TaskContext.get().taskMetrics().testAccum.get += 1
+ TaskContext.get().taskMetrics().testAccum.get.add(1)
iter
}
// Register asserts in job completion callback to avoid flakiness
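The same task-side change repeats in the hunks below: the removed lines bumped the accumulator with the old += operator, while the added lines call add() on the accumulator held by taskMetrics().testAccum. A minimal standalone sketch of the new pattern, assuming the code sits in package org.apache.spark (testAccum is package-private) and that spark.testing is set so TaskMetrics creates the test accumulator at all:

    // Each task adds 1 to the internal test accumulator and passes its
    // partition through unchanged; count() just forces the job to run.
    def incrementPerTask(sc: SparkContext, numPartitions: Int): Long = {
      sc.parallelize(1 to 100, numPartitions)
        .mapPartitions { iter =>
          TaskContext.get().taskMetrics().testAccum.get.add(1)  // was: ... += 1
          iter
        }
        .count()
    }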
@@ -87,17 +87,17 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
val rdd = sc.parallelize(1 to 100, numPartitions)
.map { i => (i, i) }
.mapPartitions { iter =>
- TaskContext.get().taskMetrics().testAccum.get += 1
+ TaskContext.get().taskMetrics().testAccum.get.add(1)
iter
}
.reduceByKey { case (x, y) => x + y }
.mapPartitions { iter =>
- TaskContext.get().taskMetrics().testAccum.get += 10
+ TaskContext.get().taskMetrics().testAccum.get.add(10)
iter
}
.repartition(numPartitions * 2)
.mapPartitions { iter =>
- TaskContext.get().taskMetrics().testAccum.get += 100
+ TaskContext.get().taskMetrics().testAccum.get.add(100)
iter
}
// Register asserts in job completion callback to avoid flakiness
@@ -127,7 +127,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
// This should retry both stages in the scheduler. Note that we only want to fail the
// first stage attempt because we want the stage to eventually succeed.
val x = sc.parallelize(1 to 100, numPartitions)
- .mapPartitions { iter => TaskContext.get().taskMetrics().testAccum.get += 1; iter }
+ .mapPartitions { iter => TaskContext.get().taskMetrics().testAccum.get.add(1); iter }
.groupBy(identity)
val sid = x.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle.shuffleId
val rdd = x.mapPartitionsWithIndex { case (i, iter) =>
@@ -183,18 +183,18 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
private val myCleaner = new SaveAccumContextCleaner(this)
override def cleaner: Option[ContextCleaner] = Some(myCleaner)
}
- assert(Accumulators.originals.isEmpty)
+ assert(AccumulatorContext.originals.isEmpty)
sc.parallelize(1 to 100).map { i => (i, i) }.reduceByKey { _ + _ }.count()
val numInternalAccums = TaskMetrics.empty.internalAccums.length
// We ran 2 stages, so we should have 2 sets of internal accumulators, 1 for each stage
- assert(Accumulators.originals.size === numInternalAccums * 2)
+ assert(AccumulatorContext.originals.size === numInternalAccums * 2)
val accumsRegistered = sc.cleaner match {
case Some(cleaner: SaveAccumContextCleaner) => cleaner.accumsRegisteredForCleanup
case _ => Seq.empty[Long]
}
// Make sure the same set of accumulators is registered for cleanup
assert(accumsRegistered.size === numInternalAccums * 2)
- assert(accumsRegistered.toSet === Accumulators.originals.keys.toSet)
+ assert(accumsRegistered.toSet === AccumulatorContext.originals.keySet().asScala)
}
/**
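This hunk is where the scala.collection.JavaConverters import added at the top of the file is used: AccumulatorContext.originals is read through keySet(), i.e. it is assumed to be a java.util map, so its key set has to be converted before it can be compared with the Scala set of recorded ids. A minimal sketch of that comparison under the same assumption (originals, like the suite itself, lives in package org.apache.spark):

    import scala.collection.JavaConverters._

    // keySet() returns a java.util.Set; asScala wraps it as a Scala mutable.Set,
    // which compares equal to an immutable Set holding the same ids.
    val registeredIds = AccumulatorContext.originals.keySet().asScala
    assert(accumsRegistered.toSet == registeredIds)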
@@ -212,7 +212,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
private class SaveAccumContextCleaner(sc: SparkContext) extends ContextCleaner(sc) {
private val accumsRegistered = new ArrayBuffer[Long]
- override def registerAccumulatorForCleanup(a: Accumulable[_, _]): Unit = {
+ override def registerAccumulatorForCleanup(a: NewAccumulator[_, _]): Unit = {
accumsRegistered += a.id
super.registerAccumulatorForCleanup(a)
}
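The spy cleaner only keeps overriding the real hook because its signature tracks the cleaner's: registerAccumulatorForCleanup now takes the new accumulator type rather than Accumulable[_, _]. A compact sketch of the whole spy as the suite appears to use it, with accumsRegisteredForCleanup being the accessor read in the earlier assertion hunk; treat the exact member layout as an assumption:

    private class SaveAccumContextCleaner(sc: SparkContext) extends ContextCleaner(sc) {
      private val accumsRegistered = new ArrayBuffer[Long]

      // Record the id of every accumulator handed over for cleanup, then delegate,
      // so the test can later compare these ids against AccumulatorContext.originals.
      override def registerAccumulatorForCleanup(a: NewAccumulator[_, _]): Unit = {
        accumsRegistered += a.id
        super.registerAccumulatorForCleanup(a)
      }

      // Snapshot of the recorded ids, read via sc.cleaner in the test body.
      def accumsRegisteredForCleanup: Seq[Long] = accumsRegistered.toArray
    }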