author     Marcelo Vanzin <vanzin@cloudera.com>    2015-04-21 20:33:57 -0400
committer  Sean Owen <sowen@cloudera.com>          2015-04-21 20:33:57 -0400
commit     e72c16e30d85cdc394d318b5551698885cfda9b8 (patch)
tree       9b450a7f27b311e5bd5b776e8aee2af96e3408d3 /yarn
parent     b063a61b9852cf9b9d2c905332d2ecb2fd716cc4 (diff)
[SPARK-6014] [core] Revamp Spark shutdown hooks, fix shutdown races.
This change adds some new utility code to handle shutdown hooks in Spark. The main goal is to take advantage of Hadoop 2.x's API for shutdown hooks, which allows Spark to register a hook that will run before the one that cleans up HDFS clients, and thus avoids some races that would cause exceptions to show up and other issues such as failure to properly close event logs. Unfortunately, Hadoop 1.x does not have such APIs, so in that case correctness is still left to chance.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #5560 from vanzin/SPARK-6014 and squashes the following commits:

edfafb1 [Marcelo Vanzin] Better scaladoc.
fcaeedd [Marcelo Vanzin] Merge branch 'master' into SPARK-6014
e7039dc [Marcelo Vanzin] [SPARK-6014] [core] Revamp Spark shutdown hooks, fix shutdown races.
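For context, the Hadoop 2.x mechanism the commit relies on is ShutdownHookManager with numeric priorities: a hook registered with a priority higher than FileSystem.SHUTDOWN_HOOK_PRIORITY runs before Hadoop closes its cached FileSystem clients, which is what lets Spark flush event logs first. A minimal sketch of that API is below; the object name and the "+ 20" offset are illustrative, not taken from the patch.

import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.util.ShutdownHookManager

object PriorityHookExample {
  def main(args: Array[String]): Unit = {
    // Hooks with a higher priority run first, so this cleanup executes
    // before Hadoop's own hook closes the cached FileSystem clients.
    val cleanup = new Runnable {
      override def run(): Unit = println("app cleanup before HDFS clients close")
    }
    // FileSystem.SHUTDOWN_HOOK_PRIORITY is the priority of Hadoop's own hook.
    ShutdownHookManager.get().addShutdownHook(
      cleanup, FileSystem.SHUTDOWN_HOOK_PRIORITY + 20)
  }
}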
Diffstat (limited to 'yarn')
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 63
1 file changed, 27 insertions(+), 36 deletions(-)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index f7a84207e9..93ae45133c 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -25,7 +25,6 @@ import java.net.{Socket, URL}
import java.util.concurrent.atomic.AtomicReference
import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.util.ShutdownHookManager
import org.apache.hadoop.yarn.api._
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.conf.YarnConfiguration
@@ -95,44 +94,38 @@ private[spark] class ApplicationMaster(
logInfo("ApplicationAttemptId: " + appAttemptId)
val fs = FileSystem.get(yarnConf)
- val cleanupHook = new Runnable {
- override def run() {
- // If the SparkContext is still registered, shut it down as a best case effort in case
- // users do not call sc.stop or do System.exit().
- val sc = sparkContextRef.get()
- if (sc != null) {
- logInfo("Invoking sc stop from shutdown hook")
- sc.stop()
- }
- val maxAppAttempts = client.getMaxRegAttempts(sparkConf, yarnConf)
- val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
-
- if (!finished) {
- // This happens when the user application calls System.exit(). We have the choice
- // of either failing or succeeding at this point. We report success to avoid
- // retrying applications that have succeeded (System.exit(0)), which means that
- // applications that explicitly exit with a non-zero status will also show up as
- // succeeded in the RM UI.
- finish(finalStatus,
- ApplicationMaster.EXIT_SUCCESS,
- "Shutdown hook called before final status was reported.")
- }
- if (!unregistered) {
- // we only want to unregister if we don't want the RM to retry
- if (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt) {
- unregister(finalStatus, finalMsg)
- cleanupStagingDir(fs)
- }
+ Utils.addShutdownHook { () =>
+ // If the SparkContext is still registered, shut it down as a best case effort in case
+ // users do not call sc.stop or do System.exit().
+ val sc = sparkContextRef.get()
+ if (sc != null) {
+ logInfo("Invoking sc stop from shutdown hook")
+ sc.stop()
+ }
+ val maxAppAttempts = client.getMaxRegAttempts(sparkConf, yarnConf)
+ val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
+
+ if (!finished) {
+ // This happens when the user application calls System.exit(). We have the choice
+ // of either failing or succeeding at this point. We report success to avoid
+ // retrying applications that have succeeded (System.exit(0)), which means that
+ // applications that explicitly exit with a non-zero status will also show up as
+ // succeeded in the RM UI.
+ finish(finalStatus,
+ ApplicationMaster.EXIT_SUCCESS,
+ "Shutdown hook called before final status was reported.")
+ }
+
+ if (!unregistered) {
+ // we only want to unregister if we don't want the RM to retry
+ if (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt) {
+ unregister(finalStatus, finalMsg)
+ cleanupStagingDir(fs)
}
}
}
- // Use higher priority than FileSystem.
- assert(ApplicationMaster.SHUTDOWN_HOOK_PRIORITY > FileSystem.SHUTDOWN_HOOK_PRIORITY)
- ShutdownHookManager
- .get().addShutdownHook(cleanupHook, ApplicationMaster.SHUTDOWN_HOOK_PRIORITY)
-
// Call this to force generation of secret so it gets populated into the
// Hadoop UGI. This has to happen before the startUserApplication which does a
// doAs in order for the credentials to be passed on to the executor containers.
@@ -546,8 +539,6 @@ private[spark] class ApplicationMaster(
object ApplicationMaster extends Logging {
- val SHUTDOWN_HOOK_PRIORITY: Int = 30
-
// exit codes for different causes, no reason behind the values
private val EXIT_SUCCESS = 0
private val EXIT_UNCAUGHT_EXCEPTION = 10
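The Utils.addShutdownHook call introduced in the diff above centralizes this hook handling in Spark's own utility code. As a very rough sketch of the general idea only (this is not Spark's Utils implementation, and the names below are invented for illustration), a priority-ordered hook registry can be built on a single JVM shutdown hook:

import scala.collection.mutable.ArrayBuffer

object SimpleShutdownHooks {
  private case class Hook(priority: Int, body: () => Unit)
  private val hooks = ArrayBuffer.empty[Hook]

  // One real JVM shutdown hook drains the registry in descending priority
  // order, so higher-priority hooks run first.
  Runtime.getRuntime.addShutdownHook(new Thread {
    override def run(): Unit = {
      val ordered = hooks.synchronized { hooks.sortBy(h => -h.priority).toList }
      ordered.foreach { h =>
        try h.body() catch { case t: Throwable => t.printStackTrace() }
      }
    }
  })

  def addShutdownHook(priority: Int)(body: () => Unit): Unit =
    hooks.synchronized { hooks += Hook(priority, body) }
}

// Usage sketch: the higher-priority hook runs before the lower-priority one.
// SimpleShutdownHooks.addShutdownHook(30) { () => println("runs first") }
// SimpleShutdownHooks.addShutdownHook(10) { () => println("runs second") }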