aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorSean Owen <sowen@cloudera.com>2015-11-30 17:33:09 -0800
committerMarcelo Vanzin <vanzin@cloudera.com>2015-11-30 17:33:09 -0800
commit96bf468c7860be317c20ccacf259910968d2dc83 (patch)
treeaaf47d866ec4ffc1d986f8ebd23aca50111df569 /core
parent9bf2120672ae0f620a217ccd96bef189ab75e0d6 (diff)
downloadspark-96bf468c7860be317c20ccacf259910968d2dc83.tar.gz
spark-96bf468c7860be317c20ccacf259910968d2dc83.tar.bz2
spark-96bf468c7860be317c20ccacf259910968d2dc83.zip
[SPARK-12049][CORE] User JVM shutdown hook can cause deadlock at shutdown
Avoid potential deadlock with a user app's shutdown hook thread by more narrowly synchronizing access to 'hooks' Author: Sean Owen <sowen@cloudera.com> Closes #10042 from srowen/SPARK-12049.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala33
1 files changed, 16 insertions, 17 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
index 4012dca3ec..620f226a23 100644
--- a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
+++ b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
@@ -206,7 +206,7 @@ private[spark] object ShutdownHookManager extends Logging {
private [util] class SparkShutdownHookManager {
private val hooks = new PriorityQueue[SparkShutdownHook]()
- private var shuttingDown = false
+ @volatile private var shuttingDown = false
/**
* Install a hook to run at shutdown and run all registered hooks in order. Hadoop 1.x does not
@@ -232,28 +232,27 @@ private [util] class SparkShutdownHookManager {
}
}
- def runAll(): Unit = synchronized {
+ def runAll(): Unit = {
shuttingDown = true
- while (!hooks.isEmpty()) {
- Try(Utils.logUncaughtExceptions(hooks.poll().run()))
+ var nextHook: SparkShutdownHook = null
+ while ({ nextHook = hooks.synchronized { hooks.poll() }; nextHook != null }) {
+ Try(Utils.logUncaughtExceptions(nextHook.run()))
}
}
- def add(priority: Int, hook: () => Unit): AnyRef = synchronized {
- checkState()
- val hookRef = new SparkShutdownHook(priority, hook)
- hooks.add(hookRef)
- hookRef
- }
-
- def remove(ref: AnyRef): Boolean = synchronized {
- hooks.remove(ref)
+ def add(priority: Int, hook: () => Unit): AnyRef = {
+ hooks.synchronized {
+ if (shuttingDown) {
+ throw new IllegalStateException("Shutdown hooks cannot be modified during shutdown.")
+ }
+ val hookRef = new SparkShutdownHook(priority, hook)
+ hooks.add(hookRef)
+ hookRef
+ }
}
- private def checkState(): Unit = {
- if (shuttingDown) {
- throw new IllegalStateException("Shutdown hooks cannot be modified during shutdown.")
- }
+ def remove(ref: AnyRef): Boolean = {
+ hooks.synchronized { hooks.remove(ref) }
}
}