aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorSean Owen <sowen@cloudera.com>2016-06-25 12:14:14 +0100
committerSean Owen <sowen@cloudera.com>2016-06-25 12:14:14 +0100
commite87741589a24821b5fe73e5d9ee2164247998580 (patch)
tree741d210a004397e4b6726a6af66a828c751dee7f /core
parent3ee9695d1fcf3750cbf7896a56f8a1ba93f4e82f (diff)
downloadspark-e87741589a24821b5fe73e5d9ee2164247998580.tar.gz
spark-e87741589a24821b5fe73e5d9ee2164247998580.tar.bz2
spark-e87741589a24821b5fe73e5d9ee2164247998580.zip
[SPARK-16193][TESTS] Address flaky ExternalAppendOnlyMapSuite spilling tests
## What changes were proposed in this pull request?

Make spill tests wait until the job has completed before returning the number of stages that spilled.

## How was this patch tested?

Existing Jenkins tests.

Author: Sean Owen <sowen@cloudera.com>

Closes #13896 from srowen/SPARK-16193.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/TestUtils.scala13
1 file changed, 12 insertions, 1 deletion
diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala
index 43c89b258f..871b9d1ad5 100644
--- a/core/src/main/scala/org/apache/spark/TestUtils.scala
+++ b/core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -22,6 +22,7 @@ import java.net.{URI, URL}
import java.nio.charset.StandardCharsets
import java.nio.file.Paths
import java.util.Arrays
+import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.jar.{JarEntry, JarOutputStream}
import scala.collection.JavaConverters._
@@ -190,8 +191,14 @@ private[spark] object TestUtils {
private class SpillListener extends SparkListener {
private val stageIdToTaskMetrics = new mutable.HashMap[Int, ArrayBuffer[TaskMetrics]]
private val spilledStageIds = new mutable.HashSet[Int]
+ private val stagesDone = new CountDownLatch(1)
- def numSpilledStages: Int = spilledStageIds.size
+ def numSpilledStages: Int = {
+ // Long timeout, just in case somehow the job end isn't notified.
+ // Fails if a timeout occurs
+ assert(stagesDone.await(10, TimeUnit.SECONDS))
+ spilledStageIds.size
+ }
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
stageIdToTaskMetrics.getOrElseUpdate(
@@ -206,4 +213,8 @@ private class SpillListener extends SparkListener {
spilledStageIds += stageId
}
}
+
+ override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
+ stagesDone.countDown()
+ }
}