diff options
author | Kay Ousterhout <kayousterhout@gmail.com> | 2013-10-09 15:07:53 -0700 |
---|---|---|
committer | Kay Ousterhout <kayousterhout@gmail.com> | 2013-10-09 15:07:53 -0700 |
commit | a34a4e8174b5f285a327d7ff30ac9f3ff0db7689 (patch) | |
tree | 9b5b40b91ddd3b78bcad35facb5cf4e858daeed2 /core | |
parent | 3218fa795ff3ddee855772184aebe99098701d4f (diff) | |
download | spark-a34a4e8174b5f285a327d7ff30ac9f3ff0db7689.tar.gz spark-a34a4e8174b5f285a327d7ff30ac9f3ff0db7689.tar.bz2 spark-a34a4e8174b5f285a327d7ff30ac9f3ff0db7689.zip |
Fix race condition in SparkListenerSuite (fixes SPARK-908).
Diffstat (limited to 'core')
4 files changed, 25 insertions, 12 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index febcf9c6ee..ff45e76105 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -231,7 +231,7 @@ class SparkContext( } taskScheduler.start() - @volatile private var dagScheduler = new DAGScheduler(taskScheduler) + @volatile private[spark] var dagScheduler = new DAGScheduler(taskScheduler) dagScheduler.start() ui.start() diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 4053b91134..5c40f5095a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -114,7 +114,7 @@ class DAGScheduler( private[spark] val stageToInfos = new TimeStampedHashMap[Stage, StageInfo] - private val listenerBus = new SparkListenerBus() + private[spark] val listenerBus = new SparkListenerBus() // Contains the locations that each RDD's partitions are cached on private val cacheLocs = new HashMap[Int, Array[Seq[TaskLocation]]] diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala index a65e1ecd6d..8283c4b392 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala @@ -70,5 +70,23 @@ private[spark] class SparkListenerBus() extends Logging { queueFullErrorMessageLogged = true
}
}
+
+ /** Waits until there are no more events in the queue, or until the specified time has elapsed.
+ *
+ * Used for testing only. Returns true if the queue has emptied and false is the specified time
+ * elapsed before the queue emptied.
+ */
+ def waitUntilEmpty(timeoutMillis: Int): Boolean = {
+ val finishTime = System.currentTimeMillis + timeoutMillis
+ while (!eventQueue.isEmpty()) {
+ if (System.currentTimeMillis > finishTime) {
+ return false
+ }
+ /* Sleep rather than using wait/notify, because this is used only for testing and wait/notify
+ * add overhead in the general case. */
+ Thread.sleep(10)
+ }
+ return true
+ }
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index 41a161e08a..6e80262366 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -23,15 +23,9 @@ import scala.collection.mutable import org.scalatest.matchers.ShouldMatchers import org.apache.spark.SparkContext._ -/** - * - */ - class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatchers { - // TODO: This test has a race condition since the DAGScheduler now reports results - // asynchronously. It needs to be updated for that patch. - ignore("local metrics") { + test("local metrics") { sc = new SparkContext("local[4]", "test") val listener = new SaveStageInfo sc.addSparkListener(listener) @@ -45,7 +39,8 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc val d = sc.parallelize(1 to 1e4.toInt, 64).map{i => w(i)} d.count - Thread.sleep(1000) + val WAIT_TIMEOUT_MILLIS = 10000 + assert(sc.dagScheduler.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) listener.stageInfos.size should be (1) val d2 = d.map{i => w(i) -> i * 2}.setName("shuffle input 1") @@ -57,7 +52,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc d4.collectAsMap - Thread.sleep(1000) + assert(sc.dagScheduler.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) listener.stageInfos.size should be (4) listener.stageInfos.foreach {stageInfo => //small test, so some tasks might take less than 1 millisecond, but average should be greater than 1 ms @@ -68,7 +63,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc checkNonZeroAvg(stageInfo.taskInfos.map{_._2.shuffleReadMetrics.get.fetchWaitTime}, stageInfo + " fetchWaitTime") } - stageInfo.taskInfos.foreach{case (taskInfo, taskMetrics) => + stageInfo.taskInfos.foreach{case (taskInfo, taskMetrics) => taskMetrics.resultSize should be > (0l) if (isStage(stageInfo, Set(d2.name, d3.name), Set(d4.name))) { taskMetrics.shuffleWriteMetrics should be ('defined) |