aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authortedyu <yuzhihong@gmail.com>2015-11-10 16:51:25 -0800
committerAndrew Or <andrew@databricks.com>2015-11-10 16:51:25 -0800
commit3e0a6cf1e02a19b37c68d3026415d53bb57a576b (patch)
tree63335331c0a478b055f0b1848907689e7ae34ab4
parent33112f9c48680c33d663978f76806ebf0ea39789 (diff)
downloadspark-3e0a6cf1e02a19b37c68d3026415d53bb57a576b.tar.gz
spark-3e0a6cf1e02a19b37c68d3026415d53bb57a576b.tar.bz2
spark-3e0a6cf1e02a19b37c68d3026415d53bb57a576b.zip
[SPARK-11572] Exit AsynchronousListenerBus thread when stop() is called
As vonnagy reported in the following thread: http://search-hadoop.com/m/q3RTtk982kvIow22 Attempts to join the thread in AsynchronousListenerBus resulted in lock up because AsynchronousListenerBus thread was still getting messages `SparkListenerExecutorMetricsUpdate` from the DAGScheduler Author: tedyu <yuzhihong@gmail.com> Closes #9546 from ted-yu/master.
-rw-r--r--core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala9
1 files changed, 3 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
index 61b5a4cecd..b8481eabc7 100644
--- a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
@@ -67,15 +67,12 @@ private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: Stri
processingEvent = true
}
try {
- val event = eventQueue.poll
- if (event == null) {
+ if (stopped.get()) {
// Get out of the while loop and shutdown the daemon thread
- if (!stopped.get) {
- throw new IllegalStateException("Polling `null` from eventQueue means" +
- " the listener bus has been stopped. So `stopped` must be true")
- }
return
}
+ val event = eventQueue.poll
+ assert(event != null, "event queue was empty but the listener bus was not stopped")
postToAll(event)
} finally {
self.synchronized {