path: root/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
diff options
Diffstat (limited to 'core/src/test/scala/org/apache/spark/AccumulatorSuite.scala')
1 files changed, 159 insertions, 165 deletions
diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
index 5b84acf40b..11c97d7d9a 100644
--- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
@@ -17,18 +17,22 @@
package org.apache.spark
+import javax.annotation.concurrent.GuardedBy
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.ref.WeakReference
+import scala.util.control.NonFatal
import org.scalatest.Matchers
import org.scalatest.exceptions.TestFailedException
import org.apache.spark.scheduler._
+import org.apache.spark.serializer.JavaSerializer
class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContext {
- import InternalAccumulator._
+ import AccumulatorParam._
implicit def setAccum[A]: AccumulableParam[mutable.Set[A], A] =
new AccumulableParam[mutable.Set[A], A] {
@@ -59,7 +63,7 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
longAcc.value should be (210L + maxInt * 20)
- test ("value not assignable from tasks") {
+ test("value not assignable from tasks") {
sc = new SparkContext("local", "test")
val acc : Accumulator[Int] = sc.accumulator(0)
@@ -84,7 +88,7 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
- test ("value not readable in tasks") {
+ test("value not readable in tasks") {
val maxI = 1000
for (nThreads <- List(1, 10)) { // test single & multi-threaded
sc = new SparkContext("local[" + nThreads + "]", "test")
@@ -159,193 +163,157 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
- test("internal accumulators in TaskContext") {
+ test("get accum") {
sc = new SparkContext("local", "test")
- val accums = InternalAccumulator.create(sc)
- val taskContext = new TaskContextImpl(0, 0, 0, 0, null, null, accums)
- val internalMetricsToAccums = taskContext.internalMetricsToAccumulators
- val collectedInternalAccums = taskContext.collectInternalAccumulators()
- val collectedAccums = taskContext.collectAccumulators()
- assert(internalMetricsToAccums.size > 0)
- assert(internalMetricsToAccums.values.forall(_.isInternal))
- assert(internalMetricsToAccums.contains(TEST_ACCUMULATOR))
- val testAccum = internalMetricsToAccums(TEST_ACCUMULATOR)
- assert(collectedInternalAccums.size === internalMetricsToAccums.size)
- assert(collectedInternalAccums.size === collectedAccums.size)
- assert(collectedInternalAccums.contains(testAccum.id))
- assert(collectedAccums.contains(testAccum.id))
- }
+ // Don't register with SparkContext for cleanup
+ var acc = new Accumulable[Int, Int](0, IntAccumulatorParam, None, true, true)
+ val accId = acc.id
+ val ref = WeakReference(acc)
+ assert(ref.get.isDefined)
+ Accumulators.register(ref.get.get)
- test("internal accumulators in a stage") {
- val listener = new SaveInfoListener
- val numPartitions = 10
- sc = new SparkContext("local", "test")
- sc.addSparkListener(listener)
- // Have each task add 1 to the internal accumulator
- val rdd = sc.parallelize(1 to 100, numPartitions).mapPartitions { iter =>
- TaskContext.get().internalMetricsToAccumulators(TEST_ACCUMULATOR) += 1
- iter
- }
- // Register asserts in job completion callback to avoid flakiness
- listener.registerJobCompletionCallback { _ =>
- val stageInfos = listener.getCompletedStageInfos
- val taskInfos = listener.getCompletedTaskInfos
- assert(stageInfos.size === 1)
- assert(taskInfos.size === numPartitions)
- // The accumulator values should be merged in the stage
- val stageAccum = findAccumulableInfo(stageInfos.head.accumulables.values, TEST_ACCUMULATOR)
- assert(stageAccum.value.toLong === numPartitions)
- // The accumulator should be updated locally on each task
- val taskAccumValues = taskInfos.map { taskInfo =>
- val taskAccum = findAccumulableInfo(taskInfo.accumulables, TEST_ACCUMULATOR)
- assert(taskAccum.update.isDefined)
- assert(taskAccum.update.get.toLong === 1)
- taskAccum.value.toLong
- }
- // Each task should keep track of the partial value on the way, i.e. 1, 2, ... numPartitions
- assert(taskAccumValues.sorted === (1L to numPartitions).toSeq)
+ // Remove the explicit reference to it and allow weak reference to get garbage collected
+ acc = null
+ System.gc()
+ assert(ref.get.isEmpty)
+ // Getting a garbage collected accum should throw error
+ intercept[IllegalAccessError] {
+ Accumulators.get(accId)
- rdd.count()
+ // Getting a normal accumulator. Note: this has to be separate because referencing an
+ // accumulator above in an `assert` would keep it from being garbage collected.
+ val acc2 = new Accumulable[Long, Long](0L, LongAccumulatorParam, None, true, true)
+ Accumulators.register(acc2)
+ assert(Accumulators.get(acc2.id) === Some(acc2))
+ // Getting an accumulator that does not exist should return None
+ assert(Accumulators.get(100000).isEmpty)
- test("internal accumulators in multiple stages") {
- val listener = new SaveInfoListener
- val numPartitions = 10
- sc = new SparkContext("local", "test")
- sc.addSparkListener(listener)
- // Each stage creates its own set of internal accumulators so the
- // values for the same metric should not be mixed up across stages
- val rdd = sc.parallelize(1 to 100, numPartitions)
- .map { i => (i, i) }
- .mapPartitions { iter =>
- TaskContext.get().internalMetricsToAccumulators(TEST_ACCUMULATOR) += 1
- iter
- }
- .reduceByKey { case (x, y) => x + y }
- .mapPartitions { iter =>
- TaskContext.get().internalMetricsToAccumulators(TEST_ACCUMULATOR) += 10
- iter
- }
- .repartition(numPartitions * 2)
- .mapPartitions { iter =>
- TaskContext.get().internalMetricsToAccumulators(TEST_ACCUMULATOR) += 100
- iter
- }
- // Register asserts in job completion callback to avoid flakiness
- listener.registerJobCompletionCallback { _ =>
- // We ran 3 stages, and the accumulator values should be distinct
- val stageInfos = listener.getCompletedStageInfos
- assert(stageInfos.size === 3)
- val (firstStageAccum, secondStageAccum, thirdStageAccum) =
- (findAccumulableInfo(stageInfos(0).accumulables.values, TEST_ACCUMULATOR),
- findAccumulableInfo(stageInfos(1).accumulables.values, TEST_ACCUMULATOR),
- findAccumulableInfo(stageInfos(2).accumulables.values, TEST_ACCUMULATOR))
- assert(firstStageAccum.value.toLong === numPartitions)
- assert(secondStageAccum.value.toLong === numPartitions * 10)
- assert(thirdStageAccum.value.toLong === numPartitions * 2 * 100)
- }
- rdd.count()
+ test("only external accums are automatically registered") {
+ val accEx = new Accumulator(0, IntAccumulatorParam, Some("external"), internal = false)
+ val accIn = new Accumulator(0, IntAccumulatorParam, Some("internal"), internal = true)
+ assert(!accEx.isInternal)
+ assert(accIn.isInternal)
+ assert(Accumulators.get(accEx.id).isDefined)
+ assert(Accumulators.get(accIn.id).isEmpty)
- test("internal accumulators in fully resubmitted stages") {
- testInternalAccumulatorsWithFailedTasks((i: Int) => true) // fail all tasks
+ test("copy") {
+ val acc1 = new Accumulable[Long, Long](456L, LongAccumulatorParam, Some("x"), true, false)
+ val acc2 = acc1.copy()
+ assert(acc1.id === acc2.id)
+ assert(acc1.value === acc2.value)
+ assert(acc1.name === acc2.name)
+ assert(acc1.isInternal === acc2.isInternal)
+ assert(acc1.countFailedValues === acc2.countFailedValues)
+ assert(acc1 !== acc2)
+ // Modifying one does not affect the other
+ acc1.add(44L)
+ assert(acc1.value === 500L)
+ assert(acc2.value === 456L)
+ acc2.add(144L)
+ assert(acc1.value === 500L)
+ assert(acc2.value === 600L)
- test("internal accumulators in partially resubmitted stages") {
- testInternalAccumulatorsWithFailedTasks((i: Int) => i % 2 == 0) // fail a subset
+ test("register multiple accums with same ID") {
+ // Make sure these are internal accums so we don't automatically register them already
+ val acc1 = new Accumulable[Int, Int](0, IntAccumulatorParam, None, true, true)
+ val acc2 = acc1.copy()
+ assert(acc1 !== acc2)
+ assert(acc1.id === acc2.id)
+ assert(Accumulators.originals.isEmpty)
+ assert(Accumulators.get(acc1.id).isEmpty)
+ Accumulators.register(acc1)
+ Accumulators.register(acc2)
+ // The second one does not override the first one
+ assert(Accumulators.originals.size === 1)
+ assert(Accumulators.get(acc1.id) === Some(acc1))
- /**
- * Return the accumulable info that matches the specified name.
- */
- private def findAccumulableInfo(
- accums: Iterable[AccumulableInfo],
- name: String): AccumulableInfo = {
- accums.find { a => a.name == name }.getOrElse {
- throw new TestFailedException(s"internal accumulator '$name' not found", 0)
- }
+ test("string accumulator param") {
+ val acc = new Accumulator("", StringAccumulatorParam, Some("darkness"))
+ assert(acc.value === "")
+ acc.setValue("feeds")
+ assert(acc.value === "feeds")
+ acc.add("your")
+ assert(acc.value === "your") // value is overwritten, not concatenated
+ acc += "soul"
+ assert(acc.value === "soul")
+ acc ++= "with"
+ assert(acc.value === "with")
+ acc.merge("kindness")
+ assert(acc.value === "kindness")
- /**
- * Test whether internal accumulators are merged properly if some tasks fail.
- */
- private def testInternalAccumulatorsWithFailedTasks(failCondition: (Int => Boolean)): Unit = {
- val listener = new SaveInfoListener
- val numPartitions = 10
- val numFailedPartitions = (0 until numPartitions).count(failCondition)
- // This says use 1 core and retry tasks up to 2 times
- sc = new SparkContext("local[1, 2]", "test")
- sc.addSparkListener(listener)
- val rdd = sc.parallelize(1 to 100, numPartitions).mapPartitionsWithIndex { case (i, iter) =>
- val taskContext = TaskContext.get()
- taskContext.internalMetricsToAccumulators(TEST_ACCUMULATOR) += 1
- // Fail the first attempts of a subset of the tasks
- if (failCondition(i) && taskContext.attemptNumber() == 0) {
- throw new Exception("Failing a task intentionally.")
- }
- iter
- }
- // Register asserts in job completion callback to avoid flakiness
- listener.registerJobCompletionCallback { _ =>
- val stageInfos = listener.getCompletedStageInfos
- val taskInfos = listener.getCompletedTaskInfos
- assert(stageInfos.size === 1)
- assert(taskInfos.size === numPartitions + numFailedPartitions)
- val stageAccum = findAccumulableInfo(stageInfos.head.accumulables.values, TEST_ACCUMULATOR)
- // We should not double count values in the merged accumulator
- assert(stageAccum.value.toLong === numPartitions)
- val taskAccumValues = taskInfos.flatMap { taskInfo =>
- if (!taskInfo.failed) {
- // If a task succeeded, its update value should always be 1
- val taskAccum = findAccumulableInfo(taskInfo.accumulables, TEST_ACCUMULATOR)
- assert(taskAccum.update.isDefined)
- assert(taskAccum.update.get.toLong === 1)
- Some(taskAccum.value.toLong)
- } else {
- // If a task failed, we should not get its accumulator values
- assert(taskInfo.accumulables.isEmpty)
- None
- }
- }
- assert(taskAccumValues.sorted === (1L to numPartitions).toSeq)
- }
- rdd.count()
+ test("list accumulator param") {
+ val acc = new Accumulator(Seq.empty[Int], new ListAccumulatorParam[Int], Some("numbers"))
+ assert(acc.value === Seq.empty[Int])
+ acc.add(Seq(1, 2))
+ assert(acc.value === Seq(1, 2))
+ acc += Seq(3, 4)
+ assert(acc.value === Seq(1, 2, 3, 4))
+ acc ++= Seq(5, 6)
+ assert(acc.value === Seq(1, 2, 3, 4, 5, 6))
+ acc.merge(Seq(7, 8))
+ assert(acc.value === Seq(1, 2, 3, 4, 5, 6, 7, 8))
+ acc.setValue(Seq(9, 10))
+ assert(acc.value === Seq(9, 10))
+ }
+ test("value is reset on the executors") {
+ val acc1 = new Accumulator(0, IntAccumulatorParam, Some("thing"), internal = false)
+ val acc2 = new Accumulator(0L, LongAccumulatorParam, Some("thing2"), internal = false)
+ val externalAccums = Seq(acc1, acc2)
+ val internalAccums = InternalAccumulator.create()
+ // Set some values; these should not be observed later on the "executors"
+ acc1.setValue(10)
+ acc2.setValue(20L)
+ internalAccums
+ .find(_.name == Some(InternalAccumulator.TEST_ACCUM))
+ .get.asInstanceOf[Accumulator[Long]]
+ .setValue(30L)
+ // Simulate the task being serialized and sent to the executors.
+ val dummyTask = new DummyTask(internalAccums, externalAccums)
+ val serInstance = new JavaSerializer(new SparkConf).newInstance()
+ val taskSer = Task.serializeWithDependencies(
+ dummyTask, mutable.HashMap(), mutable.HashMap(), serInstance)
+ // Now we're on the executors.
+ // Deserialize the task and assert that its accumulators are zero'ed out.
+ val (_, _, taskBytes) = Task.deserializeWithDependencies(taskSer)
+ val taskDeser = serInstance.deserialize[DummyTask](
+ taskBytes, Thread.currentThread.getContextClassLoader)
+ // Assert that executors see only zeros
+ taskDeser.externalAccums.foreach { a => assert(a.localValue == a.zero) }
+ taskDeser.internalAccums.foreach { a => assert(a.localValue == a.zero) }
private[spark] object AccumulatorSuite {
+ import InternalAccumulator._
- * Run one or more Spark jobs and verify that the peak execution memory accumulator
- * is updated afterwards.
+ * Run one or more Spark jobs and verify that in at least one job the peak execution memory
+ * accumulator is updated afterwards.
def verifyPeakExecutionMemorySet(
sc: SparkContext,
testName: String)(testBody: => Unit): Unit = {
val listener = new SaveInfoListener
- // Register asserts in job completion callback to avoid flakiness
- listener.registerJobCompletionCallback { jobId =>
- if (jobId == 0) {
- // The first job is a dummy one to verify that the accumulator does not already exist
- val accums = listener.getCompletedStageInfos.flatMap(_.accumulables.values)
- assert(!accums.exists(_.name == InternalAccumulator.PEAK_EXECUTION_MEMORY))
- } else {
- // In the subsequent jobs, verify that peak execution memory is updated
- val accum = listener.getCompletedStageInfos
- .flatMap(_.accumulables.values)
- .find(_.name == InternalAccumulator.PEAK_EXECUTION_MEMORY)
- .getOrElse {
- throw new TestFailedException(
- s"peak execution memory accumulator not set in '$testName'", 0)
- }
- assert(accum.value.toLong > 0)
- }
- }
- // Run the jobs
- sc.parallelize(1 to 10).count()
+ val accums = listener.getCompletedStageInfos.flatMap(_.accumulables.values)
+ val isSet = accums.exists { a =>
+ a.name == Some(PEAK_EXECUTION_MEMORY) && a.value.exists(_.asInstanceOf[Long] > 0L)
+ }
+ if (!isSet) {
+ throw new TestFailedException(s"peak execution memory accumulator not set in '$testName'", 0)
+ }
@@ -357,6 +325,10 @@ private class SaveInfoListener extends SparkListener {
private val completedTaskInfos: ArrayBuffer[TaskInfo] = new ArrayBuffer[TaskInfo]
private var jobCompletionCallback: (Int => Unit) = null // parameter is job ID
+ // Accesses must be synchronized to ensure failures in `jobCompletionCallback` are propagated
+ @GuardedBy("this")
+ private var exception: Throwable = null
def getCompletedStageInfos: Seq[StageInfo] = completedStageInfos.toArray.toSeq
def getCompletedTaskInfos: Seq[TaskInfo] = completedTaskInfos.toArray.toSeq
@@ -365,9 +337,20 @@ private class SaveInfoListener extends SparkListener {
jobCompletionCallback = callback
- override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
+ /** Throw a stored exception, if any. */
+ def maybeThrowException(): Unit = synchronized {
+ if (exception != null) { throw exception }
+ }
+ override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = synchronized {
if (jobCompletionCallback != null) {
- jobCompletionCallback(jobEnd.jobId)
+ try {
+ jobCompletionCallback(jobEnd.jobId)
+ } catch {
+ // Store any exception thrown here so we can throw them later in the main thread.
+ // Otherwise, if `jobCompletionCallback` threw something it wouldn't fail the test.
+ case NonFatal(e) => exception = e
+ }
@@ -379,3 +362,14 @@ private class SaveInfoListener extends SparkListener {
completedTaskInfos += taskEnd.taskInfo
+ * A dummy [[Task]] that contains internal and external [[Accumulator]]s.
+ */
+private[spark] class DummyTask(
+ val internalAccums: Seq[Accumulator[_]],
+ val externalAccums: Seq[Accumulator[_]])
+ extends Task[Int](0, 0, 0, internalAccums) {
+ override def runTask(c: TaskContext): Int = 1