aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala9
1 files changed, 3 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
index 61b5a4cecd..b8481eabc7 100644
--- a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
@@ -67,15 +67,12 @@ private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: Stri
processingEvent = true
}
try {
- val event = eventQueue.poll
- if (event == null) {
+ if (stopped.get()) {
// Get out of the while loop and shutdown the daemon thread
- if (!stopped.get) {
- throw new IllegalStateException("Polling `null` from eventQueue means" +
- " the listener bus has been stopped. So `stopped` must be true")
- }
return
}
+ val event = eventQueue.poll
+ assert(event != null, "event queue was empty but the listener bus was not stopped")
postToAll(event)
} finally {
self.synchronized {