aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@databricks.com>2015-11-15 22:38:30 -0800
committerJosh Rosen <joshrosen@databricks.com>2015-11-15 22:38:30 -0800
commitfd50fa4c3eff42e8adeeabe399ddba0edac930c8 (patch)
tree551342245be432b8fdc6288519e919aedf4b3213 /core
parentb58765caa6d7e6933050565c5d423c45e7e70ba6 (diff)
downloadspark-fd50fa4c3eff42e8adeeabe399ddba0edac930c8.tar.gz
spark-fd50fa4c3eff42e8adeeabe399ddba0edac930c8.tar.bz2
spark-fd50fa4c3eff42e8adeeabe399ddba0edac930c8.zip
Revert "[SPARK-11572] Exit AsynchronousListenerBus thread when stop() is called"
This reverts commit 3e0a6cf1e02a19b37c68d3026415d53bb57a576b.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala9
1 files changed, 6 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
index b3b54af972..c20627b056 100644
--- a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
@@ -66,12 +66,15 @@ private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: Stri
processingEvent = true
}
try {
- if (stopped.get()) {
+ val event = eventQueue.poll
+ if (event == null) {
// Get out of the while loop and shutdown the daemon thread
+ if (!stopped.get) {
+ throw new IllegalStateException("Polling `null` from eventQueue means" +
+ " the listener bus has been stopped. So `stopped` must be true")
+ }
return
}
- val event = eventQueue.poll
- assert(event != null, "event queue was empty but the listener bus was not stopped")
postToAll(event)
} finally {
self.synchronized {