diff options
author | Sean Owen <sowen@cloudera.com> | 2016-06-25 12:14:14 +0100 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2016-06-25 12:14:14 +0100 |
commit | e87741589a24821b5fe73e5d9ee2164247998580 (patch) | |
tree | 741d210a004397e4b6726a6af66a828c751dee7f | |
parent | 3ee9695d1fcf3750cbf7896a56f8a1ba93f4e82f (diff) | |
download | spark-e87741589a24821b5fe73e5d9ee2164247998580.tar.gz spark-e87741589a24821b5fe73e5d9ee2164247998580.tar.bz2 spark-e87741589a24821b5fe73e5d9ee2164247998580.zip |
[SPARK-16193][TESTS] Address flaky ExternalAppendOnlyMapSuite spilling tests
## What changes were proposed in this pull request?
Make spill tests wait until the job has completed before returning the number of stages that spilled.
## How was this patch tested?
Existing Jenkins tests.
Author: Sean Owen <sowen@cloudera.com>
Closes #13896 from srowen/SPARK-16193.
-rw-r--r-- | core/src/main/scala/org/apache/spark/TestUtils.scala | 13 |
1 file changed, 12 insertions, 1 deletion
```diff
diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala
index 43c89b258f..871b9d1ad5 100644
--- a/core/src/main/scala/org/apache/spark/TestUtils.scala
+++ b/core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -22,6 +22,7 @@ import java.net.{URI, URL}
 import java.nio.charset.StandardCharsets
 import java.nio.file.Paths
 import java.util.Arrays
+import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.jar.{JarEntry, JarOutputStream}
 
 import scala.collection.JavaConverters._
@@ -190,8 +191,14 @@ private[spark] object TestUtils {
 private class SpillListener extends SparkListener {
   private val stageIdToTaskMetrics = new mutable.HashMap[Int, ArrayBuffer[TaskMetrics]]
   private val spilledStageIds = new mutable.HashSet[Int]
+  private val stagesDone = new CountDownLatch(1)
 
-  def numSpilledStages: Int = spilledStageIds.size
+  def numSpilledStages: Int = {
+    // Long timeout, just in case somehow the job end isn't notified.
+    // Fails if a timeout occurs
+    assert(stagesDone.await(10, TimeUnit.SECONDS))
+    spilledStageIds.size
+  }
 
   override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
     stageIdToTaskMetrics.getOrElseUpdate(
@@ -206,4 +213,8 @@ private class SpillListener extends SparkListener {
       spilledStageIds += stageId
     }
   }
+
+  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
+    stagesDone.countDown()
+  }
 }
```