diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-02-19 20:56:15 -0800 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-02-19 20:56:15 -0800 |
commit | 05bc02e80be78d83937bf57f726946e297d0dd08 (patch) | |
tree | fe660dc7f10944b178bb1f28a67598f86a636f1f | |
parent | 8a992226bd6289cfc11c417e0a17edff7d4a4a87 (diff) | |
parent | 092c631fa8da6381b814f4d262c884ba08629b39 (diff) | |
download | spark-05bc02e80be78d83937bf57f726946e297d0dd08.tar.gz spark-05bc02e80be78d83937bf57f726946e297d0dd08.tar.bz2 spark-05bc02e80be78d83937bf57f726946e297d0dd08.zip |
Merge pull request #482 from woggling/shutdown-exceptions
Don't call System.exit over uncaught exceptions from shutdown hooks
-rw-r--r-- | core/src/main/scala/spark/Utils.scala | 21 | ||||
-rw-r--r-- | core/src/main/scala/spark/executor/Executor.scala | 17 | ||||
-rw-r--r-- | core/src/main/scala/spark/storage/DiskStore.scala | 6 |
3 files changed, 37 insertions, 7 deletions
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala index 28d643abca..81daacf958 100644 --- a/core/src/main/scala/spark/Utils.scala +++ b/core/src/main/scala/spark/Utils.scala @@ -454,4 +454,25 @@ private object Utils extends Logging { def clone[T](value: T, serializer: SerializerInstance): T = { serializer.deserialize[T](serializer.serialize(value)) } + + /** + * Detect whether this thread might be executing a shutdown hook. Will always return true if + * the current thread is a running a shutdown hook but may spuriously return true otherwise (e.g. + * if System.exit was just called by a concurrent thread). + * + * Currently, this detects whether the JVM is shutting down by Runtime#addShutdownHook throwing + * an IllegalStateException. + */ + def inShutdown(): Boolean = { + try { + val hook = new Thread { + override def run() {} + } + Runtime.getRuntime.addShutdownHook(hook) + Runtime.getRuntime.removeShutdownHook(hook) + } catch { + case ise: IllegalStateException => return true + } + return false + } } diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala index bd21ba719a..5de09030aa 100644 --- a/core/src/main/scala/spark/executor/Executor.scala +++ b/core/src/main/scala/spark/executor/Executor.scala @@ -50,14 +50,19 @@ private[spark] class Executor extends Logging { override def uncaughtException(thread: Thread, exception: Throwable) { try { logError("Uncaught exception in thread " + thread, exception) - if (exception.isInstanceOf[OutOfMemoryError]) { - System.exit(ExecutorExitCode.OOM) - } else { - System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION) + + // We may have been called from a shutdown hook. If so, we must not call System.exit(). + // (If we do, we will deadlock.) + if (!Utils.inShutdown()) { + if (exception.isInstanceOf[OutOfMemoryError]) { + System.exit(ExecutorExitCode.OOM) + } else { + System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION) + } } } catch { - case oom: OutOfMemoryError => System.exit(ExecutorExitCode.OOM) - case t: Throwable => System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE) + case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM) + case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE) } } } diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala index 7e5b820cbb..ddbf8821ad 100644 --- a/core/src/main/scala/spark/storage/DiskStore.scala +++ b/core/src/main/scala/spark/storage/DiskStore.scala @@ -178,7 +178,11 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String) Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dirs") { override def run() { logDebug("Shutdown hook called") - localDirs.foreach(localDir => Utils.deleteRecursively(localDir)) + try { + localDirs.foreach(localDir => Utils.deleteRecursively(localDir)) + } catch { + case t: Throwable => logError("Exception while deleting local spark dirs", t) + } } }) } |