author     Thomas Graves <tgraves@apache.org>    2014-10-07 09:51:37 -0500
committer  Thomas Graves <tgraves@apache.org>    2014-10-07 09:51:37 -0500
commit     70e824f750aa8ed446eec104ba158b0503ba58a9 (patch)
tree       e58b05f03b1a42eb845c70025bba4fb17495daef /yarn
parent     69c3f441a9b6e942d6c08afecd59a0349d61cc7b (diff)
[SPARK-3627] - [yarn] - fix exit code and final status reporting to RM
See the description and what's handled in the JIRA comment:
https://issues.apache.org/jira/browse/SPARK-3627?focusedCommentId=14150013&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14150013

This does not handle yarn client mode reporting of the driver to the AM. I think that should be handled when we make it an unmanaged AM.

Author: Thomas Graves <tgraves@apache.org>

Closes #2577 from tgravescs/SPARK-3627 and squashes the following commits:

9c2efbf [Thomas Graves] review comments
e8cc261 [Thomas Graves] fix accidental typo during fixing comment
24c98e3 [Thomas Graves] rework
85f1901 [Thomas Graves] Merge remote-tracking branch 'upstream/master' into SPARK-3627
fab166d [Thomas Graves] update based on review comments
32f4dfa [Thomas Graves] switch back
f0b6519 [Thomas Graves] change order of cleanup staging dir
d3cc800 [Thomas Graves] SPARK-3627 - yarn - fix exit code and final status reporting to RM
Diffstat (limited to 'yarn')
-rw-r--r--  yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala   |  26
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 295
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala      |   4
-rw-r--r--  yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala  |  13
4 files changed, 212 insertions(+), 126 deletions(-)
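
Background for the diff below, for readers less familiar with YARN's AM protocol: an ApplicationMaster reports how an application ended by unregistering from the ResourceManager with a FinalApplicationStatus; if it exits without unregistering, the RM treats the attempt as failed and may launch another attempt. A minimal sketch of that lifecycle against the stable AMRMClient API follows; the hostnames, tracking URLs, and try/finally shape are placeholders of mine, not Spark's code.

import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.hadoop.yarn.conf.YarnConfiguration

object AmLifecycleSketch {
  def main(args: Array[String]): Unit = {
    val conf = new YarnConfiguration()
    val amClient = AMRMClient.createAMRMClient[ContainerRequest]()
    amClient.init(conf)
    amClient.start()

    // Register this attempt with the RM; from here on the RM expects an unregister.
    amClient.registerApplicationMaster("am-host.example.com", 0, "http://am-host.example.com:4040")
    try {
      // ... request containers and run the application here ...
    } finally {
      // Report the final status. Skipping this (or crashing before it) makes the RM
      // treat the attempt as failed and, if attempts remain, retry it.
      amClient.unregisterApplicationMaster(
        FinalApplicationStatus.SUCCEEDED, "diagnostics go here", "http://history.example.com/app")
      amClient.stop()
    }
  }
}

The patch is essentially about when Spark's AM makes, or deliberately skips, that unregister call, and which process exit code it leaves behind for YARN to report.
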
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
index 9bd1719cb1..7faf55bc63 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
@@ -40,6 +40,7 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC
private var rpc: YarnRPC = null
private var resourceManager: AMRMProtocol = _
private var uiHistoryAddress: String = _
+ private var registered: Boolean = false
override def register(
conf: YarnConfiguration,
@@ -51,8 +52,11 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC
this.rpc = YarnRPC.create(conf)
this.uiHistoryAddress = uiHistoryAddress
- resourceManager = registerWithResourceManager(conf)
- registerApplicationMaster(uiAddress)
+ synchronized {
+ resourceManager = registerWithResourceManager(conf)
+ registerApplicationMaster(uiAddress)
+ registered = true
+ }
new YarnAllocationHandler(conf, sparkConf, resourceManager, getAttemptId(), args,
preferredNodeLocations, securityMgr)
@@ -66,14 +70,16 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC
appAttemptId
}
- override def shutdown(status: FinalApplicationStatus, diagnostics: String = "") = {
- val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
- .asInstanceOf[FinishApplicationMasterRequest]
- finishReq.setAppAttemptId(getAttemptId())
- finishReq.setFinishApplicationStatus(status)
- finishReq.setDiagnostics(diagnostics)
- finishReq.setTrackingUrl(uiHistoryAddress)
- resourceManager.finishApplicationMaster(finishReq)
+ override def unregister(status: FinalApplicationStatus, diagnostics: String = "") = synchronized {
+ if (registered) {
+ val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
+ .asInstanceOf[FinishApplicationMasterRequest]
+ finishReq.setAppAttemptId(getAttemptId())
+ finishReq.setFinishApplicationStatus(status)
+ finishReq.setDiagnostics(diagnostics)
+ finishReq.setTrackingUrl(uiHistoryAddress)
+ resourceManager.finishApplicationMaster(finishReq)
+ }
}
override def getAmIpFilterParams(conf: YarnConfiguration, proxyBase: String) = {
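
Net effect of the alpha-client hunks above: the register RPC and a new registered flag are updated under one lock, and unregister (formerly shutdown) becomes a no-op when registration never happened, so a crashing AM no longer sends finishApplicationMaster for an attempt the RM never saw register. The same guard reduced to an illustrative standalone class (not Spark's YarnRMClientImpl; the two function parameters stand in for the real RPCs):

import org.apache.hadoop.yarn.api.records.FinalApplicationStatus

// Illustrative only: a client that refuses to unregister unless registration succeeded.
class GuardedRmClient(doRegister: () => Unit,
                      doUnregister: (FinalApplicationStatus, String) => Unit) {
  private var registered = false

  def register(): Unit = synchronized {
    doRegister()        // only mark registered once the RPC has succeeded
    registered = true
  }

  def unregister(status: FinalApplicationStatus, diagnostics: String = ""): Unit = synchronized {
    if (registered) {   // unregistering an AM that never registered would fail anyway
      doUnregister(status, diagnostics)
      registered = false
    }
  }
}

Holding the lock across both the RPC and the flag update means a concurrent unregister can never observe registered == true before registration has actually succeeded.
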
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index caceef5d4b..a3c43b4384 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv}
+import org.apache.spark.SparkException
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.history.HistoryServer
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
@@ -56,8 +57,11 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
+ @volatile private var exitCode = 0
+ @volatile private var unregistered = false
@volatile private var finished = false
@volatile private var finalStatus = FinalApplicationStatus.UNDEFINED
+ @volatile private var finalMsg: String = ""
@volatile private var userClassThread: Thread = _
private var reporterThread: Thread = _
@@ -71,80 +75,107 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
private val sparkContextRef = new AtomicReference[SparkContext](null)
final def run(): Int = {
- val appAttemptId = client.getAttemptId()
+ try {
+ val appAttemptId = client.getAttemptId()
- if (isDriver) {
- // Set the web ui port to be ephemeral for yarn so we don't conflict with
- // other spark processes running on the same box
- System.setProperty("spark.ui.port", "0")
+ if (isDriver) {
+ // Set the web ui port to be ephemeral for yarn so we don't conflict with
+ // other spark processes running on the same box
+ System.setProperty("spark.ui.port", "0")
- // Set the master property to match the requested mode.
- System.setProperty("spark.master", "yarn-cluster")
+ // Set the master property to match the requested mode.
+ System.setProperty("spark.master", "yarn-cluster")
- // Propagate the application ID so that YarnClusterSchedulerBackend can pick it up.
- System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
- }
+ // Propagate the application ID so that YarnClusterSchedulerBackend can pick it up.
+ System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
+ }
- logInfo("ApplicationAttemptId: " + appAttemptId)
+ logInfo("ApplicationAttemptId: " + appAttemptId)
- val cleanupHook = new Runnable {
- override def run() {
- // If the SparkContext is still registered, shut it down as a best case effort in case
- // users do not call sc.stop or do System.exit().
- val sc = sparkContextRef.get()
- if (sc != null) {
- logInfo("Invoking sc stop from shutdown hook")
- sc.stop()
- finish(FinalApplicationStatus.SUCCEEDED)
- }
+ val cleanupHook = new Runnable {
+ override def run() {
+ // If the SparkContext is still registered, shut it down as a best case effort in case
+ // users do not call sc.stop or do System.exit().
+ val sc = sparkContextRef.get()
+ if (sc != null) {
+ logInfo("Invoking sc stop from shutdown hook")
+ sc.stop()
+ }
+ val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
+ val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
+
+ if (!finished) {
+ // this shouldn't ever happen, but if it does assume weird failure
+ finish(FinalApplicationStatus.FAILED,
+ ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
+ "shutdown hook called without cleanly finishing")
+ }
- // Cleanup the staging dir after the app is finished, or if it's the last attempt at
- // running the AM.
- val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
- val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
- if (finished || isLastAttempt) {
- cleanupStagingDir()
+ if (!unregistered) {
+ // we only want to unregister if we don't want the RM to retry
+ if (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt) {
+ unregister(finalStatus, finalMsg)
+ cleanupStagingDir()
+ }
+ }
}
}
- }
- // Use higher priority than FileSystem.
- assert(ApplicationMaster.SHUTDOWN_HOOK_PRIORITY > FileSystem.SHUTDOWN_HOOK_PRIORITY)
- ShutdownHookManager
- .get().addShutdownHook(cleanupHook, ApplicationMaster.SHUTDOWN_HOOK_PRIORITY)
+ // Use higher priority than FileSystem.
+ assert(ApplicationMaster.SHUTDOWN_HOOK_PRIORITY > FileSystem.SHUTDOWN_HOOK_PRIORITY)
+ ShutdownHookManager
+ .get().addShutdownHook(cleanupHook, ApplicationMaster.SHUTDOWN_HOOK_PRIORITY)
- // Call this to force generation of secret so it gets populated into the
- // Hadoop UGI. This has to happen before the startUserClass which does a
- // doAs in order for the credentials to be passed on to the executor containers.
- val securityMgr = new SecurityManager(sparkConf)
+ // Call this to force generation of secret so it gets populated into the
+ // Hadoop UGI. This has to happen before the startUserClass which does a
+ // doAs in order for the credentials to be passed on to the executor containers.
+ val securityMgr = new SecurityManager(sparkConf)
- if (isDriver) {
- runDriver(securityMgr)
- } else {
- runExecutorLauncher(securityMgr)
+ if (isDriver) {
+ runDriver(securityMgr)
+ } else {
+ runExecutorLauncher(securityMgr)
+ }
+ } catch {
+ case e: Exception =>
+ // catch everything else if not specifically handled
+ logError("Uncaught exception: ", e)
+ finish(FinalApplicationStatus.FAILED,
+ ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
+ "Uncaught exception: " + e.getMessage())
}
+ exitCode
+ }
- if (finalStatus != FinalApplicationStatus.UNDEFINED) {
- finish(finalStatus)
- 0
- } else {
- 1
+ /**
+ * unregister is used to completely unregister the application from the ResourceManager.
+ * This means the ResourceManager will not retry the application attempt on your behalf if
+ * a failure occurred.
+ */
+ final def unregister(status: FinalApplicationStatus, diagnostics: String = null) = synchronized {
+ if (!unregistered) {
+ logInfo(s"Unregistering ApplicationMaster with $status" +
+ Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse(""))
+ unregistered = true
+ client.unregister(status, Option(diagnostics).getOrElse(""))
}
}
- final def finish(status: FinalApplicationStatus, diagnostics: String = null) = synchronized {
+ final def finish(status: FinalApplicationStatus, code: Int, msg: String = null) = synchronized {
if (!finished) {
- logInfo(s"Finishing ApplicationMaster with $status" +
- Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse(""))
- finished = true
+ logInfo(s"Final app status: ${status}, exitCode: ${code}" +
+ Option(msg).map(msg => s", (reason: $msg)").getOrElse(""))
+ exitCode = code
finalStatus = status
- try {
- if (Thread.currentThread() != reporterThread) {
- reporterThread.interrupt()
- reporterThread.join()
- }
- } finally {
- client.shutdown(status, Option(diagnostics).getOrElse(""))
+ finalMsg = msg
+ finished = true
+ if (Thread.currentThread() != reporterThread && reporterThread != null) {
+ logDebug("shutting down reporter thread")
+ reporterThread.interrupt()
+ }
+ if (Thread.currentThread() != userClassThread && userClassThread != null) {
+ logDebug("shutting down user thread")
+ userClassThread.interrupt()
}
}
}
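
The reworked run()/finish()/unregister() above splits "record how the application ended" from "tell the RM we are done": finish() stores the status, exit code and message and interrupts the reporter and user threads, while unregister() is called at most once, and only when a retry is not wanted (success, or the last allowed attempt). A stripped-down sketch of that state machine; the class name, its parameters, and threadsToInterrupt are illustrative names of mine:

import org.apache.hadoop.yarn.api.records.FinalApplicationStatus

// Sketch of the finish/unregister split; not Spark's ApplicationMaster.
class AmState(reportToRm: (FinalApplicationStatus, String) => Unit) {
  @volatile private var finished = false
  @volatile private var unregistered = false
  @volatile private var exitCode = 0
  @volatile private var finalStatus = FinalApplicationStatus.UNDEFINED
  @volatile private var finalMsg = ""

  // Record the outcome exactly once and wake up any threads still running.
  def finish(status: FinalApplicationStatus, code: Int, msg: String,
             threadsToInterrupt: Seq[Thread]): Unit = synchronized {
    if (!finished) {
      finalStatus = status
      exitCode = code
      finalMsg = msg
      finished = true
      threadsToInterrupt.filter(_ ne Thread.currentThread()).foreach(_.interrupt())
    }
  }

  // Report to the RM at most once; leaving this out makes the RM retry the attempt.
  def unregister(): Unit = synchronized {
    if (!unregistered) {
      unregistered = true
      reportToRm(finalStatus, finalMsg)
    }
  }

  def code: Int = exitCode
}
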
@@ -182,7 +213,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
private def runDriver(securityMgr: SecurityManager): Unit = {
addAmIpFilter()
- val userThread = startUserClass()
+ setupSystemSecurityManager()
+ userClassThread = startUserClass()
// This a bit hacky, but we need to wait until the spark.driver.port property has
// been set by the Thread executing the user class.
@@ -190,15 +222,12 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
// If there is no SparkContext at this point, just fail the app.
if (sc == null) {
- finish(FinalApplicationStatus.FAILED, "Timed out waiting for SparkContext.")
+ finish(FinalApplicationStatus.FAILED,
+ ApplicationMaster.EXIT_SC_NOT_INITED,
+ "Timed out waiting for SparkContext.")
} else {
registerAM(sc.ui.map(_.appUIAddress).getOrElse(""), securityMgr)
- try {
- userThread.join()
- } finally {
- // In cluster mode, ask the reporter thread to stop since the user app is finished.
- reporterThread.interrupt()
- }
+ userClassThread.join()
}
}
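
One detail of run() worth calling out from the hunk further up: the cleanup hook is registered through Hadoop's ShutdownHookManager at a priority above FileSystem.SHUTDOWN_HOOK_PRIORITY, so it runs before Hadoop closes its FileSystem clients and can still unregister and delete the HDFS staging directory. A condensed sketch of that registration (the cleanup callback stands in for the hook body shown above):

import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.util.ShutdownHookManager

object ShutdownHookSketch {
  // Higher priority runs earlier; staying above FileSystem's priority means the hook
  // can still talk to HDFS (e.g. delete the staging dir) before FS clients shut down.
  val HOOK_PRIORITY = 30

  def install(cleanup: () => Unit): Unit = {
    assert(HOOK_PRIORITY > FileSystem.SHUTDOWN_HOOK_PRIORITY)
    ShutdownHookManager.get().addShutdownHook(new Runnable {
      override def run(): Unit = cleanup()
    }, HOOK_PRIORITY)
  }
}
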
@@ -211,7 +240,6 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
// In client mode the actor will stop the reporter thread.
reporterThread.join()
- finalStatus = FinalApplicationStatus.SUCCEEDED
}
private def launchReporterThread(): Thread = {
@@ -231,33 +259,26 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
val t = new Thread {
override def run() {
var failureCount = 0
-
while (!finished) {
try {
- checkNumExecutorsFailed()
- if (!finished) {
+ if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
+ finish(FinalApplicationStatus.FAILED,
+ ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
+ "Max number of executor failures reached")
+ } else {
logDebug("Sending progress")
allocator.allocateResources()
}
failureCount = 0
} catch {
+ case i: InterruptedException =>
case e: Throwable => {
failureCount += 1
if (!NonFatal(e) || failureCount >= reporterMaxFailures) {
- logError("Exception was thrown from Reporter thread.", e)
- finish(FinalApplicationStatus.FAILED, "Exception was thrown" +
- s"${failureCount} time(s) from Reporter thread.")
-
- /**
- * If exception is thrown from ReporterThread,
- * interrupt user class to stop.
- * Without this interrupting, if exception is
- * thrown before allocating enough executors,
- * YarnClusterScheduler waits until timeout even though
- * we cannot allocate executors.
- */
- logInfo("Interrupting user class to stop.")
- userClassThread.interrupt
+ finish(FinalApplicationStatus.FAILED,
+ ApplicationMaster.EXIT_REPORTER_FAILURE, "Exception was thrown " +
+ s"${failureCount} time(s) from Reporter thread.")
+
} else {
logWarning(s"Reporter thread fails ${failureCount} time(s) in a row.", e)
}
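
The reporter loop now decides the application's fate itself: too many failed executors or too many reporter-thread exceptions call finish() with a dedicated exit code, and the InterruptedException that finish() triggers is treated as a normal shutdown signal rather than a failure. The same control flow reduced to a standalone sketch; heartbeat, isFinished, and onFatal are stand-ins of mine for the allocator call, the finished flag, and finish():

import scala.util.control.NonFatal

object ReporterLoopSketch {
  // Illustrative heartbeat loop: stop on fatal or repeated errors, treat interrupt as shutdown.
  def reporterLoop(heartbeat: () => Unit,
                   isFinished: () => Boolean,
                   onFatal: String => Unit,
                   maxFailures: Int,
                   intervalMs: Long): Unit = {
    var failureCount = 0
    while (!isFinished()) {
      try {
        heartbeat()          // e.g. allocator.allocateResources()
        failureCount = 0
      } catch {
        case _: InterruptedException =>   // finish() interrupts us; fall through and exit
        case e: Throwable =>
          failureCount += 1
          if (!NonFatal(e) || failureCount >= maxFailures) {
            onFatal(s"Exception was thrown $failureCount time(s) from Reporter thread.")
          }
      }
      try Thread.sleep(intervalMs) catch { case _: InterruptedException => }
    }
  }
}
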
@@ -308,7 +329,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
sparkContextRef.synchronized {
var count = 0
val waitTime = 10000L
- val numTries = sparkConf.getInt("spark.yarn.ApplicationMaster.waitTries", 10)
+ val numTries = sparkConf.getInt("spark.yarn.applicationMaster.waitTries", 10)
while (sparkContextRef.get() == null && count < numTries && !finished) {
logInfo("Waiting for spark context initialization ... " + count)
count = count + 1
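
This hunk only lower-cases the config key to spark.yarn.applicationMaster.waitTries, but the surrounding loop is the interesting part: the user-class thread publishes the SparkContext into an AtomicReference and the AM thread waits on that reference's monitor for at most numTries * waitTime milliseconds. A condensed sketch of that handshake; the object and method names below are illustrative, not Spark's:

import java.util.concurrent.atomic.AtomicReference
import org.apache.spark.SparkContext

object SparkContextHandshakeSketch {
  private val sparkContextRef = new AtomicReference[SparkContext](null)

  // Called from the user-class thread once the driver's SparkContext exists.
  def publish(sc: SparkContext): Unit = sparkContextRef.synchronized {
    sparkContextRef.set(sc)
    sparkContextRef.notifyAll()
  }

  // Called from the AM thread: block, but give up after roughly numTries * waitTime ms.
  def awaitSparkContext(numTries: Int, waitTime: Long): Option[SparkContext] =
    sparkContextRef.synchronized {
      var count = 0
      while (sparkContextRef.get() == null && count < numTries) {
        count += 1
        sparkContextRef.wait(waitTime)
      }
      Option(sparkContextRef.get())
    }
}
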
@@ -328,10 +349,19 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
private def waitForSparkDriver(): ActorRef = {
logInfo("Waiting for Spark driver to be reachable.")
var driverUp = false
+ var count = 0
val hostport = args.userArgs(0)
val (driverHost, driverPort) = Utils.parseHostPort(hostport)
- while (!driverUp) {
+
+ // spark driver should already be up since it launched us, but we don't want to
+ // wait forever, so wait 100 seconds max to match the cluster mode setting.
+ // Leave this config unpublished for now. SPARK-3779 to investigate changing
+ // this config to be time based.
+ val numTries = sparkConf.getInt("spark.yarn.applicationMaster.waitTries", 1000)
+
+ while (!driverUp && !finished && count < numTries) {
try {
+ count = count + 1
val socket = new Socket(driverHost, driverPort)
socket.close()
logInfo("Driver now available: %s:%s".format(driverHost, driverPort))
@@ -343,6 +373,11 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
Thread.sleep(100)
}
}
+
+ if (!driverUp) {
+ throw new SparkException("Failed to connect to driver!")
+ }
+
sparkConf.set("spark.driver.host", driverHost)
sparkConf.set("spark.driver.port", driverPort.toString)
@@ -354,18 +389,6 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM")
}
- private def checkNumExecutorsFailed() = {
- if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
- finish(FinalApplicationStatus.FAILED, "Max number of executor failures reached.")
-
- val sc = sparkContextRef.get()
- if (sc != null) {
- logInfo("Invoking sc stop from checkNumExecutorsFailed")
- sc.stop()
- }
- }
- }
-
/** Add the Yarn IP filter that is required for properly securing the UI. */
private def addAmIpFilter() = {
val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
@@ -379,40 +402,81 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
}
}
+ /**
+ * This system security manager applies to the entire process.
+ * Its main purpose is to handle the case where the user code does a System.exit.
+ * This allows us to catch that and properly set the YARN application status and
+ * cleanup if needed.
+ */
+ private def setupSystemSecurityManager(): Unit = {
+ try {
+ var stopped = false
+ System.setSecurityManager(new java.lang.SecurityManager() {
+ override def checkExit(paramInt: Int) {
+ if (!stopped) {
+ logInfo("In securityManager checkExit, exit code: " + paramInt)
+ if (paramInt == 0) {
+ finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
+ } else {
+ finish(FinalApplicationStatus.FAILED,
+ paramInt,
+ "User class exited with non-zero exit code")
+ }
+ stopped = true
+ }
+ }
+ // required for the checkExit to work properly
+ override def checkPermission(perm: java.security.Permission): Unit = {}
+ })
+ }
+ catch {
+ case e: SecurityException =>
+ finish(FinalApplicationStatus.FAILED,
+ ApplicationMaster.EXIT_SECURITY,
+ "Error in setSecurityManager")
+ logError("Error in setSecurityManager:", e)
+ }
+ }
+
+ /**
+ * Start the user class, which contains the spark driver, in a separate Thread.
+ * If the main routine exits cleanly or exits with System.exit(0) we
+ * assume it was successful; for all other cases we assume failure.
+ *
+ * Returns the user thread that was started.
+ */
private def startUserClass(): Thread = {
logInfo("Starting the user JAR in a separate Thread")
System.setProperty("spark.executor.instances", args.numExecutors.toString)
val mainMethod = Class.forName(args.userClass, false,
Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
- userClassThread = new Thread {
+ val userThread = new Thread {
override def run() {
- var status = FinalApplicationStatus.FAILED
try {
- // Copy
val mainArgs = new Array[String](args.userArgs.size)
args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size)
mainMethod.invoke(null, mainArgs)
- // Some apps have "System.exit(0)" at the end. The user thread will stop here unless
- // it has an uncaught exception thrown out. It needs a shutdown hook to set SUCCEEDED.
- status = FinalApplicationStatus.SUCCEEDED
+ finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
+ logDebug("Done running users class")
} catch {
case e: InvocationTargetException =>
e.getCause match {
case _: InterruptedException =>
// Reporter thread can interrupt to stop user class
-
- case e => throw e
+ case e: Exception =>
+ finish(FinalApplicationStatus.FAILED,
+ ApplicationMaster.EXIT_EXCEPTION_USER_CLASS,
+ "User class threw exception: " + e.getMessage)
+ // re-throw to get it logged
+ throw e
}
- } finally {
- logDebug("Finishing main")
- finalStatus = status
}
}
}
- userClassThread.setName("Driver")
- userClassThread.start()
- userClassThread
+ userThread.setName("Driver")
+ userThread.start()
+ userThread
}
// Actor used to monitor the driver when running in client deploy mode.
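
setupSystemSecurityManager installs a process-wide java.lang.SecurityManager whose only job is to observe System.exit from the user class, translate the exit code into a final status, and leave every other permission check as a no-op (the SecurityManager API has since been deprecated in newer JDKs, but it matches this 2014-era code). A minimal version of the trick; installExitInterceptor and onExit are illustrative names, with onExit standing in for finish():

object ExitInterceptorSketch {
  // Intercept System.exit so the caller can record the exit code before the JVM shuts down.
  def installExitInterceptor(onExit: Int => Unit): Unit = {
    System.setSecurityManager(new java.lang.SecurityManager() {
      private var seen = false
      override def checkExit(status: Int): Unit = synchronized {
        if (!seen) {
          seen = true
          onExit(status)   // e.g. map 0 to SUCCEEDED, anything else to FAILED
        }
      }
      // Must be a no-op; the default policy would otherwise deny most operations.
      override def checkPermission(perm: java.security.Permission): Unit = {}
    })
  }
}
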
@@ -432,7 +496,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
override def receive = {
case x: DisassociatedEvent =>
logInfo(s"Driver terminated or disconnected! Shutting down. $x")
- finish(FinalApplicationStatus.SUCCEEDED)
+ finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
case x: AddWebUIFilter =>
logInfo(s"Add WebUI Filter. $x")
driver ! x
@@ -446,6 +510,15 @@ object ApplicationMaster extends Logging {
val SHUTDOWN_HOOK_PRIORITY: Int = 30
+ // exit codes for different causes, no reason behind the values
+ private val EXIT_SUCCESS = 0
+ private val EXIT_UNCAUGHT_EXCEPTION = 10
+ private val EXIT_MAX_EXECUTOR_FAILURES = 11
+ private val EXIT_REPORTER_FAILURE = 12
+ private val EXIT_SC_NOT_INITED = 13
+ private val EXIT_SECURITY = 14
+ private val EXIT_EXCEPTION_USER_CLASS = 15
+
private var master: ApplicationMaster = _
def main(args: Array[String]) = {
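
The new constants give each failure mode its own process exit code (10 through 15), so the container exit status recorded by YARN distinguishes "user class threw" from "reporter died" or "SparkContext never initialized". A hedged sketch of how such a code typically becomes the JVM's exit status; this main is illustrative, not the actual ApplicationMaster.main:

object ExitCodesSketch {
  // Same idea as the constants above: arbitrary but distinct values per failure cause.
  val EXIT_SUCCESS = 0
  val EXIT_UNCAUGHT_EXCEPTION = 10

  // Placeholder standing in for ApplicationMaster.run(); returns the recorded exit code.
  private def runApplicationMaster(): Int = EXIT_SUCCESS

  def main(args: Array[String]): Unit = {
    val code =
      try runApplicationMaster()
      catch { case _: Throwable => EXIT_UNCAUGHT_EXCEPTION }
    // The JVM exit status becomes the container exit status that YARN records,
    // so distinct codes make failures diagnosable from the RM side.
    System.exit(code)
  }
}
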
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
index 943dc56202..2510b9c9ce 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
@@ -49,12 +49,12 @@ trait YarnRMClient {
securityMgr: SecurityManager): YarnAllocator
/**
- * Shuts down the AM. Guaranteed to only be called once.
+ * Unregister the AM. Guaranteed to only be called once.
*
* @param status The final status of the AM.
* @param diagnostics Diagnostics message to include in the final status.
*/
- def shutdown(status: FinalApplicationStatus, diagnostics: String = ""): Unit
+ def unregister(status: FinalApplicationStatus, diagnostics: String = ""): Unit
/** Returns the attempt ID. */
def getAttemptId(): ApplicationAttemptId
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
index b581790e15..8d4b96ed79 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
@@ -45,6 +45,7 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC
private var amClient: AMRMClient[ContainerRequest] = _
private var uiHistoryAddress: String = _
+ private var registered: Boolean = false
override def register(
conf: YarnConfiguration,
@@ -59,13 +60,19 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC
this.uiHistoryAddress = uiHistoryAddress
logInfo("Registering the ApplicationMaster")
- amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
+ synchronized {
+ amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
+ registered = true
+ }
new YarnAllocationHandler(conf, sparkConf, amClient, getAttemptId(), args,
preferredNodeLocations, securityMgr)
}
- override def shutdown(status: FinalApplicationStatus, diagnostics: String = "") =
- amClient.unregisterApplicationMaster(status, diagnostics, uiHistoryAddress)
+ override def unregister(status: FinalApplicationStatus, diagnostics: String = "") = synchronized {
+ if (registered) {
+ amClient.unregisterApplicationMaster(status, diagnostics, uiHistoryAddress)
+ }
+ }
override def getAttemptId() = {
val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name())
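
The stable-API client mirrors the alpha one: registerApplicationMaster and the new registered flag change under the same lock, and unregister only calls unregisterApplicationMaster when registration actually happened. If one wanted to unit-test that contract without a cluster, a small hypothetical recording stub (not Spark's YarnRMClient trait) could capture the at-most-once behaviour:

import org.apache.hadoop.yarn.api.records.FinalApplicationStatus

// Hypothetical recording stub for exercising the register/unregister contract in tests.
class RecordingRmClient {
  private var registered = false
  var unregisterCalls: List[(FinalApplicationStatus, String)] = Nil

  def register(): Unit = synchronized { registered = true }

  def unregister(status: FinalApplicationStatus, diagnostics: String = ""): Unit = synchronized {
    // Only record when registered, and at most once, matching the new semantics.
    if (registered && unregisterCalls.isEmpty) {
      unregisterCalls = (status, diagnostics) :: unregisterCalls
    }
  }
}

// Expected under the new semantics:
//   val c = new RecordingRmClient()
//   c.unregister(FinalApplicationStatus.FAILED)                    // ignored: never registered
//   c.register(); c.unregister(FinalApplicationStatus.SUCCEEDED)   // recorded exactly once
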