diff options
-rw-r--r-- | core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala | 9 |
1 files changed, 3 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala index 61b5a4cecd..b8481eabc7 100644 --- a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala @@ -67,15 +67,12 @@ private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: Stri processingEvent = true } try { - val event = eventQueue.poll - if (event == null) { + if (stopped.get()) { // Get out of the while loop and shutdown the daemon thread - if (!stopped.get) { - throw new IllegalStateException("Polling `null` from eventQueue means" + - " the listener bus has been stopped. So `stopped` must be true") - } return } + val event = eventQueue.poll + assert(event != null, "event queue was empty but the listener bus was not stopped") postToAll(event) } finally { self.synchronized { |