aboutsummaryrefslogtreecommitdiff
path: root/core/src/test
diff options
context:
space:
mode:
authorKay Ousterhout <kayousterhout@gmail.com>2013-10-09 15:07:53 -0700
committerKay Ousterhout <kayousterhout@gmail.com>2013-10-09 15:07:53 -0700
commita34a4e8174b5f285a327d7ff30ac9f3ff0db7689 (patch)
tree9b5b40b91ddd3b78bcad35facb5cf4e858daeed2 /core/src/test
parent3218fa795ff3ddee855772184aebe99098701d4f (diff)
downloadspark-a34a4e8174b5f285a327d7ff30ac9f3ff0db7689.tar.gz
spark-a34a4e8174b5f285a327d7ff30ac9f3ff0db7689.tar.bz2
spark-a34a4e8174b5f285a327d7ff30ac9f3ff0db7689.zip
Fix race condition in SparkListenerSuite (fixes SPARK-908).
Diffstat (limited to 'core/src/test')
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala15
1 files changed, 5 insertions, 10 deletions
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 41a161e08a..6e80262366 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -23,15 +23,9 @@ import scala.collection.mutable
import org.scalatest.matchers.ShouldMatchers
import org.apache.spark.SparkContext._
-/**
- *
- */
-
class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
- // TODO: This test has a race condition since the DAGScheduler now reports results
- // asynchronously. It needs to be updated for that patch.
- ignore("local metrics") {
+ test("local metrics") {
sc = new SparkContext("local[4]", "test")
val listener = new SaveStageInfo
sc.addSparkListener(listener)
@@ -45,7 +39,8 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
val d = sc.parallelize(1 to 1e4.toInt, 64).map{i => w(i)}
d.count
- Thread.sleep(1000)
+ val WAIT_TIMEOUT_MILLIS = 10000
+ assert(sc.dagScheduler.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
listener.stageInfos.size should be (1)
val d2 = d.map{i => w(i) -> i * 2}.setName("shuffle input 1")
@@ -57,7 +52,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
d4.collectAsMap
- Thread.sleep(1000)
+ assert(sc.dagScheduler.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
listener.stageInfos.size should be (4)
listener.stageInfos.foreach {stageInfo =>
//small test, so some tasks might take less than 1 millisecond, but average should be greater than 1 ms
@@ -68,7 +63,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
checkNonZeroAvg(stageInfo.taskInfos.map{_._2.shuffleReadMetrics.get.fetchWaitTime}, stageInfo + " fetchWaitTime")
}
- stageInfo.taskInfos.foreach{case (taskInfo, taskMetrics) =>
+ stageInfo.taskInfos.foreach{case (taskInfo, taskMetrics) =>
taskMetrics.resultSize should be > (0l)
if (isStage(stageInfo, Set(d2.name, d3.name), Set(d4.name))) {
taskMetrics.shuffleWriteMetrics should be ('defined)