Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala  |   6
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala  |  12
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala      |  18
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala   |  24
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala                    | 136
-rw-r--r--  core/src/test/scala/org/apache/spark/util/UtilsSuite.scala               |  32
6 files changed, 164 insertions, 64 deletions
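
In short, the commit replaces the ad-hoc Runtime.getRuntime.addShutdownHook(new Thread { ... }) calls scattered across these files with a single Utils entry point backed by a priority queue. A caller-side sketch of the new API, assuming the Utils object as patched in this diff (ShutdownExample is an illustrative name, not part of the change):

    import org.apache.spark.util.Utils

    object ShutdownExample {
      def main(args: Array[String]): Unit = {
        // Register a hook at the default priority (100); keep the handle to unregister it.
        val handle = Utils.addShutdownHook { () => println("cleaning up") }

        // Hooks with higher priority values run first during shutdown.
        Utils.addShutdownHook(150, () => println("runs before the default-priority hook"))

        // Unregister the first hook if its resource is released before the JVM exits.
        Utils.removeShutdownHook(handle)
      }
    }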
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index 72f6048239..56bef57e55 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -27,7 +27,7 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.ui.{SparkUI, UIUtils, WebUI}
import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.util.SignalLogger
+import org.apache.spark.util.{SignalLogger, Utils}
/**
* A web server that renders SparkUIs of completed applications.
@@ -194,9 +194,7 @@ object HistoryServer extends Logging {
val server = new HistoryServer(conf, provider, securityManager, port)
server.bind()
- Runtime.getRuntime().addShutdownHook(new Thread("HistoryServerStopper") {
- override def run(): Unit = server.stop()
- })
+ Utils.addShutdownHook { () => server.stop() }
// Wait until the end of the world... or if the HistoryServer process is manually stopped
while(true) { Thread.sleep(Int.MaxValue) }
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 7d5acabb95..7aa85b732f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -28,6 +28,7 @@ import com.google.common.io.Files
import org.apache.spark.{SparkConf, Logging}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
+import org.apache.spark.util.Utils
import org.apache.spark.util.logging.FileAppender
/**
@@ -61,7 +62,7 @@ private[deploy] class ExecutorRunner(
// NOTE: This is now redundant with the automated shut-down enforced by the Executor. It might
// make sense to remove this in the future.
- private var shutdownHook: Thread = null
+ private var shutdownHook: AnyRef = null
private[worker] def start() {
workerThread = new Thread("ExecutorRunner for " + fullId) {
@@ -69,12 +70,7 @@ private[deploy] class ExecutorRunner(
}
workerThread.start()
// Shutdown hook that kills actors on shutdown.
- shutdownHook = new Thread() {
- override def run() {
- killProcess(Some("Worker shutting down"))
- }
- }
- Runtime.getRuntime.addShutdownHook(shutdownHook)
+ shutdownHook = Utils.addShutdownHook { () => killProcess(Some("Worker shutting down")) }
}
/**
@@ -106,7 +102,7 @@ private[deploy] class ExecutorRunner(
workerThread = null
state = ExecutorState.KILLED
try {
- Runtime.getRuntime.removeShutdownHook(shutdownHook)
+ Utils.removeShutdownHook(shutdownHook)
} catch {
case e: IllegalStateException => None
}
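
Note that the IllegalStateException guard around the removal survives on purpose: the new Utils.removeShutdownHook still throws if it is called once shutdown has begun (see checkState() in SparkShutdownHookManager below), so killing an executor while the JVM is going down remains safe.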
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 2883137872..7ea5e54f9e 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -138,25 +138,17 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
}
}
- private def addShutdownHook(): Thread = {
- val shutdownHook = new Thread("delete Spark local dirs") {
- override def run(): Unit = Utils.logUncaughtExceptions {
- logDebug("Shutdown hook called")
- DiskBlockManager.this.doStop()
- }
+ private def addShutdownHook(): AnyRef = {
+ Utils.addShutdownHook { () =>
+ logDebug("Shutdown hook called")
+ DiskBlockManager.this.doStop()
}
- Runtime.getRuntime.addShutdownHook(shutdownHook)
- shutdownHook
}
/** Cleanup local dirs and stop shuffle sender. */
private[spark] def stop() {
// Remove the shutdown hook. It causes memory leaks if we leave it around.
- try {
- Runtime.getRuntime.removeShutdownHook(shutdownHook)
- } catch {
- case e: IllegalStateException => None
- }
+ Utils.removeShutdownHook(shutdownHook)
doStop()
}
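
The DiskBlockManager hunk shows the intended lifecycle for the returned handle: keep the AnyRef from addShutdownHook and pass it back to removeShutdownHook on a clean stop. A minimal sketch of that pattern against the new API (ManagedResource and doStop are hypothetical names, not part of this diff):

    import org.apache.spark.util.Utils

    // Hypothetical resource that cleans up on an explicit stop() or, failing that, at JVM exit.
    class ManagedResource {
      // Register at construction time; keep the opaque handle for later removal.
      private val hookRef: AnyRef = Utils.addShutdownHook { () => doStop() }

      private def doStop(): Unit = {
        // release temp files, sockets, ...
      }

      def stop(): Unit = {
        // Unregister first so the hook neither leaks nor runs a second time.
        Utils.removeShutdownHook(hookRef)
        doStop()
      }
    }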
diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
index af87303421..951897cead 100644
--- a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
@@ -135,21 +135,19 @@ private[spark] class TachyonBlockManager(
private def addShutdownHook() {
tachyonDirs.foreach(tachyonDir => Utils.registerShutdownDeleteDir(tachyonDir))
- Runtime.getRuntime.addShutdownHook(new Thread("delete Spark tachyon dirs") {
- override def run(): Unit = Utils.logUncaughtExceptions {
- logDebug("Shutdown hook called")
- tachyonDirs.foreach { tachyonDir =>
- try {
- if (!Utils.hasRootAsShutdownDeleteDir(tachyonDir)) {
- Utils.deleteRecursively(tachyonDir, client)
- }
- } catch {
- case e: Exception =>
- logError("Exception while deleting tachyon spark dir: " + tachyonDir, e)
+ Utils.addShutdownHook { () =>
+ logDebug("Shutdown hook called")
+ tachyonDirs.foreach { tachyonDir =>
+ try {
+ if (!Utils.hasRootAsShutdownDeleteDir(tachyonDir)) {
+ Utils.deleteRecursively(tachyonDir, client)
}
+ } catch {
+ case e: Exception =>
+ logError("Exception while deleting tachyon spark dir: " + tachyonDir, e)
}
- client.close()
}
- })
+ client.close()
+ }
}
}
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 1029b0f9fc..7b0de1ae55 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -21,7 +21,7 @@ import java.io._
import java.lang.management.ManagementFactory
import java.net._
import java.nio.ByteBuffer
-import java.util.{Properties, Locale, Random, UUID}
+import java.util.{PriorityQueue, Properties, Locale, Random, UUID}
import java.util.concurrent._
import javax.net.ssl.HttpsURLConnection
@@ -30,7 +30,7 @@ import scala.collection.Map
import scala.collection.mutable.ArrayBuffer
import scala.io.Source
import scala.reflect.ClassTag
-import scala.util.Try
+import scala.util.{Failure, Success, Try}
import scala.util.control.{ControlThrowable, NonFatal}
import com.google.common.io.{ByteStreams, Files}
@@ -64,9 +64,15 @@ private[spark] object CallSite {
private[spark] object Utils extends Logging {
val random = new Random()
+ val DEFAULT_SHUTDOWN_PRIORITY = 100
+
private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
@volatile private var localRootDirs: Array[String] = null
+
+ private val shutdownHooks = new SparkShutdownHookManager()
+ shutdownHooks.install()
+
/** Serialize an object using Java serialization */
def serialize[T](o: T): Array[Byte] = {
val bos = new ByteArrayOutputStream()
@@ -176,18 +182,16 @@ private[spark] object Utils extends Logging {
private val shutdownDeleteTachyonPaths = new scala.collection.mutable.HashSet[String]()
// Add a shutdown hook to delete the temp dirs when the JVM exits
- Runtime.getRuntime.addShutdownHook(new Thread("delete Spark temp dirs") {
- override def run(): Unit = Utils.logUncaughtExceptions {
- logDebug("Shutdown hook called")
- shutdownDeletePaths.foreach { dirPath =>
- try {
- Utils.deleteRecursively(new File(dirPath))
- } catch {
- case e: Exception => logError(s"Exception while deleting Spark temp dir: $dirPath", e)
- }
+ addShutdownHook { () =>
+ logDebug("Shutdown hook called")
+ shutdownDeletePaths.foreach { dirPath =>
+ try {
+ Utils.deleteRecursively(new File(dirPath))
+ } catch {
+ case e: Exception => logError(s"Exception while deleting Spark temp dir: $dirPath", e)
}
}
- })
+ }
// Register the path to be deleted via shutdown hook
def registerShutdownDeleteDir(file: File) {
@@ -613,7 +617,7 @@ private[spark] object Utils extends Logging {
}
Utils.setupSecureURLConnection(uc, securityMgr)
- val timeoutMs =
+ val timeoutMs =
conf.getTimeAsSeconds("spark.files.fetchTimeout", "60s").toInt * 1000
uc.setConnectTimeout(timeoutMs)
uc.setReadTimeout(timeoutMs)
@@ -1172,7 +1176,7 @@ private[spark] object Utils extends Logging {
/**
* Execute a block of code that evaluates to Unit, forwarding any uncaught exceptions to the
* default UncaughtExceptionHandler
- *
+ *
* NOTE: This method is to be called by the spark-started JVM process.
*/
def tryOrExit(block: => Unit) {
@@ -1185,11 +1189,11 @@ private[spark] object Utils extends Logging {
}
/**
- * Execute a block of code that evaluates to Unit, stop SparkContext if there is any uncaught
+ * Execute a block of code that evaluates to Unit, stop SparkContext if there is any uncaught
* exception
- *
- * NOTE: This method is to be called by the driver-side components to avoid stopping the
- * user-started JVM process completely; in contrast, tryOrExit is to be called in the
+ *
+ * NOTE: This method is to be called by the driver-side components to avoid stopping the
+ * user-started JVM process completely; in contrast, tryOrExit is to be called in the
* spark-started JVM process.
*/
def tryOrStopSparkContext(sc: SparkContext)(block: => Unit) {
@@ -2132,6 +2136,102 @@ private[spark] object Utils extends Logging {
.getOrElse(UserGroupInformation.getCurrentUser().getShortUserName())
}
+ /**
+ * Adds a shutdown hook with default priority.
+ *
+ * @param hook The code to run during shutdown.
+ * @return A handle that can be used to unregister the shutdown hook.
+ */
+ def addShutdownHook(hook: () => Unit): AnyRef = {
+ addShutdownHook(DEFAULT_SHUTDOWN_PRIORITY, hook)
+ }
+
+ /**
+ * Adds a shutdown hook with the given priority. Hooks with higher priority values run
+ * first.
+ *
+ * @param hook The code to run during shutdown.
+ * @return A handle that can be used to unregister the shutdown hook.
+ */
+ def addShutdownHook(priority: Int, hook: () => Unit): AnyRef = {
+ shutdownHooks.add(priority, hook)
+ }
+
+ /**
+ * Remove a previously installed shutdown hook.
+ *
+ * @param ref A handle returned by `addShutdownHook`.
+ * @return Whether the hook was removed.
+ */
+ def removeShutdownHook(ref: AnyRef): Boolean = {
+ shutdownHooks.remove(ref)
+ }
+
+}
+
+private[util] class SparkShutdownHookManager {
+
+ private val hooks = new PriorityQueue[SparkShutdownHook]()
+ private var shuttingDown = false
+
+ /**
+ * Install a JVM shutdown hook that runs all registered Spark hooks in priority order. When
+ * Hadoop's `ShutdownHookManager` is available we register through it; Hadoop 1.x does not
+ * have it, so in that case we fall back to the JVM's `Runtime` object and hope for the best.
+ */
+ def install(): Unit = {
+ val hookTask = new Runnable() {
+ override def run(): Unit = runAll()
+ }
+ Try(Class.forName("org.apache.hadoop.util.ShutdownHookManager")) match {
+ case Success(shmClass) =>
+ val fsPriority = classOf[FileSystem].getField("SHUTDOWN_HOOK_PRIORITY")
+ .get(null).asInstanceOf[Int] // static field, so null is passed as the receiver
+ val shm = shmClass.getMethod("get").invoke(null)
+ shm.getClass().getMethod("addShutdownHook", classOf[Runnable], classOf[Int])
+ .invoke(shm, hookTask, Integer.valueOf(fsPriority + 30))
+
+ case Failure(_) =>
+ Runtime.getRuntime.addShutdownHook(new Thread(hookTask, "Spark Shutdown Hook"))
+ }
+ }
+
+ def runAll(): Unit = synchronized {
+ shuttingDown = true
+ while (!hooks.isEmpty()) {
+ Utils.logUncaughtExceptions(hooks.poll().run())
+ }
+ }
+
+ def add(priority: Int, hook: () => Unit): AnyRef = synchronized {
+ checkState()
+ val hookRef = new SparkShutdownHook(priority, hook)
+ hooks.add(hookRef)
+ hookRef
+ }
+
+ def remove(ref: AnyRef): Boolean = synchronized {
+ checkState()
+ hooks.remove(ref)
+ }
+
+ private def checkState(): Unit = {
+ if (shuttingDown) {
+ throw new IllegalStateException("Shutdown hooks cannot be modified during shutdown.")
+ }
+ }
+
+}
+
+private class SparkShutdownHook(private val priority: Int, hook: () => Unit)
+ extends Comparable[SparkShutdownHook] {
+
+ override def compareTo(other: SparkShutdownHook): Int = {
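+ // Reversed on purpose: higher priority values compare as smaller, so the PriorityQueue polls them first.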
+ other.priority - priority
+ }
+
+ def run(): Unit = hook()
+
}
/**
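
The ordering trick above is worth spelling out: java.util.PriorityQueue polls its least element first, and SparkShutdownHook reverses compareTo so that a higher priority compares as "less". A self-contained sketch of the same idea (Entry and PriorityDemo are illustrative names):

    import java.util.PriorityQueue

    object PriorityDemo {
      // Same reversed comparator as SparkShutdownHook: higher priority sorts first.
      class Entry(val priority: Int) extends Comparable[Entry] {
        override def compareTo(other: Entry): Int = other.priority - priority
      }

      def main(args: Array[String]): Unit = {
        val queue = new PriorityQueue[Entry]()
        Seq(1, 3, 2, 4).foreach(p => queue.add(new Entry(p)))
        while (!queue.isEmpty()) print(queue.poll().priority + " ") // prints: 4 3 2 1
      }
    }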
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index fb97e650ff..1ba99803f5 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -17,14 +17,16 @@
package org.apache.spark.util
-import scala.util.Random
-
import java.io.{File, ByteArrayOutputStream, ByteArrayInputStream, FileOutputStream}
import java.net.{BindException, ServerSocket, URI}
import java.nio.{ByteBuffer, ByteOrder}
import java.text.DecimalFormatSymbols
import java.util.concurrent.TimeUnit
import java.util.Locale
+import java.util.PriorityQueue
+
+import scala.collection.mutable.ListBuffer
+import scala.util.Random
import com.google.common.base.Charsets.UTF_8
import com.google.common.io.Files
@@ -36,14 +38,14 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.SparkConf
class UtilsSuite extends FunSuite with ResetSystemProperties {
-
+
test("timeConversion") {
// Test -1
assert(Utils.timeStringAsSeconds("-1") === -1)
-
+
// Test zero
assert(Utils.timeStringAsSeconds("0") === 0)
-
+
assert(Utils.timeStringAsSeconds("1") === 1)
assert(Utils.timeStringAsSeconds("1s") === 1)
assert(Utils.timeStringAsSeconds("1000ms") === 1)
@@ -52,7 +54,7 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
assert(Utils.timeStringAsSeconds("1min") === TimeUnit.MINUTES.toSeconds(1))
assert(Utils.timeStringAsSeconds("1h") === TimeUnit.HOURS.toSeconds(1))
assert(Utils.timeStringAsSeconds("1d") === TimeUnit.DAYS.toSeconds(1))
-
+
assert(Utils.timeStringAsMs("1") === 1)
assert(Utils.timeStringAsMs("1ms") === 1)
assert(Utils.timeStringAsMs("1000us") === 1)
@@ -61,7 +63,7 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
assert(Utils.timeStringAsMs("1min") === TimeUnit.MINUTES.toMillis(1))
assert(Utils.timeStringAsMs("1h") === TimeUnit.HOURS.toMillis(1))
assert(Utils.timeStringAsMs("1d") === TimeUnit.DAYS.toMillis(1))
-
+
// Test invalid strings
intercept[NumberFormatException] {
Utils.timeStringAsMs("This breaks 600s")
@@ -79,7 +81,7 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
Utils.timeStringAsMs("This 123s breaks")
}
}
-
+
test("bytesToString") {
assert(Utils.bytesToString(10) === "10.0 B")
assert(Utils.bytesToString(1500) === "1500.0 B")
@@ -466,4 +468,18 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
val newFileName = new File(testFileDir, testFileName)
assert(newFileName.isFile())
}
+
+ test("shutdown hook manager") {
+ val manager = new SparkShutdownHookManager()
+ val output = new ListBuffer[Int]()
+
+ val hook1 = manager.add(1, () => output += 1)
+ manager.add(3, () => output += 3)
+ manager.add(2, () => output += 2)
+ manager.add(4, () => output += 4)
+ manager.remove(hook1)
+
+ manager.runAll()
+ assert(output.toList === List(4, 3, 2))
+ }
}
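
The expected List(4, 3, 2) ties the pieces together: the highest-priority hook fires first, the rest follow in descending priority order, and the removed hook1 never runs.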