author    Kay Ousterhout <kayousterhout@gmail.com>  2013-10-09 15:07:53 -0700
committer Kay Ousterhout <kayousterhout@gmail.com>  2013-10-09 15:07:53 -0700
commit    a34a4e8174b5f285a327d7ff30ac9f3ff0db7689 (patch)
tree      9b5b40b91ddd3b78bcad35facb5cf4e858daeed2 /core
parent    3218fa795ff3ddee855772184aebe99098701d4f (diff)
Fix race condition in SparkListenerSuite (fixes SPARK-908).
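Why the test was racy: SparkListenerBus drains its event queue on a separate daemon thread, so a test that runs a job and then asserts on listener state after a fixed Thread.sleep(1000) can fire its assertions while events are still in flight. The fix adds a bounded wait that blocks until the queue drains. Below is a minimal self-contained sketch of the pattern with toy names (ToyBus is illustrative, not Spark's API):

import java.util.concurrent.LinkedBlockingQueue

// Sketch of the pattern this commit introduces: a bus that drains events
// on its own daemon thread, plus a bounded wait tests can call instead of
// sleeping a fixed interval. Toy code, not Spark's actual classes.
object WaitUntilEmptySketch {
  class ToyBus {
    private val eventQueue = new LinkedBlockingQueue[String]
    @volatile private var processed = 0

    private val consumer = new Thread("toy-bus") {
      setDaemon(true)
      override def run(): Unit =
        while (true) { eventQueue.take(); processed += 1 }
    }
    consumer.start()

    def post(event: String): Unit = eventQueue.put(event)

    // Same polling idea as the waitUntilEmpty added below: poll until the
    // queue drains, or give up and return false once the timeout elapses.
    def waitUntilEmpty(timeoutMillis: Int): Boolean = {
      val finishTime = System.currentTimeMillis + timeoutMillis
      while (!eventQueue.isEmpty) {
        if (System.currentTimeMillis > finishTime) return false
        Thread.sleep(10)
      }
      true
    }
  }

  def main(args: Array[String]): Unit = {
    val bus = new ToyBus
    (1 to 100).foreach(i => bus.post("event-" + i))
    // Racy pattern the old test relied on:
    //   Thread.sleep(1000)  // may be too short on a slow or loaded machine
    // Deterministic up to a generous timeout:
    assert(bus.waitUntilEmpty(10000), "bus did not drain in time")
  }
}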
Diffstat (limited to 'core')
 core/src/main/scala/org/apache/spark/SparkContext.scala                 |  2 +-
 core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala       |  2 +-
 core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala   | 18 ++++++++++++++++++
 core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala | 15 +++++----------
 4 files changed, 25 insertions(+), 12 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index febcf9c6ee..ff45e76105 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -231,7 +231,7 @@ class SparkContext(
   }
   taskScheduler.start()

-  @volatile private var dagScheduler = new DAGScheduler(taskScheduler)
+  @volatile private[spark] var dagScheduler = new DAGScheduler(taskScheduler)
   dagScheduler.start()

   ui.start()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 4053b91134..5c40f5095a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -114,7 +114,7 @@ class DAGScheduler(
private[spark] val stageToInfos = new TimeStampedHashMap[Stage, StageInfo]
- private val listenerBus = new SparkListenerBus()
+ private[spark] val listenerBus = new SparkListenerBus()
// Contains the locations that each RDD's partitions are cached on
private val cacheLocs = new HashMap[Int, Array[Seq[TaskLocation]]]
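This change and the SparkContext one above are the same move: widening plain private to private[spark], Scala's package-qualified access modifier, so that the test suite (which lives under org.apache.spark) can reach sc.dagScheduler.listenerBus while user code outside the package still cannot. A standalone illustration with hypothetical names, not Spark code:

// private[P] makes a member visible everywhere under package P while
// keeping it hidden from code outside P. Hypothetical packages, for
// illustration only.
package demo {
  class Owner {
    private val secret = 1        // visible only within Owner
    private[demo] val shared = 2  // visible anywhere under package demo
  }

  package inner {
    object User {
      // Compiles: demo.inner is enclosed in demo, so private[demo] is visible.
      def read(o: Owner): Int = o.shared
      // def leak(o: Owner): Int = o.secret  // would not compile
    }
  }
}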
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index a65e1ecd6d..8283c4b392 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -70,5 +70,23 @@ private[spark] class SparkListenerBus() extends Logging {
       queueFullErrorMessageLogged = true
     }
   }
+
+  /** Waits until there are no more events in the queue, or until the specified time has elapsed.
+   *
+   * Used for testing only. Returns true if the queue has emptied and false if the specified time
+   * elapsed before the queue emptied.
+   */
+  def waitUntilEmpty(timeoutMillis: Int): Boolean = {
+    val finishTime = System.currentTimeMillis + timeoutMillis
+    while (!eventQueue.isEmpty()) {
+      if (System.currentTimeMillis > finishTime) {
+        return false
+      }
+      /* Sleep rather than using wait/notify, because this is used only for testing and wait/notify
+       * add overhead in the general case. */
+      Thread.sleep(10)
+    }
+    return true
+  }

 }
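The block comment inside the loop is the design trade-off in miniature: a wait/notify scheme would wake the waiter promptly, but every dequeue on the hot path would then have to synchronize and signal just in case a test is waiting, while a 10 ms poll costs nothing outside tests. For contrast, here is a sketch of what the rejected notify-based variant might look like; this is an assumption for illustration, not code from this commit:

import java.util.concurrent.LinkedBlockingQueue

// Sketch only: the wait/notify alternative the patch comment argues against.
// Every take() now pays for a synchronized block and a notifyAll(), which is
// steady-state overhead incurred solely to make test-time waits prompt.
class NotifyingQueue[T] {
  private val queue = new LinkedBlockingQueue[T]
  private val lock = new Object

  def put(elem: T): Unit = queue.put(elem)

  def take(): T = {
    val elem = queue.take()
    lock.synchronized { if (queue.isEmpty) lock.notifyAll() } // per-event cost
    elem
  }

  def waitUntilEmpty(timeoutMillis: Long): Boolean = lock.synchronized {
    val deadline = System.currentTimeMillis + timeoutMillis
    while (!queue.isEmpty && System.currentTimeMillis < deadline) {
      lock.wait(math.max(1, deadline - System.currentTimeMillis))
    }
    queue.isEmpty
  }
}

With the polling version the commit actually adds, the steady-state path is untouched; the only cost is that a test may observe the drained queue up to one 10 ms sleep late.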
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 41a161e08a..6e80262366 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -23,15 +23,9 @@ import scala.collection.mutable
 import org.scalatest.matchers.ShouldMatchers
 import org.apache.spark.SparkContext._

-/**
- *
- */
-
 class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatchers {

-  // TODO: This test has a race condition since the DAGScheduler now reports results
-  // asynchronously. It needs to be updated for that patch.
-  ignore("local metrics") {
+  test("local metrics") {
     sc = new SparkContext("local[4]", "test")
     val listener = new SaveStageInfo
     sc.addSparkListener(listener)
@@ -45,7 +39,8 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc

     val d = sc.parallelize(1 to 1e4.toInt, 64).map{i => w(i)}
     d.count
-    Thread.sleep(1000)
+    val WAIT_TIMEOUT_MILLIS = 10000
+    assert(sc.dagScheduler.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
     listener.stageInfos.size should be (1)

     val d2 = d.map{i => w(i) -> i * 2}.setName("shuffle input 1")
@@ -57,7 +52,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc

     d4.collectAsMap
-    Thread.sleep(1000)
+    assert(sc.dagScheduler.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
     listener.stageInfos.size should be (4)

     listener.stageInfos.foreach {stageInfo =>
       //small test, so some tasks might take less than 1 millisecond, but average should be greater than 1 ms
@@ -68,7 +63,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
         checkNonZeroAvg(stageInfo.taskInfos.map{_._2.shuffleReadMetrics.get.fetchWaitTime}, stageInfo + " fetchWaitTime")
       }

-      stageInfo.taskInfos.foreach{case (taskInfo, taskMetrics) =>
+      stageInfo.taskInfos.foreach{case (taskInfo, taskMetrics) =>
        taskMetrics.resultSize should be > (0l)
        if (isStage(stageInfo, Set(d2.name, d3.name), Set(d4.name))) {
          taskMetrics.shuffleWriteMetrics should be ('defined)