aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Or <andrew@databricks.com>2016-01-29 17:59:41 -0800
committerAndrew Or <andrew@databricks.com>2016-01-29 17:59:41 -0800
commite6ceac49a311faf3413acda57a6612fe806adf90 (patch)
tree9126d3b443120907002e796d9bc35ffb2d737026
parent2cbc412821641cf9446c0621ffa1976bd7fc4fa1 (diff)
downloadspark-e6ceac49a311faf3413acda57a6612fe806adf90.tar.gz
spark-e6ceac49a311faf3413acda57a6612fe806adf90.tar.bz2
spark-e6ceac49a311faf3413acda57a6612fe806adf90.zip
[SPARK-13096][TEST] Fix flaky verifyPeakExecutionMemorySet
Previously we would assert things before all events are guaranteed to have been processed. To fix this, just block until all events are actually processed, i.e. until the listener queue is empty. https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.7/79/testReport/junit/org.apache.spark.util.collection/ExternalAppendOnlyMapSuite/spilling/ Author: Andrew Or <andrew@databricks.com> Closes #10990 from andrewor14/accum-suite-less-flaky.
-rw-r--r--core/src/test/scala/org/apache/spark/AccumulatorSuite.scala2
1 files changed, 2 insertions, 0 deletions
diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
index 11c97d7d9a..b8f2b96d70 100644
--- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
@@ -307,6 +307,8 @@ private[spark] object AccumulatorSuite {
val listener = new SaveInfoListener
sc.addSparkListener(listener)
testBody
+ // wait until all events have been processed before proceeding to assert things
+ sc.listenerBus.waitUntilEmpty(10 * 1000)
val accums = listener.getCompletedStageInfos.flatMap(_.accumulables.values)
val isSet = accums.exists { a =>
a.name == Some(PEAK_EXECUTION_MEMORY) && a.value.exists(_.asInstanceOf[Long] > 0L)