aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorKan Zhang <kzhang@apache.org>2014-04-16 17:39:11 -0700
committerReynold Xin <rxin@apache.org>2014-04-16 17:39:11 -0700
commit38877ccf394a50bfd37c8433d4aafaa91683d3b8 (patch)
treeb69ecaaa6448b52b72954962bca2e0c145364ffe /core
parent016a87764a7eb1092b6489e5f411d9e67c56e027 (diff)
downloadspark-38877ccf394a50bfd37c8433d4aafaa91683d3b8.tar.gz
spark-38877ccf394a50bfd37c8433d4aafaa91683d3b8.tar.bz2
spark-38877ccf394a50bfd37c8433d4aafaa91683d3b8.zip
Fixing a race condition in event listener unit test
Author: Kan Zhang <kzhang@apache.org> Closes #401 from kanzhang/fix-1475 and squashes the following commits: c6058bd [Kan Zhang] Fixing a race condition in event listener unit test
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala4
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala28
2 files changed, 19 insertions, 13 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
index 545fa453b7..cbac4c13ca 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -50,9 +50,6 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
}
}
- // Exposed for testing
- @volatile private[spark] var stopCalled = false
-
/**
* Start sending events to attached listeners.
*
@@ -97,7 +94,6 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
}
def stop() {
- stopCalled = true
if (!started) {
throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!")
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 4cdccdda6f..36511a9e95 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -77,14 +77,21 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
test("bus.stop() waits for the event queue to completely drain") {
@volatile var drained = false
+ // When Listener has started
+ val listenerStarted = new Semaphore(0)
+
// Tells the listener to stop blocking
- val listenerWait = new Semaphore(1)
+ val listenerWait = new Semaphore(0)
+
+ // When stopper has started
+ val stopperStarted = new Semaphore(0)
- // When stop has returned
- val stopReturned = new Semaphore(1)
+ // When stopper has returned
+ val stopperReturned = new Semaphore(0)
class BlockingListener extends SparkListener {
override def onJobEnd(jobEnd: SparkListenerJobEnd) = {
+ listenerStarted.release()
listenerWait.acquire()
drained = true
}
@@ -97,23 +104,26 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
bus.start()
bus.post(SparkListenerJobEnd(0, JobSucceeded))
- // the queue should not drain immediately
+ listenerStarted.acquire()
+ // Listener should be blocked after start
assert(!drained)
new Thread("ListenerBusStopper") {
override def run() {
+ stopperStarted.release()
// stop() will block until notify() is called below
bus.stop()
- stopReturned.release(1)
+ stopperReturned.release()
}
}.start()
- while (!bus.stopCalled) {
- Thread.sleep(10)
- }
+ stopperStarted.acquire()
+ // Listener should remain blocked after stopper started
+ assert(!drained)
+ // unblock Listener to let queue drain
listenerWait.release()
- stopReturned.acquire()
+ stopperReturned.acquire()
assert(drained)
}