aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala12
-rw-r--r--core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala18
-rw-r--r--core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala24
-rw-r--r--core/src/main/scala/org/apache/spark/util/Utils.scala136
-rw-r--r--core/src/test/scala/org/apache/spark/util/UtilsSuite.scala32
-rw-r--r--sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala9
-rw-r--r--sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala9
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala63
9 files changed, 195 insertions, 114 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index 72f6048239..56bef57e55 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -27,7 +27,7 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.ui.{SparkUI, UIUtils, WebUI}
import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.util.SignalLogger
+import org.apache.spark.util.{SignalLogger, Utils}
/**
* A web server that renders SparkUIs of completed applications.
@@ -194,9 +194,7 @@ object HistoryServer extends Logging {
val server = new HistoryServer(conf, provider, securityManager, port)
server.bind()
- Runtime.getRuntime().addShutdownHook(new Thread("HistoryServerStopper") {
- override def run(): Unit = server.stop()
- })
+ Utils.addShutdownHook { () => server.stop() }
// Wait until the end of the world... or if the HistoryServer process is manually stopped
while(true) { Thread.sleep(Int.MaxValue) }
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 7d5acabb95..7aa85b732f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -28,6 +28,7 @@ import com.google.common.io.Files
import org.apache.spark.{SparkConf, Logging}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
+import org.apache.spark.util.Utils
import org.apache.spark.util.logging.FileAppender
/**
@@ -61,7 +62,7 @@ private[deploy] class ExecutorRunner(
// NOTE: This is now redundant with the automated shut-down enforced by the Executor. It might
// make sense to remove this in the future.
- private var shutdownHook: Thread = null
+ private var shutdownHook: AnyRef = null
private[worker] def start() {
workerThread = new Thread("ExecutorRunner for " + fullId) {
@@ -69,12 +70,7 @@ private[deploy] class ExecutorRunner(
}
workerThread.start()
// Shutdown hook that kills actors on shutdown.
- shutdownHook = new Thread() {
- override def run() {
- killProcess(Some("Worker shutting down"))
- }
- }
- Runtime.getRuntime.addShutdownHook(shutdownHook)
+ shutdownHook = Utils.addShutdownHook { () => killProcess(Some("Worker shutting down")) }
}
/**
@@ -106,7 +102,7 @@ private[deploy] class ExecutorRunner(
workerThread = null
state = ExecutorState.KILLED
try {
- Runtime.getRuntime.removeShutdownHook(shutdownHook)
+ Utils.removeShutdownHook(shutdownHook)
} catch {
case e: IllegalStateException => None
}
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 2883137872..7ea5e54f9e 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -138,25 +138,17 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
}
}
- private def addShutdownHook(): Thread = {
- val shutdownHook = new Thread("delete Spark local dirs") {
- override def run(): Unit = Utils.logUncaughtExceptions {
- logDebug("Shutdown hook called")
- DiskBlockManager.this.doStop()
- }
+ private def addShutdownHook(): AnyRef = {
+ Utils.addShutdownHook { () =>
+ logDebug("Shutdown hook called")
+ DiskBlockManager.this.doStop()
}
- Runtime.getRuntime.addShutdownHook(shutdownHook)
- shutdownHook
}
/** Cleanup local dirs and stop shuffle sender. */
private[spark] def stop() {
// Remove the shutdown hook. It causes memory leaks if we leave it around.
- try {
- Runtime.getRuntime.removeShutdownHook(shutdownHook)
- } catch {
- case e: IllegalStateException => None
- }
+ Utils.removeShutdownHook(shutdownHook)
doStop()
}
diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
index af87303421..951897cead 100644
--- a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
@@ -135,21 +135,19 @@ private[spark] class TachyonBlockManager(
private def addShutdownHook() {
tachyonDirs.foreach(tachyonDir => Utils.registerShutdownDeleteDir(tachyonDir))
- Runtime.getRuntime.addShutdownHook(new Thread("delete Spark tachyon dirs") {
- override def run(): Unit = Utils.logUncaughtExceptions {
- logDebug("Shutdown hook called")
- tachyonDirs.foreach { tachyonDir =>
- try {
- if (!Utils.hasRootAsShutdownDeleteDir(tachyonDir)) {
- Utils.deleteRecursively(tachyonDir, client)
- }
- } catch {
- case e: Exception =>
- logError("Exception while deleting tachyon spark dir: " + tachyonDir, e)
+ Utils.addShutdownHook { () =>
+ logDebug("Shutdown hook called")
+ tachyonDirs.foreach { tachyonDir =>
+ try {
+ if (!Utils.hasRootAsShutdownDeleteDir(tachyonDir)) {
+ Utils.deleteRecursively(tachyonDir, client)
}
+ } catch {
+ case e: Exception =>
+ logError("Exception while deleting tachyon spark dir: " + tachyonDir, e)
}
- client.close()
}
- })
+ client.close()
+ }
}
}
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 1029b0f9fc..7b0de1ae55 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -21,7 +21,7 @@ import java.io._
import java.lang.management.ManagementFactory
import java.net._
import java.nio.ByteBuffer
-import java.util.{Properties, Locale, Random, UUID}
+import java.util.{PriorityQueue, Properties, Locale, Random, UUID}
import java.util.concurrent._
import javax.net.ssl.HttpsURLConnection
@@ -30,7 +30,7 @@ import scala.collection.Map
import scala.collection.mutable.ArrayBuffer
import scala.io.Source
import scala.reflect.ClassTag
-import scala.util.Try
+import scala.util.{Failure, Success, Try}
import scala.util.control.{ControlThrowable, NonFatal}
import com.google.common.io.{ByteStreams, Files}
@@ -64,9 +64,15 @@ private[spark] object CallSite {
private[spark] object Utils extends Logging {
val random = new Random()
+ val DEFAULT_SHUTDOWN_PRIORITY = 100
+
private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
@volatile private var localRootDirs: Array[String] = null
+
+ private val shutdownHooks = new SparkShutdownHookManager()
+ shutdownHooks.install()
+
/** Serialize an object using Java serialization */
def serialize[T](o: T): Array[Byte] = {
val bos = new ByteArrayOutputStream()
@@ -176,18 +182,16 @@ private[spark] object Utils extends Logging {
private val shutdownDeleteTachyonPaths = new scala.collection.mutable.HashSet[String]()
// Add a shutdown hook to delete the temp dirs when the JVM exits
- Runtime.getRuntime.addShutdownHook(new Thread("delete Spark temp dirs") {
- override def run(): Unit = Utils.logUncaughtExceptions {
- logDebug("Shutdown hook called")
- shutdownDeletePaths.foreach { dirPath =>
- try {
- Utils.deleteRecursively(new File(dirPath))
- } catch {
- case e: Exception => logError(s"Exception while deleting Spark temp dir: $dirPath", e)
- }
+ addShutdownHook { () =>
+ logDebug("Shutdown hook called")
+ shutdownDeletePaths.foreach { dirPath =>
+ try {
+ Utils.deleteRecursively(new File(dirPath))
+ } catch {
+ case e: Exception => logError(s"Exception while deleting Spark temp dir: $dirPath", e)
}
}
- })
+ }
// Register the path to be deleted via shutdown hook
def registerShutdownDeleteDir(file: File) {
@@ -613,7 +617,7 @@ private[spark] object Utils extends Logging {
}
Utils.setupSecureURLConnection(uc, securityMgr)
- val timeoutMs =
+ val timeoutMs =
conf.getTimeAsSeconds("spark.files.fetchTimeout", "60s").toInt * 1000
uc.setConnectTimeout(timeoutMs)
uc.setReadTimeout(timeoutMs)
@@ -1172,7 +1176,7 @@ private[spark] object Utils extends Logging {
/**
* Execute a block of code that evaluates to Unit, forwarding any uncaught exceptions to the
* default UncaughtExceptionHandler
- *
+ *
* NOTE: This method is to be called by the spark-started JVM process.
*/
def tryOrExit(block: => Unit) {
@@ -1185,11 +1189,11 @@ private[spark] object Utils extends Logging {
}
/**
- * Execute a block of code that evaluates to Unit, stop SparkContext is there is any uncaught
+ * Execute a block of code that evaluates to Unit, stop SparkContext is there is any uncaught
* exception
- *
- * NOTE: This method is to be called by the driver-side components to avoid stopping the
- * user-started JVM process completely; in contrast, tryOrExit is to be called in the
+ *
+ * NOTE: This method is to be called by the driver-side components to avoid stopping the
+ * user-started JVM process completely; in contrast, tryOrExit is to be called in the
* spark-started JVM process .
*/
def tryOrStopSparkContext(sc: SparkContext)(block: => Unit) {
@@ -2132,6 +2136,102 @@ private[spark] object Utils extends Logging {
.getOrElse(UserGroupInformation.getCurrentUser().getShortUserName())
}
+ /**
+ * Adds a shutdown hook with default priority.
+ *
+ * @param hook The code to run during shutdown.
+ * @return A handle that can be used to unregister the shutdown hook.
+ */
+ def addShutdownHook(hook: () => Unit): AnyRef = {
+ addShutdownHook(DEFAULT_SHUTDOWN_PRIORITY, hook)
+ }
+
+ /**
+ * Adds a shutdown hook with the given priority. Hooks with lower priority values run
+ * first.
+ *
+ * @param hook The code to run during shutdown.
+ * @return A handle that can be used to unregister the shutdown hook.
+ */
+ def addShutdownHook(priority: Int, hook: () => Unit): AnyRef = {
+ shutdownHooks.add(priority, hook)
+ }
+
+ /**
+ * Remove a previously installed shutdown hook.
+ *
+ * @param ref A handle returned by `addShutdownHook`.
+ * @return Whether the hook was removed.
+ */
+ def removeShutdownHook(ref: AnyRef): Boolean = {
+ shutdownHooks.remove(ref)
+ }
+
+}
+
+private [util] class SparkShutdownHookManager {
+
+ private val hooks = new PriorityQueue[SparkShutdownHook]()
+ private var shuttingDown = false
+
+ /**
+ * Install a hook to run at shutdown and run all registered hooks in order. Hadoop 1.x does not
+ * have `ShutdownHookManager`, so in that case we just use the JVM's `Runtime` object and hope for
+ * the best.
+ */
+ def install(): Unit = {
+ val hookTask = new Runnable() {
+ override def run(): Unit = runAll()
+ }
+ Try(Class.forName("org.apache.hadoop.util.ShutdownHookManager")) match {
+ case Success(shmClass) =>
+ val fsPriority = classOf[FileSystem].getField("SHUTDOWN_HOOK_PRIORITY").get()
+ .asInstanceOf[Int]
+ val shm = shmClass.getMethod("get").invoke(null)
+ shm.getClass().getMethod("addShutdownHook", classOf[Runnable], classOf[Int])
+ .invoke(shm, hookTask, Integer.valueOf(fsPriority + 30))
+
+ case Failure(_) =>
+ Runtime.getRuntime.addShutdownHook(new Thread(hookTask, "Spark Shutdown Hook"));
+ }
+ }
+
+ def runAll(): Unit = synchronized {
+ shuttingDown = true
+ while (!hooks.isEmpty()) {
+ Utils.logUncaughtExceptions(hooks.poll().run())
+ }
+ }
+
+ def add(priority: Int, hook: () => Unit): AnyRef = synchronized {
+ checkState()
+ val hookRef = new SparkShutdownHook(priority, hook)
+ hooks.add(hookRef)
+ hookRef
+ }
+
+ def remove(ref: AnyRef): Boolean = synchronized {
+ checkState()
+ hooks.remove(ref)
+ }
+
+ private def checkState(): Unit = {
+ if (shuttingDown) {
+ throw new IllegalStateException("Shutdown hooks cannot be modified during shutdown.")
+ }
+ }
+
+}
+
+private class SparkShutdownHook(private val priority: Int, hook: () => Unit)
+ extends Comparable[SparkShutdownHook] {
+
+ override def compareTo(other: SparkShutdownHook): Int = {
+ other.priority - priority
+ }
+
+ def run(): Unit = hook()
+
}
/**
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index fb97e650ff..1ba99803f5 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -17,14 +17,16 @@
package org.apache.spark.util
-import scala.util.Random
-
import java.io.{File, ByteArrayOutputStream, ByteArrayInputStream, FileOutputStream}
import java.net.{BindException, ServerSocket, URI}
import java.nio.{ByteBuffer, ByteOrder}
import java.text.DecimalFormatSymbols
import java.util.concurrent.TimeUnit
import java.util.Locale
+import java.util.PriorityQueue
+
+import scala.collection.mutable.ListBuffer
+import scala.util.Random
import com.google.common.base.Charsets.UTF_8
import com.google.common.io.Files
@@ -36,14 +38,14 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.SparkConf
class UtilsSuite extends FunSuite with ResetSystemProperties {
-
+
test("timeConversion") {
// Test -1
assert(Utils.timeStringAsSeconds("-1") === -1)
-
+
// Test zero
assert(Utils.timeStringAsSeconds("0") === 0)
-
+
assert(Utils.timeStringAsSeconds("1") === 1)
assert(Utils.timeStringAsSeconds("1s") === 1)
assert(Utils.timeStringAsSeconds("1000ms") === 1)
@@ -52,7 +54,7 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
assert(Utils.timeStringAsSeconds("1min") === TimeUnit.MINUTES.toSeconds(1))
assert(Utils.timeStringAsSeconds("1h") === TimeUnit.HOURS.toSeconds(1))
assert(Utils.timeStringAsSeconds("1d") === TimeUnit.DAYS.toSeconds(1))
-
+
assert(Utils.timeStringAsMs("1") === 1)
assert(Utils.timeStringAsMs("1ms") === 1)
assert(Utils.timeStringAsMs("1000us") === 1)
@@ -61,7 +63,7 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
assert(Utils.timeStringAsMs("1min") === TimeUnit.MINUTES.toMillis(1))
assert(Utils.timeStringAsMs("1h") === TimeUnit.HOURS.toMillis(1))
assert(Utils.timeStringAsMs("1d") === TimeUnit.DAYS.toMillis(1))
-
+
// Test invalid strings
intercept[NumberFormatException] {
Utils.timeStringAsMs("This breaks 600s")
@@ -79,7 +81,7 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
Utils.timeStringAsMs("This 123s breaks")
}
}
-
+
test("bytesToString") {
assert(Utils.bytesToString(10) === "10.0 B")
assert(Utils.bytesToString(1500) === "1500.0 B")
@@ -466,4 +468,18 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
val newFileName = new File(testFileDir, testFileName)
assert(newFileName.isFile())
}
+
+ test("shutdown hook manager") {
+ val manager = new SparkShutdownHookManager()
+ val output = new ListBuffer[Int]()
+
+ val hook1 = manager.add(1, () => output += 1)
+ manager.add(3, () => output += 3)
+ manager.add(2, () => output += 2)
+ manager.add(4, () => output += 4)
+ manager.remove(hook1)
+
+ manager.runAll()
+ assert(output.toList === List(4, 3, 2))
+ }
}
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
index c3a3f8c0f4..832596fc8b 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
@@ -28,6 +28,7 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
import org.apache.spark.scheduler.{SparkListenerApplicationEnd, SparkListener}
+import org.apache.spark.util.Utils
/**
* The main entry point for the Spark SQL port of HiveServer2. Starts up a `SparkSQLContext` and a
@@ -57,13 +58,7 @@ object HiveThriftServer2 extends Logging {
logInfo("Starting SparkContext")
SparkSQLEnv.init()
- Runtime.getRuntime.addShutdownHook(
- new Thread() {
- override def run() {
- SparkSQLEnv.stop()
- }
- }
- )
+ Utils.addShutdownHook { () => SparkSQLEnv.stop() }
try {
val server = new HiveThriftServer2(SparkSQLEnv.hiveContext)
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
index 85281c6d73..7e307bb4ad 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -40,6 +40,7 @@ import org.apache.thrift.transport.TSocket
import org.apache.spark.Logging
import org.apache.spark.sql.hive.HiveShim
+import org.apache.spark.util.Utils
private[hive] object SparkSQLCLIDriver {
private var prompt = "spark-sql"
@@ -101,13 +102,7 @@ private[hive] object SparkSQLCLIDriver {
SessionState.start(sessionState)
// Clean up after we exit
- Runtime.getRuntime.addShutdownHook(
- new Thread() {
- override def run() {
- SparkSQLEnv.stop()
- }
- }
- )
+ Utils.addShutdownHook { () => SparkSQLEnv.stop() }
// "-h" option has been passed, so connect to Hive thrift server.
if (sessionState.getHost != null) {
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index f7a84207e9..93ae45133c 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -25,7 +25,6 @@ import java.net.{Socket, URL}
import java.util.concurrent.atomic.AtomicReference
import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.util.ShutdownHookManager
import org.apache.hadoop.yarn.api._
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.conf.YarnConfiguration
@@ -95,44 +94,38 @@ private[spark] class ApplicationMaster(
logInfo("ApplicationAttemptId: " + appAttemptId)
val fs = FileSystem.get(yarnConf)
- val cleanupHook = new Runnable {
- override def run() {
- // If the SparkContext is still registered, shut it down as a best case effort in case
- // users do not call sc.stop or do System.exit().
- val sc = sparkContextRef.get()
- if (sc != null) {
- logInfo("Invoking sc stop from shutdown hook")
- sc.stop()
- }
- val maxAppAttempts = client.getMaxRegAttempts(sparkConf, yarnConf)
- val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
-
- if (!finished) {
- // This happens when the user application calls System.exit(). We have the choice
- // of either failing or succeeding at this point. We report success to avoid
- // retrying applications that have succeeded (System.exit(0)), which means that
- // applications that explicitly exit with a non-zero status will also show up as
- // succeeded in the RM UI.
- finish(finalStatus,
- ApplicationMaster.EXIT_SUCCESS,
- "Shutdown hook called before final status was reported.")
- }
- if (!unregistered) {
- // we only want to unregister if we don't want the RM to retry
- if (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt) {
- unregister(finalStatus, finalMsg)
- cleanupStagingDir(fs)
- }
+ Utils.addShutdownHook { () =>
+ // If the SparkContext is still registered, shut it down as a best case effort in case
+ // users do not call sc.stop or do System.exit().
+ val sc = sparkContextRef.get()
+ if (sc != null) {
+ logInfo("Invoking sc stop from shutdown hook")
+ sc.stop()
+ }
+ val maxAppAttempts = client.getMaxRegAttempts(sparkConf, yarnConf)
+ val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
+
+ if (!finished) {
+ // This happens when the user application calls System.exit(). We have the choice
+ // of either failing or succeeding at this point. We report success to avoid
+ // retrying applications that have succeeded (System.exit(0)), which means that
+ // applications that explicitly exit with a non-zero status will also show up as
+ // succeeded in the RM UI.
+ finish(finalStatus,
+ ApplicationMaster.EXIT_SUCCESS,
+ "Shutdown hook called before final status was reported.")
+ }
+
+ if (!unregistered) {
+ // we only want to unregister if we don't want the RM to retry
+ if (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt) {
+ unregister(finalStatus, finalMsg)
+ cleanupStagingDir(fs)
}
}
}
- // Use higher priority than FileSystem.
- assert(ApplicationMaster.SHUTDOWN_HOOK_PRIORITY > FileSystem.SHUTDOWN_HOOK_PRIORITY)
- ShutdownHookManager
- .get().addShutdownHook(cleanupHook, ApplicationMaster.SHUTDOWN_HOOK_PRIORITY)
-
// Call this to force generation of secret so it gets populated into the
// Hadoop UGI. This has to happen before the startUserApplication which does a
// doAs in order for the credentials to be passed on to the executor containers.
@@ -546,8 +539,6 @@ private[spark] class ApplicationMaster(
object ApplicationMaster extends Logging {
- val SHUTDOWN_HOOK_PRIORITY: Int = 30
-
// exit codes for different causes, no reason behind the values
private val EXIT_SUCCESS = 0
private val EXIT_UNCAUGHT_EXCEPTION = 10