aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala4
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala28
2 files changed, 19 insertions, 13 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
index 545fa453b7..cbac4c13ca 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -50,9 +50,6 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
}
}
- // Exposed for testing
- @volatile private[spark] var stopCalled = false
-
/**
* Start sending events to attached listeners.
*
@@ -97,7 +94,6 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
}
def stop() {
- stopCalled = true
if (!started) {
throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!")
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 4cdccdda6f..36511a9e95 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -77,14 +77,21 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
test("bus.stop() waits for the event queue to completely drain") {
@volatile var drained = false
+ // When Listener has started
+ val listenerStarted = new Semaphore(0)
+
// Tells the listener to stop blocking
- val listenerWait = new Semaphore(1)
+ val listenerWait = new Semaphore(0)
+
+ // When stopper has started
+ val stopperStarted = new Semaphore(0)
- // When stop has returned
- val stopReturned = new Semaphore(1)
+ // When stopper has returned
+ val stopperReturned = new Semaphore(0)
class BlockingListener extends SparkListener {
override def onJobEnd(jobEnd: SparkListenerJobEnd) = {
+ listenerStarted.release()
listenerWait.acquire()
drained = true
}
@@ -97,23 +104,26 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
bus.start()
bus.post(SparkListenerJobEnd(0, JobSucceeded))
- // the queue should not drain immediately
+ listenerStarted.acquire()
+ // Listener should be blocked after start
assert(!drained)
new Thread("ListenerBusStopper") {
override def run() {
+ stopperStarted.release()
// stop() will block until notify() is called below
bus.stop()
- stopReturned.release(1)
+ stopperReturned.release()
}
}.start()
- while (!bus.stopCalled) {
- Thread.sleep(10)
- }
+ stopperStarted.acquire()
+ // Listener should remain blocked after stopper started
+ assert(!drained)
+ // unblock Listener to let queue drain
listenerWait.release()
- stopReturned.acquire()
+ stopperReturned.acquire()
assert(drained)
}