Diffstat (limited to 'yarn')
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala  63
1 file changed, 27 insertions(+), 36 deletions(-)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index f7a84207e9..93ae45133c 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -25,7 +25,6 @@ import java.net.{Socket, URL}
 import java.util.concurrent.atomic.AtomicReference
 
 import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.util.ShutdownHookManager
 import org.apache.hadoop.yarn.api._
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.conf.YarnConfiguration
@@ -95,44 +94,38 @@ private[spark] class ApplicationMaster(
logInfo("ApplicationAttemptId: " + appAttemptId)
val fs = FileSystem.get(yarnConf)
- val cleanupHook = new Runnable {
- override def run() {
- // If the SparkContext is still registered, shut it down as a best case effort in case
- // users do not call sc.stop or do System.exit().
- val sc = sparkContextRef.get()
- if (sc != null) {
- logInfo("Invoking sc stop from shutdown hook")
- sc.stop()
- }
- val maxAppAttempts = client.getMaxRegAttempts(sparkConf, yarnConf)
- val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
-
- if (!finished) {
- // This happens when the user application calls System.exit(). We have the choice
- // of either failing or succeeding at this point. We report success to avoid
- // retrying applications that have succeeded (System.exit(0)), which means that
- // applications that explicitly exit with a non-zero status will also show up as
- // succeeded in the RM UI.
- finish(finalStatus,
- ApplicationMaster.EXIT_SUCCESS,
- "Shutdown hook called before final status was reported.")
- }
- if (!unregistered) {
- // we only want to unregister if we don't want the RM to retry
- if (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt) {
- unregister(finalStatus, finalMsg)
- cleanupStagingDir(fs)
- }
+ Utils.addShutdownHook { () =>
+ // If the SparkContext is still registered, shut it down as a best case effort in case
+ // users do not call sc.stop or do System.exit().
+ val sc = sparkContextRef.get()
+ if (sc != null) {
+ logInfo("Invoking sc stop from shutdown hook")
+ sc.stop()
+ }
+ val maxAppAttempts = client.getMaxRegAttempts(sparkConf, yarnConf)
+ val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
+
+ if (!finished) {
+ // This happens when the user application calls System.exit(). We have the choice
+ // of either failing or succeeding at this point. We report success to avoid
+ // retrying applications that have succeeded (System.exit(0)), which means that
+ // applications that explicitly exit with a non-zero status will also show up as
+ // succeeded in the RM UI.
+ finish(finalStatus,
+ ApplicationMaster.EXIT_SUCCESS,
+ "Shutdown hook called before final status was reported.")
+ }
+
+ if (!unregistered) {
+ // we only want to unregister if we don't want the RM to retry
+ if (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt) {
+ unregister(finalStatus, finalMsg)
+ cleanupStagingDir(fs)
}
}
}
- // Use higher priority than FileSystem.
- assert(ApplicationMaster.SHUTDOWN_HOOK_PRIORITY > FileSystem.SHUTDOWN_HOOK_PRIORITY)
- ShutdownHookManager
- .get().addShutdownHook(cleanupHook, ApplicationMaster.SHUTDOWN_HOOK_PRIORITY)
-
// Call this to force generation of secret so it gets populated into the
// Hadoop UGI. This has to happen before the startUserApplication which does a
// doAs in order for the credentials to be passed on to the executor containers.
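
For readers outside the Spark tree: Utils.addShutdownHook is Spark's internal (private[spark]) entry point for registering JVM shutdown work. It accepts a plain Scala function and, in an overloaded form, a priority, and runs registered hooks from a single JVM shutdown thread, highest priority first; the call in this hunk passes no priority, so the hook presumably runs at a default. A minimal, self-contained sketch of that mechanism follows; ShutdownHooks and its add method are illustrative names, not Spark's actual implementation.

import scala.collection.mutable.ArrayBuffer

// Illustrative stand-in for the priority-ordered hook registry behind
// Utils.addShutdownHook; not Spark's actual code.
object ShutdownHooks {
  private val hooks = ArrayBuffer.empty[(Int, () => Unit)]

  // A single real JVM shutdown hook drains every registered hook,
  // highest priority first.
  Runtime.getRuntime.addShutdownHook(new Thread("shutdown-hooks") {
    override def run(): Unit = hooks.synchronized {
      for ((_, body) <- hooks.sortBy { case (p, _) => -p }) body()
    }
  })

  def add(priority: Int)(body: () => Unit): Unit =
    hooks.synchronized { hooks += ((priority, body)) }
}

Funneling every hook through one registry is what makes a global ordering possible at all: independent Runtime.addShutdownHook threads run concurrently with no ordering guarantee.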
@@ -546,8 +539,6 @@ private[spark] class ApplicationMaster(
 
 object ApplicationMaster extends Logging {
 
-  val SHUTDOWN_HOOK_PRIORITY: Int = 30
-
   // exit codes for different causes, no reason behind the values
   private val EXIT_SUCCESS = 0
   private val EXIT_UNCAUGHT_EXCEPTION = 10
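
The deleted assert encoded an ordering constraint that is easy to lose sight of: Hadoop's ShutdownHookManager runs higher-priority hooks first, and the AM's hook (priority 30) had to outrank FileSystem.SHUTDOWN_HOOK_PRIORITY so that cleanupStagingDir(fs) still saw a live FileSystem. Below is a hypothetical demo of that ordering, reusing the ShutdownHooks sketch above rather than the real Hadoop or Spark constants.

// Hypothetical ordering demo; depends on the ShutdownHooks sketch above.
object ShutdownOrderDemo {
  def main(args: Array[String]): Unit = {
    // Low priority: stands in for Hadoop closing its cached FileSystems.
    ShutdownHooks.add(10)(() => println("closing FileSystem caches"))
    // High priority: stands in for the AM's staging-dir cleanup, which
    // must run while the FileSystem is still open.
    ShutdownHooks.add(30)(() => println("cleaning up staging directory"))
    // On JVM exit the hooks fire in descending priority: 30, then 10.
  }
}

With this change the ordering concern moves into Spark's shared hook installer instead of being re-asserted at each call site, which is why both the constant and the assert could go.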