path: root/core
author     Marcelo Vanzin <vanzin@cloudera.com>    2015-04-21 20:33:57 -0400
committer  Sean Owen <sowen@cloudera.com>          2015-04-21 20:33:57 -0400
commit     e72c16e30d85cdc394d318b5551698885cfda9b8 (patch)
tree       9b450a7f27b311e5bd5b776e8aee2af96e3408d3 /core
parent     b063a61b9852cf9b9d2c905332d2ecb2fd716cc4 (diff)
[SPARK-6014] [core] Revamp Spark shutdown hooks, fix shutdown races.
This change adds some new utility code to handle shutdown hooks in Spark. The main goal is to take advantage of Hadoop 2.x's API for shutdown hooks, which allows Spark to register a hook that will run before the one that cleans up HDFS clients, and thus avoids some races that would cause exceptions to show up and other issues such as failure to properly close event logs.

Unfortunately, Hadoop 1.x does not have such APIs, so in that case correctness is still left to chance.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #5560 from vanzin/SPARK-6014 and squashes the following commits:

edfafb1 [Marcelo Vanzin] Better scaladoc.
fcaeedd [Marcelo Vanzin] Merge branch 'master' into SPARK-6014
e7039dc [Marcelo Vanzin] [SPARK-6014] [core] Revamp Spark shutdown hooks, fix shutdown races.
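For context, Spark-internal callers (Utils is private[spark]) switch to the pattern sketched below after this change. The handler bodies and the extra priority value are illustrative only, not taken from the diff; the "higher priority runs first" behavior matches the run order exercised by the new UtilsSuite test.

    import org.apache.spark.util.Utils

    // Register a hook with the default priority; keep the returned handle
    // if the hook may need to be unregistered later.
    val hookRef = Utils.addShutdownHook { () =>
      println("cleaning up before the JVM exits")  // illustrative work
    }

    // Register a hook with an explicit priority; hooks with higher priority
    // values are run first.
    Utils.addShutdownHook(Utils.DEFAULT_SHUTDOWN_PRIORITY + 10,
      () => println("runs before default-priority hooks"))

    // Unregister a hook when its owner is stopped before JVM shutdown.
    Utils.removeShutdownHook(hookRef)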
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala  |   6
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala  |  12
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala      |  18
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala   |  24
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala                    | 136
-rw-r--r--  core/src/test/scala/org/apache/spark/util/UtilsSuite.scala               |  32
6 files changed, 164 insertions(+), 64 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index 72f6048239..56bef57e55 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -27,7 +27,7 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.ui.{SparkUI, UIUtils, WebUI}
import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.util.SignalLogger
+import org.apache.spark.util.{SignalLogger, Utils}
/**
* A web server that renders SparkUIs of completed applications.
@@ -194,9 +194,7 @@ object HistoryServer extends Logging {
val server = new HistoryServer(conf, provider, securityManager, port)
server.bind()
- Runtime.getRuntime().addShutdownHook(new Thread("HistoryServerStopper") {
- override def run(): Unit = server.stop()
- })
+ Utils.addShutdownHook { () => server.stop() }
// Wait until the end of the world... or if the HistoryServer process is manually stopped
while(true) { Thread.sleep(Int.MaxValue) }
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 7d5acabb95..7aa85b732f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -28,6 +28,7 @@ import com.google.common.io.Files
import org.apache.spark.{SparkConf, Logging}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
+import org.apache.spark.util.Utils
import org.apache.spark.util.logging.FileAppender
/**
@@ -61,7 +62,7 @@ private[deploy] class ExecutorRunner(
// NOTE: This is now redundant with the automated shut-down enforced by the Executor. It might
// make sense to remove this in the future.
- private var shutdownHook: Thread = null
+ private var shutdownHook: AnyRef = null
private[worker] def start() {
workerThread = new Thread("ExecutorRunner for " + fullId) {
@@ -69,12 +70,7 @@ private[deploy] class ExecutorRunner(
}
workerThread.start()
// Shutdown hook that kills actors on shutdown.
- shutdownHook = new Thread() {
- override def run() {
- killProcess(Some("Worker shutting down"))
- }
- }
- Runtime.getRuntime.addShutdownHook(shutdownHook)
+ shutdownHook = Utils.addShutdownHook { () => killProcess(Some("Worker shutting down")) }
}
/**
@@ -106,7 +102,7 @@ private[deploy] class ExecutorRunner(
workerThread = null
state = ExecutorState.KILLED
try {
- Runtime.getRuntime.removeShutdownHook(shutdownHook)
+ Utils.removeShutdownHook(shutdownHook)
} catch {
case e: IllegalStateException => None
}
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 2883137872..7ea5e54f9e 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -138,25 +138,17 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
}
}
- private def addShutdownHook(): Thread = {
- val shutdownHook = new Thread("delete Spark local dirs") {
- override def run(): Unit = Utils.logUncaughtExceptions {
- logDebug("Shutdown hook called")
- DiskBlockManager.this.doStop()
- }
+ private def addShutdownHook(): AnyRef = {
+ Utils.addShutdownHook { () =>
+ logDebug("Shutdown hook called")
+ DiskBlockManager.this.doStop()
}
- Runtime.getRuntime.addShutdownHook(shutdownHook)
- shutdownHook
}
/** Cleanup local dirs and stop shuffle sender. */
private[spark] def stop() {
// Remove the shutdown hook. It causes memory leaks if we leave it around.
- try {
- Runtime.getRuntime.removeShutdownHook(shutdownHook)
- } catch {
- case e: IllegalStateException => None
- }
+ Utils.removeShutdownHook(shutdownHook)
doStop()
}
diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
index af87303421..951897cead 100644
--- a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
@@ -135,21 +135,19 @@ private[spark] class TachyonBlockManager(
private def addShutdownHook() {
tachyonDirs.foreach(tachyonDir => Utils.registerShutdownDeleteDir(tachyonDir))
- Runtime.getRuntime.addShutdownHook(new Thread("delete Spark tachyon dirs") {
- override def run(): Unit = Utils.logUncaughtExceptions {
- logDebug("Shutdown hook called")
- tachyonDirs.foreach { tachyonDir =>
- try {
- if (!Utils.hasRootAsShutdownDeleteDir(tachyonDir)) {
- Utils.deleteRecursively(tachyonDir, client)
- }
- } catch {
- case e: Exception =>
- logError("Exception while deleting tachyon spark dir: " + tachyonDir, e)
+ Utils.addShutdownHook { () =>
+ logDebug("Shutdown hook called")
+ tachyonDirs.foreach { tachyonDir =>
+ try {
+ if (!Utils.hasRootAsShutdownDeleteDir(tachyonDir)) {
+ Utils.deleteRecursively(tachyonDir, client)
}
+ } catch {
+ case e: Exception =>
+ logError("Exception while deleting tachyon spark dir: " + tachyonDir, e)
}
- client.close()
}
- })
+ client.close()
+ }
}
}
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 1029b0f9fc..7b0de1ae55 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -21,7 +21,7 @@ import java.io._
import java.lang.management.ManagementFactory
import java.net._
import java.nio.ByteBuffer
-import java.util.{Properties, Locale, Random, UUID}
+import java.util.{PriorityQueue, Properties, Locale, Random, UUID}
import java.util.concurrent._
import javax.net.ssl.HttpsURLConnection
@@ -30,7 +30,7 @@ import scala.collection.Map
import scala.collection.mutable.ArrayBuffer
import scala.io.Source
import scala.reflect.ClassTag
-import scala.util.Try
+import scala.util.{Failure, Success, Try}
import scala.util.control.{ControlThrowable, NonFatal}
import com.google.common.io.{ByteStreams, Files}
@@ -64,9 +64,15 @@ private[spark] object CallSite {
private[spark] object Utils extends Logging {
val random = new Random()
+ val DEFAULT_SHUTDOWN_PRIORITY = 100
+
private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
@volatile private var localRootDirs: Array[String] = null
+
+ private val shutdownHooks = new SparkShutdownHookManager()
+ shutdownHooks.install()
+
/** Serialize an object using Java serialization */
def serialize[T](o: T): Array[Byte] = {
val bos = new ByteArrayOutputStream()
@@ -176,18 +182,16 @@ private[spark] object Utils extends Logging {
private val shutdownDeleteTachyonPaths = new scala.collection.mutable.HashSet[String]()
// Add a shutdown hook to delete the temp dirs when the JVM exits
- Runtime.getRuntime.addShutdownHook(new Thread("delete Spark temp dirs") {
- override def run(): Unit = Utils.logUncaughtExceptions {
- logDebug("Shutdown hook called")
- shutdownDeletePaths.foreach { dirPath =>
- try {
- Utils.deleteRecursively(new File(dirPath))
- } catch {
- case e: Exception => logError(s"Exception while deleting Spark temp dir: $dirPath", e)
- }
+ addShutdownHook { () =>
+ logDebug("Shutdown hook called")
+ shutdownDeletePaths.foreach { dirPath =>
+ try {
+ Utils.deleteRecursively(new File(dirPath))
+ } catch {
+ case e: Exception => logError(s"Exception while deleting Spark temp dir: $dirPath", e)
}
}
- })
+ }
// Register the path to be deleted via shutdown hook
def registerShutdownDeleteDir(file: File) {
@@ -613,7 +617,7 @@ private[spark] object Utils extends Logging {
}
Utils.setupSecureURLConnection(uc, securityMgr)
- val timeoutMs =
+ val timeoutMs =
conf.getTimeAsSeconds("spark.files.fetchTimeout", "60s").toInt * 1000
uc.setConnectTimeout(timeoutMs)
uc.setReadTimeout(timeoutMs)
@@ -1172,7 +1176,7 @@ private[spark] object Utils extends Logging {
/**
* Execute a block of code that evaluates to Unit, forwarding any uncaught exceptions to the
* default UncaughtExceptionHandler
- *
+ *
* NOTE: This method is to be called by the spark-started JVM process.
*/
def tryOrExit(block: => Unit) {
@@ -1185,11 +1189,11 @@ private[spark] object Utils extends Logging {
}
/**
- * Execute a block of code that evaluates to Unit, stop SparkContext is there is any uncaught
+ * Execute a block of code that evaluates to Unit, stop SparkContext is there is any uncaught
* exception
- *
- * NOTE: This method is to be called by the driver-side components to avoid stopping the
- * user-started JVM process completely; in contrast, tryOrExit is to be called in the
+ *
+ * NOTE: This method is to be called by the driver-side components to avoid stopping the
+ * user-started JVM process completely; in contrast, tryOrExit is to be called in the
* spark-started JVM process .
*/
def tryOrStopSparkContext(sc: SparkContext)(block: => Unit) {
@@ -2132,6 +2136,102 @@ private[spark] object Utils extends Logging {
.getOrElse(UserGroupInformation.getCurrentUser().getShortUserName())
}
+ /**
+ * Adds a shutdown hook with default priority.
+ *
+ * @param hook The code to run during shutdown.
+ * @return A handle that can be used to unregister the shutdown hook.
+ */
+ def addShutdownHook(hook: () => Unit): AnyRef = {
+ addShutdownHook(DEFAULT_SHUTDOWN_PRIORITY, hook)
+ }
+
+ /**
+   * Adds a shutdown hook with the given priority. Hooks with higher priority values run
+ * first.
+ *
+ * @param hook The code to run during shutdown.
+ * @return A handle that can be used to unregister the shutdown hook.
+ */
+ def addShutdownHook(priority: Int, hook: () => Unit): AnyRef = {
+ shutdownHooks.add(priority, hook)
+ }
+
+ /**
+ * Remove a previously installed shutdown hook.
+ *
+ * @param ref A handle returned by `addShutdownHook`.
+ * @return Whether the hook was removed.
+ */
+ def removeShutdownHook(ref: AnyRef): Boolean = {
+ shutdownHooks.remove(ref)
+ }
+
+}
+
+private [util] class SparkShutdownHookManager {
+
+ private val hooks = new PriorityQueue[SparkShutdownHook]()
+ private var shuttingDown = false
+
+ /**
+ * Install a hook to run at shutdown and run all registered hooks in order. Hadoop 1.x does not
+ * have `ShutdownHookManager`, so in that case we just use the JVM's `Runtime` object and hope for
+ * the best.
+ */
+ def install(): Unit = {
+ val hookTask = new Runnable() {
+ override def run(): Unit = runAll()
+ }
+ Try(Class.forName("org.apache.hadoop.util.ShutdownHookManager")) match {
+ case Success(shmClass) =>
+ val fsPriority = classOf[FileSystem].getField("SHUTDOWN_HOOK_PRIORITY").get()
+ .asInstanceOf[Int]
+ val shm = shmClass.getMethod("get").invoke(null)
+ shm.getClass().getMethod("addShutdownHook", classOf[Runnable], classOf[Int])
+ .invoke(shm, hookTask, Integer.valueOf(fsPriority + 30))
+
+ case Failure(_) =>
+ Runtime.getRuntime.addShutdownHook(new Thread(hookTask, "Spark Shutdown Hook"));
+ }
+ }
+
+ def runAll(): Unit = synchronized {
+ shuttingDown = true
+ while (!hooks.isEmpty()) {
+ Utils.logUncaughtExceptions(hooks.poll().run())
+ }
+ }
+
+ def add(priority: Int, hook: () => Unit): AnyRef = synchronized {
+ checkState()
+ val hookRef = new SparkShutdownHook(priority, hook)
+ hooks.add(hookRef)
+ hookRef
+ }
+
+ def remove(ref: AnyRef): Boolean = synchronized {
+ checkState()
+ hooks.remove(ref)
+ }
+
+ private def checkState(): Unit = {
+ if (shuttingDown) {
+ throw new IllegalStateException("Shutdown hooks cannot be modified during shutdown.")
+ }
+ }
+
+}
+
+private class SparkShutdownHook(private val priority: Int, hook: () => Unit)
+ extends Comparable[SparkShutdownHook] {
+
+ override def compareTo(other: SparkShutdownHook): Int = {
+ other.priority - priority
+ }
+
+ def run(): Unit = hook()
+
}
/**
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index fb97e650ff..1ba99803f5 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -17,14 +17,16 @@
package org.apache.spark.util
-import scala.util.Random
-
import java.io.{File, ByteArrayOutputStream, ByteArrayInputStream, FileOutputStream}
import java.net.{BindException, ServerSocket, URI}
import java.nio.{ByteBuffer, ByteOrder}
import java.text.DecimalFormatSymbols
import java.util.concurrent.TimeUnit
import java.util.Locale
+import java.util.PriorityQueue
+
+import scala.collection.mutable.ListBuffer
+import scala.util.Random
import com.google.common.base.Charsets.UTF_8
import com.google.common.io.Files
@@ -36,14 +38,14 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.SparkConf
class UtilsSuite extends FunSuite with ResetSystemProperties {
-
+
test("timeConversion") {
// Test -1
assert(Utils.timeStringAsSeconds("-1") === -1)
-
+
// Test zero
assert(Utils.timeStringAsSeconds("0") === 0)
-
+
assert(Utils.timeStringAsSeconds("1") === 1)
assert(Utils.timeStringAsSeconds("1s") === 1)
assert(Utils.timeStringAsSeconds("1000ms") === 1)
@@ -52,7 +54,7 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
assert(Utils.timeStringAsSeconds("1min") === TimeUnit.MINUTES.toSeconds(1))
assert(Utils.timeStringAsSeconds("1h") === TimeUnit.HOURS.toSeconds(1))
assert(Utils.timeStringAsSeconds("1d") === TimeUnit.DAYS.toSeconds(1))
-
+
assert(Utils.timeStringAsMs("1") === 1)
assert(Utils.timeStringAsMs("1ms") === 1)
assert(Utils.timeStringAsMs("1000us") === 1)
@@ -61,7 +63,7 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
assert(Utils.timeStringAsMs("1min") === TimeUnit.MINUTES.toMillis(1))
assert(Utils.timeStringAsMs("1h") === TimeUnit.HOURS.toMillis(1))
assert(Utils.timeStringAsMs("1d") === TimeUnit.DAYS.toMillis(1))
-
+
// Test invalid strings
intercept[NumberFormatException] {
Utils.timeStringAsMs("This breaks 600s")
@@ -79,7 +81,7 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
Utils.timeStringAsMs("This 123s breaks")
}
}
-
+
test("bytesToString") {
assert(Utils.bytesToString(10) === "10.0 B")
assert(Utils.bytesToString(1500) === "1500.0 B")
@@ -466,4 +468,18 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
val newFileName = new File(testFileDir, testFileName)
assert(newFileName.isFile())
}
+
+ test("shutdown hook manager") {
+ val manager = new SparkShutdownHookManager()
+ val output = new ListBuffer[Int]()
+
+ val hook1 = manager.add(1, () => output += 1)
+ manager.add(3, () => output += 3)
+ manager.add(2, () => output += 2)
+ manager.add(4, () => output += 4)
+ manager.remove(hook1)
+
+ manager.runAll()
+ assert(output.toList === List(4, 3, 2))
+ }
}
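Aside (not part of the patch): the List(4, 3, 2) expectation above follows from how SparkShutdownHook.compareTo interacts with java.util.PriorityQueue, which always polls its "smallest" element. Returning other.priority - priority makes a hook with a larger priority value compare as smaller, so it is polled first. A minimal standalone sketch with invented names:

    import java.util.PriorityQueue

    // Hypothetical stand-in for SparkShutdownHook, reduced to the ordering logic.
    case class DemoHook(priority: Int) extends Comparable[DemoHook] {
      // Larger priority => compares as "smaller" => polled earlier by the queue.
      // (Plain subtraction can overflow for extreme values; fine for a demo.)
      override def compareTo(other: DemoHook): Int = other.priority - priority
    }

    object DemoOrder {
      def main(args: Array[String]): Unit = {
        val queue = new PriorityQueue[DemoHook]()
        Seq(1, 3, 2, 4).foreach(p => queue.add(DemoHook(p)))
        while (!queue.isEmpty) print(queue.poll().priority + " ")  // prints: 4 3 2 1
      }
    }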