diff options
author | Marcelo Vanzin <vanzin@cloudera.com> | 2015-04-21 20:33:57 -0400 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2015-04-21 20:33:57 -0400 |
commit | e72c16e30d85cdc394d318b5551698885cfda9b8 (patch) | |
tree | 9b450a7f27b311e5bd5b776e8aee2af96e3408d3 /core | |
parent | b063a61b9852cf9b9d2c905332d2ecb2fd716cc4 (diff) | |
download | spark-e72c16e30d85cdc394d318b5551698885cfda9b8.tar.gz spark-e72c16e30d85cdc394d318b5551698885cfda9b8.tar.bz2 spark-e72c16e30d85cdc394d318b5551698885cfda9b8.zip |
[SPARK-6014] [core] Revamp Spark shutdown hooks, fix shutdown races.
This change adds some new utility code to handle shutdown hooks in
Spark. The main goal is to take advantage of Hadoop 2.x's API for
shutdown hooks, which allows Spark to register a hook that will
run before the one that cleans up HDFS clients, and thus avoids
some races that would cause exceptions to show up and other issues
such as failure to properly close event logs.
Unfortunately, Hadoop 1.x does not have such APIs, so in that case
correctness is still left to chance.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes #5560 from vanzin/SPARK-6014 and squashes the following commits:
edfafb1 [Marcelo Vanzin] Better scaladoc.
fcaeedd [Marcelo Vanzin] Merge branch 'master' into SPARK-6014
e7039dc [Marcelo Vanzin] [SPARK-6014] [core] Revamp Spark shutdown hooks, fix shutdown races.
Diffstat (limited to 'core')
6 files changed, 164 insertions, 64 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index 72f6048239..56bef57e55 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -27,7 +27,7 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.ui.{SparkUI, UIUtils, WebUI} import org.apache.spark.ui.JettyUtils._ -import org.apache.spark.util.SignalLogger +import org.apache.spark.util.{SignalLogger, Utils} /** * A web server that renders SparkUIs of completed applications. @@ -194,9 +194,7 @@ object HistoryServer extends Logging { val server = new HistoryServer(conf, provider, securityManager, port) server.bind() - Runtime.getRuntime().addShutdownHook(new Thread("HistoryServerStopper") { - override def run(): Unit = server.stop() - }) + Utils.addShutdownHook { () => server.stop() } // Wait until the end of the world... or if the HistoryServer process is manually stopped while(true) { Thread.sleep(Int.MaxValue) } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index 7d5acabb95..7aa85b732f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -28,6 +28,7 @@ import com.google.common.io.Files import org.apache.spark.{SparkConf, Logging} import org.apache.spark.deploy.{ApplicationDescription, ExecutorState} import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged +import org.apache.spark.util.Utils import org.apache.spark.util.logging.FileAppender /** @@ -61,7 +62,7 @@ private[deploy] class ExecutorRunner( // NOTE: This is now redundant with the automated shut-down enforced by the Executor. It might // make sense to remove this in the future. - private var shutdownHook: Thread = null + private var shutdownHook: AnyRef = null private[worker] def start() { workerThread = new Thread("ExecutorRunner for " + fullId) { @@ -69,12 +70,7 @@ private[deploy] class ExecutorRunner( } workerThread.start() // Shutdown hook that kills actors on shutdown. - shutdownHook = new Thread() { - override def run() { - killProcess(Some("Worker shutting down")) - } - } - Runtime.getRuntime.addShutdownHook(shutdownHook) + shutdownHook = Utils.addShutdownHook { () => killProcess(Some("Worker shutting down")) } } /** @@ -106,7 +102,7 @@ private[deploy] class ExecutorRunner( workerThread = null state = ExecutorState.KILLED try { - Runtime.getRuntime.removeShutdownHook(shutdownHook) + Utils.removeShutdownHook(shutdownHook) } catch { case e: IllegalStateException => None } diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index 2883137872..7ea5e54f9e 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -138,25 +138,17 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon } } - private def addShutdownHook(): Thread = { - val shutdownHook = new Thread("delete Spark local dirs") { - override def run(): Unit = Utils.logUncaughtExceptions { - logDebug("Shutdown hook called") - DiskBlockManager.this.doStop() - } + private def addShutdownHook(): AnyRef = { + Utils.addShutdownHook { () => + logDebug("Shutdown hook called") + DiskBlockManager.this.doStop() } - Runtime.getRuntime.addShutdownHook(shutdownHook) - shutdownHook } /** Cleanup local dirs and stop shuffle sender. */ private[spark] def stop() { // Remove the shutdown hook. It causes memory leaks if we leave it around. - try { - Runtime.getRuntime.removeShutdownHook(shutdownHook) - } catch { - case e: IllegalStateException => None - } + Utils.removeShutdownHook(shutdownHook) doStop() } diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala index af87303421..951897cead 100644 --- a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala @@ -135,21 +135,19 @@ private[spark] class TachyonBlockManager( private def addShutdownHook() { tachyonDirs.foreach(tachyonDir => Utils.registerShutdownDeleteDir(tachyonDir)) - Runtime.getRuntime.addShutdownHook(new Thread("delete Spark tachyon dirs") { - override def run(): Unit = Utils.logUncaughtExceptions { - logDebug("Shutdown hook called") - tachyonDirs.foreach { tachyonDir => - try { - if (!Utils.hasRootAsShutdownDeleteDir(tachyonDir)) { - Utils.deleteRecursively(tachyonDir, client) - } - } catch { - case e: Exception => - logError("Exception while deleting tachyon spark dir: " + tachyonDir, e) + Utils.addShutdownHook { () => + logDebug("Shutdown hook called") + tachyonDirs.foreach { tachyonDir => + try { + if (!Utils.hasRootAsShutdownDeleteDir(tachyonDir)) { + Utils.deleteRecursively(tachyonDir, client) } + } catch { + case e: Exception => + logError("Exception while deleting tachyon spark dir: " + tachyonDir, e) } - client.close() } - }) + client.close() + } } } diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 1029b0f9fc..7b0de1ae55 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -21,7 +21,7 @@ import java.io._ import java.lang.management.ManagementFactory import java.net._ import java.nio.ByteBuffer -import java.util.{Properties, Locale, Random, UUID} +import java.util.{PriorityQueue, Properties, Locale, Random, UUID} import java.util.concurrent._ import javax.net.ssl.HttpsURLConnection @@ -30,7 +30,7 @@ import scala.collection.Map import scala.collection.mutable.ArrayBuffer import scala.io.Source import scala.reflect.ClassTag -import scala.util.Try +import scala.util.{Failure, Success, Try} import scala.util.control.{ControlThrowable, NonFatal} import com.google.common.io.{ByteStreams, Files} @@ -64,9 +64,15 @@ private[spark] object CallSite { private[spark] object Utils extends Logging { val random = new Random() + val DEFAULT_SHUTDOWN_PRIORITY = 100 + private val MAX_DIR_CREATION_ATTEMPTS: Int = 10 @volatile private var localRootDirs: Array[String] = null + + private val shutdownHooks = new SparkShutdownHookManager() + shutdownHooks.install() + /** Serialize an object using Java serialization */ def serialize[T](o: T): Array[Byte] = { val bos = new ByteArrayOutputStream() @@ -176,18 +182,16 @@ private[spark] object Utils extends Logging { private val shutdownDeleteTachyonPaths = new scala.collection.mutable.HashSet[String]() // Add a shutdown hook to delete the temp dirs when the JVM exits - Runtime.getRuntime.addShutdownHook(new Thread("delete Spark temp dirs") { - override def run(): Unit = Utils.logUncaughtExceptions { - logDebug("Shutdown hook called") - shutdownDeletePaths.foreach { dirPath => - try { - Utils.deleteRecursively(new File(dirPath)) - } catch { - case e: Exception => logError(s"Exception while deleting Spark temp dir: $dirPath", e) - } + addShutdownHook { () => + logDebug("Shutdown hook called") + shutdownDeletePaths.foreach { dirPath => + try { + Utils.deleteRecursively(new File(dirPath)) + } catch { + case e: Exception => logError(s"Exception while deleting Spark temp dir: $dirPath", e) } } - }) + } // Register the path to be deleted via shutdown hook def registerShutdownDeleteDir(file: File) { @@ -613,7 +617,7 @@ private[spark] object Utils extends Logging { } Utils.setupSecureURLConnection(uc, securityMgr) - val timeoutMs = + val timeoutMs = conf.getTimeAsSeconds("spark.files.fetchTimeout", "60s").toInt * 1000 uc.setConnectTimeout(timeoutMs) uc.setReadTimeout(timeoutMs) @@ -1172,7 +1176,7 @@ private[spark] object Utils extends Logging { /** * Execute a block of code that evaluates to Unit, forwarding any uncaught exceptions to the * default UncaughtExceptionHandler - * + * * NOTE: This method is to be called by the spark-started JVM process. */ def tryOrExit(block: => Unit) { @@ -1185,11 +1189,11 @@ private[spark] object Utils extends Logging { } /** - * Execute a block of code that evaluates to Unit, stop SparkContext is there is any uncaught + * Execute a block of code that evaluates to Unit, stop SparkContext is there is any uncaught * exception - * - * NOTE: This method is to be called by the driver-side components to avoid stopping the - * user-started JVM process completely; in contrast, tryOrExit is to be called in the + * + * NOTE: This method is to be called by the driver-side components to avoid stopping the + * user-started JVM process completely; in contrast, tryOrExit is to be called in the * spark-started JVM process . */ def tryOrStopSparkContext(sc: SparkContext)(block: => Unit) { @@ -2132,6 +2136,102 @@ private[spark] object Utils extends Logging { .getOrElse(UserGroupInformation.getCurrentUser().getShortUserName()) } + /** + * Adds a shutdown hook with default priority. + * + * @param hook The code to run during shutdown. + * @return A handle that can be used to unregister the shutdown hook. + */ + def addShutdownHook(hook: () => Unit): AnyRef = { + addShutdownHook(DEFAULT_SHUTDOWN_PRIORITY, hook) + } + + /** + * Adds a shutdown hook with the given priority. Hooks with lower priority values run + * first. + * + * @param hook The code to run during shutdown. + * @return A handle that can be used to unregister the shutdown hook. + */ + def addShutdownHook(priority: Int, hook: () => Unit): AnyRef = { + shutdownHooks.add(priority, hook) + } + + /** + * Remove a previously installed shutdown hook. + * + * @param ref A handle returned by `addShutdownHook`. + * @return Whether the hook was removed. + */ + def removeShutdownHook(ref: AnyRef): Boolean = { + shutdownHooks.remove(ref) + } + +} + +private [util] class SparkShutdownHookManager { + + private val hooks = new PriorityQueue[SparkShutdownHook]() + private var shuttingDown = false + + /** + * Install a hook to run at shutdown and run all registered hooks in order. Hadoop 1.x does not + * have `ShutdownHookManager`, so in that case we just use the JVM's `Runtime` object and hope for + * the best. + */ + def install(): Unit = { + val hookTask = new Runnable() { + override def run(): Unit = runAll() + } + Try(Class.forName("org.apache.hadoop.util.ShutdownHookManager")) match { + case Success(shmClass) => + val fsPriority = classOf[FileSystem].getField("SHUTDOWN_HOOK_PRIORITY").get() + .asInstanceOf[Int] + val shm = shmClass.getMethod("get").invoke(null) + shm.getClass().getMethod("addShutdownHook", classOf[Runnable], classOf[Int]) + .invoke(shm, hookTask, Integer.valueOf(fsPriority + 30)) + + case Failure(_) => + Runtime.getRuntime.addShutdownHook(new Thread(hookTask, "Spark Shutdown Hook")); + } + } + + def runAll(): Unit = synchronized { + shuttingDown = true + while (!hooks.isEmpty()) { + Utils.logUncaughtExceptions(hooks.poll().run()) + } + } + + def add(priority: Int, hook: () => Unit): AnyRef = synchronized { + checkState() + val hookRef = new SparkShutdownHook(priority, hook) + hooks.add(hookRef) + hookRef + } + + def remove(ref: AnyRef): Boolean = synchronized { + checkState() + hooks.remove(ref) + } + + private def checkState(): Unit = { + if (shuttingDown) { + throw new IllegalStateException("Shutdown hooks cannot be modified during shutdown.") + } + } + +} + +private class SparkShutdownHook(private val priority: Int, hook: () => Unit) + extends Comparable[SparkShutdownHook] { + + override def compareTo(other: SparkShutdownHook): Int = { + other.priority - priority + } + + def run(): Unit = hook() + } /** diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index fb97e650ff..1ba99803f5 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -17,14 +17,16 @@ package org.apache.spark.util -import scala.util.Random - import java.io.{File, ByteArrayOutputStream, ByteArrayInputStream, FileOutputStream} import java.net.{BindException, ServerSocket, URI} import java.nio.{ByteBuffer, ByteOrder} import java.text.DecimalFormatSymbols import java.util.concurrent.TimeUnit import java.util.Locale +import java.util.PriorityQueue + +import scala.collection.mutable.ListBuffer +import scala.util.Random import com.google.common.base.Charsets.UTF_8 import com.google.common.io.Files @@ -36,14 +38,14 @@ import org.apache.hadoop.fs.Path import org.apache.spark.SparkConf class UtilsSuite extends FunSuite with ResetSystemProperties { - + test("timeConversion") { // Test -1 assert(Utils.timeStringAsSeconds("-1") === -1) - + // Test zero assert(Utils.timeStringAsSeconds("0") === 0) - + assert(Utils.timeStringAsSeconds("1") === 1) assert(Utils.timeStringAsSeconds("1s") === 1) assert(Utils.timeStringAsSeconds("1000ms") === 1) @@ -52,7 +54,7 @@ class UtilsSuite extends FunSuite with ResetSystemProperties { assert(Utils.timeStringAsSeconds("1min") === TimeUnit.MINUTES.toSeconds(1)) assert(Utils.timeStringAsSeconds("1h") === TimeUnit.HOURS.toSeconds(1)) assert(Utils.timeStringAsSeconds("1d") === TimeUnit.DAYS.toSeconds(1)) - + assert(Utils.timeStringAsMs("1") === 1) assert(Utils.timeStringAsMs("1ms") === 1) assert(Utils.timeStringAsMs("1000us") === 1) @@ -61,7 +63,7 @@ class UtilsSuite extends FunSuite with ResetSystemProperties { assert(Utils.timeStringAsMs("1min") === TimeUnit.MINUTES.toMillis(1)) assert(Utils.timeStringAsMs("1h") === TimeUnit.HOURS.toMillis(1)) assert(Utils.timeStringAsMs("1d") === TimeUnit.DAYS.toMillis(1)) - + // Test invalid strings intercept[NumberFormatException] { Utils.timeStringAsMs("This breaks 600s") @@ -79,7 +81,7 @@ class UtilsSuite extends FunSuite with ResetSystemProperties { Utils.timeStringAsMs("This 123s breaks") } } - + test("bytesToString") { assert(Utils.bytesToString(10) === "10.0 B") assert(Utils.bytesToString(1500) === "1500.0 B") @@ -466,4 +468,18 @@ class UtilsSuite extends FunSuite with ResetSystemProperties { val newFileName = new File(testFileDir, testFileName) assert(newFileName.isFile()) } + + test("shutdown hook manager") { + val manager = new SparkShutdownHookManager() + val output = new ListBuffer[Int]() + + val hook1 = manager.add(1, () => output += 1) + manager.add(3, () => output += 3) + manager.add(2, () => output += 2) + manager.add(4, () => output += 4) + manager.remove(hook1) + + manager.runAll() + assert(output.toList === List(4, 3, 2)) + } } |