author     Andrew Or <andrew@databricks.com>   2015-08-14 13:42:53 -0700
committer  Andrew Or <andrew@databricks.com>   2015-08-14 13:42:53 -0700
commit     6518ef63037aa56b541927f99ad26744f91098ce (patch)
tree       15add9896b651d7d852b28b05f3495740ab7564d /core
parent     33bae585d4cb25aed2ac32e0d1248f78cc65318b (diff)
[SPARK-9948] Fix flaky AccumulatorSuite - internal accumulators
In these tests, we use a custom listener and we assert on fields in the stage / task completion events. However, these events are posted in a separate thread so they're not guaranteed to be posted in time. This commit fixes this flakiness through a job end registration callback.

Author: Andrew Or <andrew@databricks.com>

Closes #8176 from andrewor14/fix-accumulator-suite.
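The pattern, in brief: stage and task completion events are delivered to listeners asynchronously on the listener bus thread, so a test that asserts on listener state immediately after an action returns is racing the bus. Because the bus delivers events in order, running the assertions from the listener's own onJobEnd handler guarantees the job's stage and task events have already been recorded. Below is a minimal, self-contained sketch of that pattern against Spark's public SparkListener API; the names JobEndCallbackListener and CallbackPatternSketch are illustrative, not from the patch (the real suite uses the SaveInfoListener shown in the diff).

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}

// Sketch of the fix's core idea: register assertions to run from onJobEnd,
// which fires on the listener bus thread after the job's stage/task events
// have been delivered, instead of asserting right after the action returns.
class JobEndCallbackListener extends SparkListener {
  @volatile private var callback: Int => Unit = null // parameter is the job ID

  def registerJobCompletionCallback(f: Int => Unit): Unit = { callback = f }

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    if (callback != null) {
      callback(jobEnd.jobId)
    }
  }
}

object CallbackPatternSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("sketch"))
    val listener = new JobEndCallbackListener
    sc.addSparkListener(listener)
    // Register the assertions first...
    listener.registerJobCompletionCallback { jobId =>
      assert(jobId >= 0, s"unexpected job id: $jobId")
    }
    // ...then trigger the job; the callback runs once the job-end event arrives.
    sc.parallelize(1 to 100, 4).count()
    sc.stop()
  }
}

One trade-off to note: an assertion that fails inside the callback is thrown on the listener bus thread rather than the test thread, so a harness has to surface such failures separately; the patch as shown takes the same approach.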
Diffstat (limited to 'core')
-rw-r--r--  core/src/test/scala/org/apache/spark/AccumulatorSuite.scala | 153
1 file changed, 92 insertions(+), 61 deletions(-)
diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
index 0eb2293a9d..5b84acf40b 100644
--- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
@@ -182,26 +182,30 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
     sc = new SparkContext("local", "test")
     sc.addSparkListener(listener)
     // Have each task add 1 to the internal accumulator
-    sc.parallelize(1 to 100, numPartitions).mapPartitions { iter =>
+    val rdd = sc.parallelize(1 to 100, numPartitions).mapPartitions { iter =>
       TaskContext.get().internalMetricsToAccumulators(TEST_ACCUMULATOR) += 1
       iter
-    }.count()
-    val stageInfos = listener.getCompletedStageInfos
-    val taskInfos = listener.getCompletedTaskInfos
-    assert(stageInfos.size === 1)
-    assert(taskInfos.size === numPartitions)
-    // The accumulator values should be merged in the stage
-    val stageAccum = findAccumulableInfo(stageInfos.head.accumulables.values, TEST_ACCUMULATOR)
-    assert(stageAccum.value.toLong === numPartitions)
-    // The accumulator should be updated locally on each task
-    val taskAccumValues = taskInfos.map { taskInfo =>
-      val taskAccum = findAccumulableInfo(taskInfo.accumulables, TEST_ACCUMULATOR)
-      assert(taskAccum.update.isDefined)
-      assert(taskAccum.update.get.toLong === 1)
-      taskAccum.value.toLong
     }
-    // Each task should keep track of the partial value on the way, i.e. 1, 2, ... numPartitions
-    assert(taskAccumValues.sorted === (1L to numPartitions).toSeq)
+    // Register asserts in job completion callback to avoid flakiness
+    listener.registerJobCompletionCallback { _ =>
+      val stageInfos = listener.getCompletedStageInfos
+      val taskInfos = listener.getCompletedTaskInfos
+      assert(stageInfos.size === 1)
+      assert(taskInfos.size === numPartitions)
+      // The accumulator values should be merged in the stage
+      val stageAccum = findAccumulableInfo(stageInfos.head.accumulables.values, TEST_ACCUMULATOR)
+      assert(stageAccum.value.toLong === numPartitions)
+      // The accumulator should be updated locally on each task
+      val taskAccumValues = taskInfos.map { taskInfo =>
+        val taskAccum = findAccumulableInfo(taskInfo.accumulables, TEST_ACCUMULATOR)
+        assert(taskAccum.update.isDefined)
+        assert(taskAccum.update.get.toLong === 1)
+        taskAccum.value.toLong
+      }
+      // Each task should keep track of the partial value on the way, i.e. 1, 2, ... numPartitions
+      assert(taskAccumValues.sorted === (1L to numPartitions).toSeq)
+    }
+    rdd.count()
   }

   test("internal accumulators in multiple stages") {
@@ -211,7 +215,7 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
     sc.addSparkListener(listener)
     // Each stage creates its own set of internal accumulators so the
     // values for the same metric should not be mixed up across stages
-    sc.parallelize(1 to 100, numPartitions)
+    val rdd = sc.parallelize(1 to 100, numPartitions)
       .map { i => (i, i) }
       .mapPartitions { iter =>
         TaskContext.get().internalMetricsToAccumulators(TEST_ACCUMULATOR) += 1
@@ -227,16 +231,20 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
         TaskContext.get().internalMetricsToAccumulators(TEST_ACCUMULATOR) += 100
         iter
       }
-      .count()
-    // We ran 3 stages, and the accumulator values should be distinct
-    val stageInfos = listener.getCompletedStageInfos
-    assert(stageInfos.size === 3)
-    val firstStageAccum = findAccumulableInfo(stageInfos(0).accumulables.values, TEST_ACCUMULATOR)
-    val secondStageAccum = findAccumulableInfo(stageInfos(1).accumulables.values, TEST_ACCUMULATOR)
-    val thirdStageAccum = findAccumulableInfo(stageInfos(2).accumulables.values, TEST_ACCUMULATOR)
-    assert(firstStageAccum.value.toLong === numPartitions)
-    assert(secondStageAccum.value.toLong === numPartitions * 10)
-    assert(thirdStageAccum.value.toLong === numPartitions * 2 * 100)
+    // Register asserts in job completion callback to avoid flakiness
+    listener.registerJobCompletionCallback { _ =>
+      // We ran 3 stages, and the accumulator values should be distinct
+      val stageInfos = listener.getCompletedStageInfos
+      assert(stageInfos.size === 3)
+      val (firstStageAccum, secondStageAccum, thirdStageAccum) =
+        (findAccumulableInfo(stageInfos(0).accumulables.values, TEST_ACCUMULATOR),
+         findAccumulableInfo(stageInfos(1).accumulables.values, TEST_ACCUMULATOR),
+         findAccumulableInfo(stageInfos(2).accumulables.values, TEST_ACCUMULATOR))
+      assert(firstStageAccum.value.toLong === numPartitions)
+      assert(secondStageAccum.value.toLong === numPartitions * 10)
+      assert(thirdStageAccum.value.toLong === numPartitions * 2 * 100)
+    }
+    rdd.count()
   }

   test("internal accumulators in fully resubmitted stages") {
@@ -268,7 +276,7 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
     // This says use 1 core and retry tasks up to 2 times
     sc = new SparkContext("local[1, 2]", "test")
     sc.addSparkListener(listener)
-    sc.parallelize(1 to 100, numPartitions).mapPartitionsWithIndex { case (i, iter) =>
+    val rdd = sc.parallelize(1 to 100, numPartitions).mapPartitionsWithIndex { case (i, iter) =>
       val taskContext = TaskContext.get()
       taskContext.internalMetricsToAccumulators(TEST_ACCUMULATOR) += 1
       // Fail the first attempts of a subset of the tasks
@@ -276,28 +284,32 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
         throw new Exception("Failing a task intentionally.")
       }
       iter
-    }.count()
-    val stageInfos = listener.getCompletedStageInfos
-    val taskInfos = listener.getCompletedTaskInfos
-    assert(stageInfos.size === 1)
-    assert(taskInfos.size === numPartitions + numFailedPartitions)
-    val stageAccum = findAccumulableInfo(stageInfos.head.accumulables.values, TEST_ACCUMULATOR)
-    // We should not double count values in the merged accumulator
-    assert(stageAccum.value.toLong === numPartitions)
-    val taskAccumValues = taskInfos.flatMap { taskInfo =>
-      if (!taskInfo.failed) {
-        // If a task succeeded, its update value should always be 1
-        val taskAccum = findAccumulableInfo(taskInfo.accumulables, TEST_ACCUMULATOR)
-        assert(taskAccum.update.isDefined)
-        assert(taskAccum.update.get.toLong === 1)
-        Some(taskAccum.value.toLong)
-      } else {
-        // If a task failed, we should not get its accumulator values
-        assert(taskInfo.accumulables.isEmpty)
-        None
+    }
+    // Register asserts in job completion callback to avoid flakiness
+    listener.registerJobCompletionCallback { _ =>
+      val stageInfos = listener.getCompletedStageInfos
+      val taskInfos = listener.getCompletedTaskInfos
+      assert(stageInfos.size === 1)
+      assert(taskInfos.size === numPartitions + numFailedPartitions)
+      val stageAccum = findAccumulableInfo(stageInfos.head.accumulables.values, TEST_ACCUMULATOR)
+      // We should not double count values in the merged accumulator
+      assert(stageAccum.value.toLong === numPartitions)
+      val taskAccumValues = taskInfos.flatMap { taskInfo =>
+        if (!taskInfo.failed) {
+          // If a task succeeded, its update value should always be 1
+          val taskAccum = findAccumulableInfo(taskInfo.accumulables, TEST_ACCUMULATOR)
+          assert(taskAccum.update.isDefined)
+          assert(taskAccum.update.get.toLong === 1)
+          Some(taskAccum.value.toLong)
+        } else {
+          // If a task failed, we should not get its accumulator values
+          assert(taskInfo.accumulables.isEmpty)
+          None
+        }
       }
+      assert(taskAccumValues.sorted === (1L to numPartitions).toSeq)
     }
-    assert(taskAccumValues.sorted === (1L to numPartitions).toSeq)
+    rdd.count()
   }

 }
@@ -313,20 +325,27 @@ private[spark] object AccumulatorSuite {
       testName: String)(testBody: => Unit): Unit = {
     val listener = new SaveInfoListener
     sc.addSparkListener(listener)
-    // Verify that the accumulator does not already exist
+    // Register asserts in job completion callback to avoid flakiness
+    listener.registerJobCompletionCallback { jobId =>
+      if (jobId == 0) {
+        // The first job is a dummy one to verify that the accumulator does not already exist
+        val accums = listener.getCompletedStageInfos.flatMap(_.accumulables.values)
+        assert(!accums.exists(_.name == InternalAccumulator.PEAK_EXECUTION_MEMORY))
+      } else {
+        // In the subsequent jobs, verify that peak execution memory is updated
+        val accum = listener.getCompletedStageInfos
+          .flatMap(_.accumulables.values)
+          .find(_.name == InternalAccumulator.PEAK_EXECUTION_MEMORY)
+          .getOrElse {
+            throw new TestFailedException(
+              s"peak execution memory accumulator not set in '$testName'", 0)
+          }
+        assert(accum.value.toLong > 0)
+      }
+    }
+    // Run the jobs
     sc.parallelize(1 to 10).count()
-    val accums = listener.getCompletedStageInfos.flatMap(_.accumulables.values)
-    assert(!accums.exists(_.name == InternalAccumulator.PEAK_EXECUTION_MEMORY))
     testBody
-    // Verify that peak execution memory is updated
-    val accum = listener.getCompletedStageInfos
-      .flatMap(_.accumulables.values)
-      .find(_.name == InternalAccumulator.PEAK_EXECUTION_MEMORY)
-      .getOrElse {
-        throw new TestFailedException(
-          s"peak execution memory accumulator not set in '$testName'", 0)
-      }
-    assert(accum.value.toLong > 0)
   }
 }
@@ -336,10 +355,22 @@ private[spark] object AccumulatorSuite {
 private class SaveInfoListener extends SparkListener {
   private val completedStageInfos: ArrayBuffer[StageInfo] = new ArrayBuffer[StageInfo]
   private val completedTaskInfos: ArrayBuffer[TaskInfo] = new ArrayBuffer[TaskInfo]
+  private var jobCompletionCallback: (Int => Unit) = null // parameter is job ID

   def getCompletedStageInfos: Seq[StageInfo] = completedStageInfos.toArray.toSeq
   def getCompletedTaskInfos: Seq[TaskInfo] = completedTaskInfos.toArray.toSeq

+  /** Register a callback to be called on job end. */
+  def registerJobCompletionCallback(callback: (Int => Unit)): Unit = {
+    jobCompletionCallback = callback
+  }
+
+  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
+    if (jobCompletionCallback != null) {
+      jobCompletionCallback(jobEnd.jobId)
+    }
+  }
+
   override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
     completedStageInfos += stageCompleted.stageInfo
   }