From 70e824f750aa8ed446eec104ba158b0503ba58a9 Mon Sep 17 00:00:00 2001 From: Thomas Graves Date: Tue, 7 Oct 2014 09:51:37 -0500 Subject: [SPARK-3627] - [yarn] - fix exit code and final status reporting to RM See the description and whats handled in the jira comment: https://issues.apache.org/jira/browse/SPARK-3627?focusedCommentId=14150013&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14150013 This does not handle yarn client mode reporting of the driver to the AM. I think that should be handled when we make it an unmanaged AM. Author: Thomas Graves Closes #2577 from tgravescs/SPARK-3627 and squashes the following commits: 9c2efbf [Thomas Graves] review comments e8cc261 [Thomas Graves] fix accidental typo during fixing comment 24c98e3 [Thomas Graves] rework 85f1901 [Thomas Graves] Merge remote-tracking branch 'upstream/master' into SPARK-3627 fab166d [Thomas Graves] update based on review comments 32f4dfa [Thomas Graves] switch back f0b6519 [Thomas Graves] change order of cleanup staging dir d3cc800 [Thomas Graves] SPARK-3627 - yarn - fix exit code and final status reporting to RM --- .../spark/deploy/yarn/YarnRMClientImpl.scala | 26 +- .../spark/deploy/yarn/ApplicationMaster.scala | 295 +++++++++++++-------- .../apache/spark/deploy/yarn/YarnRMClient.scala | 4 +- .../spark/deploy/yarn/YarnRMClientImpl.scala | 13 +- 4 files changed, 212 insertions(+), 126 deletions(-) (limited to 'yarn') diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala index 9bd1719cb1..7faf55bc63 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala @@ -40,6 +40,7 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC private var rpc: YarnRPC = null private var resourceManager: AMRMProtocol = _ private var uiHistoryAddress: String = _ + private var registered: Boolean = false override def register( conf: YarnConfiguration, @@ -51,8 +52,11 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC this.rpc = YarnRPC.create(conf) this.uiHistoryAddress = uiHistoryAddress - resourceManager = registerWithResourceManager(conf) - registerApplicationMaster(uiAddress) + synchronized { + resourceManager = registerWithResourceManager(conf) + registerApplicationMaster(uiAddress) + registered = true + } new YarnAllocationHandler(conf, sparkConf, resourceManager, getAttemptId(), args, preferredNodeLocations, securityMgr) @@ -66,14 +70,16 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC appAttemptId } - override def shutdown(status: FinalApplicationStatus, diagnostics: String = "") = { - val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest]) - .asInstanceOf[FinishApplicationMasterRequest] - finishReq.setAppAttemptId(getAttemptId()) - finishReq.setFinishApplicationStatus(status) - finishReq.setDiagnostics(diagnostics) - finishReq.setTrackingUrl(uiHistoryAddress) - resourceManager.finishApplicationMaster(finishReq) + override def unregister(status: FinalApplicationStatus, diagnostics: String = "") = synchronized { + if (registered) { + val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest]) + .asInstanceOf[FinishApplicationMasterRequest] + finishReq.setAppAttemptId(getAttemptId()) + finishReq.setFinishApplicationStatus(status) + 
finishReq.setDiagnostics(diagnostics) + finishReq.setTrackingUrl(uiHistoryAddress) + resourceManager.finishApplicationMaster(finishReq) + } } override def getAmIpFilterParams(conf: YarnConfiguration, proxyBase: String) = { diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index caceef5d4b..a3c43b4384 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv} +import org.apache.spark.SparkException import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.deploy.history.HistoryServer import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend @@ -56,8 +57,11 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures", sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3))) + @volatile private var exitCode = 0 + @volatile private var unregistered = false @volatile private var finished = false @volatile private var finalStatus = FinalApplicationStatus.UNDEFINED + @volatile private var finalMsg: String = "" @volatile private var userClassThread: Thread = _ private var reporterThread: Thread = _ @@ -71,80 +75,107 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, private val sparkContextRef = new AtomicReference[SparkContext](null) final def run(): Int = { - val appAttemptId = client.getAttemptId() + try { + val appAttemptId = client.getAttemptId() - if (isDriver) { - // Set the web ui port to be ephemeral for yarn so we don't conflict with - // other spark processes running on the same box - System.setProperty("spark.ui.port", "0") + if (isDriver) { + // Set the web ui port to be ephemeral for yarn so we don't conflict with + // other spark processes running on the same box + System.setProperty("spark.ui.port", "0") - // Set the master property to match the requested mode. - System.setProperty("spark.master", "yarn-cluster") + // Set the master property to match the requested mode. + System.setProperty("spark.master", "yarn-cluster") - // Propagate the application ID so that YarnClusterSchedulerBackend can pick it up. - System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString()) - } + // Propagate the application ID so that YarnClusterSchedulerBackend can pick it up. + System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString()) + } - logInfo("ApplicationAttemptId: " + appAttemptId) + logInfo("ApplicationAttemptId: " + appAttemptId) - val cleanupHook = new Runnable { - override def run() { - // If the SparkContext is still registered, shut it down as a best case effort in case - // users do not call sc.stop or do System.exit(). - val sc = sparkContextRef.get() - if (sc != null) { - logInfo("Invoking sc stop from shutdown hook") - sc.stop() - finish(FinalApplicationStatus.SUCCEEDED) - } + val cleanupHook = new Runnable { + override def run() { + // If the SparkContext is still registered, shut it down as a best case effort in case + // users do not call sc.stop or do System.exit(). 
+ val sc = sparkContextRef.get() + if (sc != null) { + logInfo("Invoking sc stop from shutdown hook") + sc.stop() + } + val maxAppAttempts = client.getMaxRegAttempts(yarnConf) + val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts + + if (!finished) { + // this shouldn't ever happen, but if it does assume weird failure + finish(FinalApplicationStatus.FAILED, + ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION, + "shutdown hook called without cleanly finishing") + } - // Cleanup the staging dir after the app is finished, or if it's the last attempt at - // running the AM. - val maxAppAttempts = client.getMaxRegAttempts(yarnConf) - val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts - if (finished || isLastAttempt) { - cleanupStagingDir() + if (!unregistered) { + // we only want to unregister if we don't want the RM to retry + if (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt) { + unregister(finalStatus, finalMsg) + cleanupStagingDir() + } + } } } - } - // Use higher priority than FileSystem. - assert(ApplicationMaster.SHUTDOWN_HOOK_PRIORITY > FileSystem.SHUTDOWN_HOOK_PRIORITY) - ShutdownHookManager - .get().addShutdownHook(cleanupHook, ApplicationMaster.SHUTDOWN_HOOK_PRIORITY) + // Use higher priority than FileSystem. + assert(ApplicationMaster.SHUTDOWN_HOOK_PRIORITY > FileSystem.SHUTDOWN_HOOK_PRIORITY) + ShutdownHookManager + .get().addShutdownHook(cleanupHook, ApplicationMaster.SHUTDOWN_HOOK_PRIORITY) - // Call this to force generation of secret so it gets populated into the - // Hadoop UGI. This has to happen before the startUserClass which does a - // doAs in order for the credentials to be passed on to the executor containers. - val securityMgr = new SecurityManager(sparkConf) + // Call this to force generation of secret so it gets populated into the + // Hadoop UGI. This has to happen before the startUserClass which does a + // doAs in order for the credentials to be passed on to the executor containers. + val securityMgr = new SecurityManager(sparkConf) - if (isDriver) { - runDriver(securityMgr) - } else { - runExecutorLauncher(securityMgr) + if (isDriver) { + runDriver(securityMgr) + } else { + runExecutorLauncher(securityMgr) + } + } catch { + case e: Exception => + // catch everything else if not specifically handled + logError("Uncaught exception: ", e) + finish(FinalApplicationStatus.FAILED, + ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION, + "Uncaught exception: " + e.getMessage()) } + exitCode + } - if (finalStatus != FinalApplicationStatus.UNDEFINED) { - finish(finalStatus) - 0 - } else { - 1 + /** + * unregister is used to completely unregister the application from the ResourceManager. + * This means the ResourceManager will not retry the application attempt on your behalf if + * a failure occurred. 
+ */ + final def unregister(status: FinalApplicationStatus, diagnostics: String = null) = synchronized { + if (!unregistered) { + logInfo(s"Unregistering ApplicationMaster with $status" + + Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse("")) + unregistered = true + client.unregister(status, Option(diagnostics).getOrElse("")) } } - final def finish(status: FinalApplicationStatus, diagnostics: String = null) = synchronized { + final def finish(status: FinalApplicationStatus, code: Int, msg: String = null) = synchronized { if (!finished) { - logInfo(s"Finishing ApplicationMaster with $status" + - Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse("")) - finished = true + logInfo(s"Final app status: ${status}, exitCode: ${code}" + + Option(msg).map(msg => s", (reason: $msg)").getOrElse("")) + exitCode = code finalStatus = status - try { - if (Thread.currentThread() != reporterThread) { - reporterThread.interrupt() - reporterThread.join() - } - } finally { - client.shutdown(status, Option(diagnostics).getOrElse("")) + finalMsg = msg + finished = true + if (Thread.currentThread() != reporterThread && reporterThread != null) { + logDebug("shutting down reporter thread") + reporterThread.interrupt() + } + if (Thread.currentThread() != userClassThread && userClassThread != null) { + logDebug("shutting down user thread") + userClassThread.interrupt() } } } @@ -182,7 +213,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, private def runDriver(securityMgr: SecurityManager): Unit = { addAmIpFilter() - val userThread = startUserClass() + setupSystemSecurityManager() + userClassThread = startUserClass() // This a bit hacky, but we need to wait until the spark.driver.port property has // been set by the Thread executing the user class. @@ -190,15 +222,12 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, // If there is no SparkContext at this point, just fail the app. if (sc == null) { - finish(FinalApplicationStatus.FAILED, "Timed out waiting for SparkContext.") + finish(FinalApplicationStatus.FAILED, + ApplicationMaster.EXIT_SC_NOT_INITED, + "Timed out waiting for SparkContext.") } else { registerAM(sc.ui.map(_.appUIAddress).getOrElse(""), securityMgr) - try { - userThread.join() - } finally { - // In cluster mode, ask the reporter thread to stop since the user app is finished. - reporterThread.interrupt() - } + userClassThread.join() } } @@ -211,7 +240,6 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, // In client mode the actor will stop the reporter thread. 
reporterThread.join() - finalStatus = FinalApplicationStatus.SUCCEEDED } private def launchReporterThread(): Thread = { @@ -231,33 +259,26 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, val t = new Thread { override def run() { var failureCount = 0 - while (!finished) { try { - checkNumExecutorsFailed() - if (!finished) { + if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) { + finish(FinalApplicationStatus.FAILED, + ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES, + "Max number of executor failures reached") + } else { logDebug("Sending progress") allocator.allocateResources() } failureCount = 0 } catch { + case i: InterruptedException => case e: Throwable => { failureCount += 1 if (!NonFatal(e) || failureCount >= reporterMaxFailures) { - logError("Exception was thrown from Reporter thread.", e) - finish(FinalApplicationStatus.FAILED, "Exception was thrown" + - s"${failureCount} time(s) from Reporter thread.") - - /** - * If exception is thrown from ReporterThread, - * interrupt user class to stop. - * Without this interrupting, if exception is - * thrown before allocating enough executors, - * YarnClusterScheduler waits until timeout even though - * we cannot allocate executors. - */ - logInfo("Interrupting user class to stop.") - userClassThread.interrupt + finish(FinalApplicationStatus.FAILED, + ApplicationMaster.EXIT_REPORTER_FAILURE, "Exception was thrown " + + s"${failureCount} time(s) from Reporter thread.") + } else { logWarning(s"Reporter thread fails ${failureCount} time(s) in a row.", e) } @@ -308,7 +329,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, sparkContextRef.synchronized { var count = 0 val waitTime = 10000L - val numTries = sparkConf.getInt("spark.yarn.ApplicationMaster.waitTries", 10) + val numTries = sparkConf.getInt("spark.yarn.applicationMaster.waitTries", 10) while (sparkContextRef.get() == null && count < numTries && !finished) { logInfo("Waiting for spark context initialization ... " + count) count = count + 1 @@ -328,10 +349,19 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, private def waitForSparkDriver(): ActorRef = { logInfo("Waiting for Spark driver to be reachable.") var driverUp = false + var count = 0 val hostport = args.userArgs(0) val (driverHost, driverPort) = Utils.parseHostPort(hostport) - while (!driverUp) { + + // spark driver should already be up since it launched us, but we don't want to + // wait forever, so wait 100 seconds max to match the cluster mode setting. + // Leave this config unpublished for now. SPARK-3779 to investigating changing + // this config to be time based. 
+ val numTries = sparkConf.getInt("spark.yarn.applicationMaster.waitTries", 1000) + + while (!driverUp && !finished && count < numTries) { try { + count = count + 1 val socket = new Socket(driverHost, driverPort) socket.close() logInfo("Driver now available: %s:%s".format(driverHost, driverPort)) @@ -343,6 +373,11 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, Thread.sleep(100) } } + + if (!driverUp) { + throw new SparkException("Failed to connect to driver!") + } + sparkConf.set("spark.driver.host", driverHost) sparkConf.set("spark.driver.port", driverPort.toString) @@ -354,18 +389,6 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM") } - private def checkNumExecutorsFailed() = { - if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) { - finish(FinalApplicationStatus.FAILED, "Max number of executor failures reached.") - - val sc = sparkContextRef.get() - if (sc != null) { - logInfo("Invoking sc stop from checkNumExecutorsFailed") - sc.stop() - } - } - } - /** Add the Yarn IP filter that is required for properly securing the UI. */ private def addAmIpFilter() = { val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV) @@ -379,40 +402,81 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, } } + /** + * This system security manager applies to the entire process. + * It's main purpose is to handle the case if the user code does a System.exit. + * This allows us to catch that and properly set the YARN application status and + * cleanup if needed. + */ + private def setupSystemSecurityManager(): Unit = { + try { + var stopped = false + System.setSecurityManager(new java.lang.SecurityManager() { + override def checkExit(paramInt: Int) { + if (!stopped) { + logInfo("In securityManager checkExit, exit code: " + paramInt) + if (paramInt == 0) { + finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS) + } else { + finish(FinalApplicationStatus.FAILED, + paramInt, + "User class exited with non-zero exit code") + } + stopped = true + } + } + // required for the checkExit to work properly + override def checkPermission(perm: java.security.Permission): Unit = {} + }) + } + catch { + case e: SecurityException => + finish(FinalApplicationStatus.FAILED, + ApplicationMaster.EXIT_SECURITY, + "Error in setSecurityManager") + logError("Error in setSecurityManager:", e) + } + } + + /** + * Start the user class, which contains the spark driver, in a separate Thread. + * If the main routine exits cleanly or exits with System.exit(0) we + * assume it was successful, for all other cases we assume failure. + * + * Returns the user thread that was started. + */ private def startUserClass(): Thread = { logInfo("Starting the user JAR in a separate Thread") System.setProperty("spark.executor.instances", args.numExecutors.toString) val mainMethod = Class.forName(args.userClass, false, Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]]) - userClassThread = new Thread { + val userThread = new Thread { override def run() { - var status = FinalApplicationStatus.FAILED try { - // Copy val mainArgs = new Array[String](args.userArgs.size) args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size) mainMethod.invoke(null, mainArgs) - // Some apps have "System.exit(0)" at the end. The user thread will stop here unless - // it has an uncaught exception thrown out. 
It needs a shutdown hook to set SUCCEEDED. - status = FinalApplicationStatus.SUCCEEDED + finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS) + logDebug("Done running users class") } catch { case e: InvocationTargetException => e.getCause match { case _: InterruptedException => // Reporter thread can interrupt to stop user class - - case e => throw e + case e: Exception => + finish(FinalApplicationStatus.FAILED, + ApplicationMaster.EXIT_EXCEPTION_USER_CLASS, + "User class threw exception: " + e.getMessage) + // re-throw to get it logged + throw e } - } finally { - logDebug("Finishing main") - finalStatus = status } } } - userClassThread.setName("Driver") - userClassThread.start() - userClassThread + userThread.setName("Driver") + userThread.start() + userThread } // Actor used to monitor the driver when running in client deploy mode. @@ -432,7 +496,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, override def receive = { case x: DisassociatedEvent => logInfo(s"Driver terminated or disconnected! Shutting down. $x") - finish(FinalApplicationStatus.SUCCEEDED) + finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS) case x: AddWebUIFilter => logInfo(s"Add WebUI Filter. $x") driver ! x @@ -446,6 +510,15 @@ object ApplicationMaster extends Logging { val SHUTDOWN_HOOK_PRIORITY: Int = 30 + // exit codes for different causes, no reason behind the values + private val EXIT_SUCCESS = 0 + private val EXIT_UNCAUGHT_EXCEPTION = 10 + private val EXIT_MAX_EXECUTOR_FAILURES = 11 + private val EXIT_REPORTER_FAILURE = 12 + private val EXIT_SC_NOT_INITED = 13 + private val EXIT_SECURITY = 14 + private val EXIT_EXCEPTION_USER_CLASS = 15 + private var master: ApplicationMaster = _ def main(args: Array[String]) = { diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala index 943dc56202..2510b9c9ce 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala @@ -49,12 +49,12 @@ trait YarnRMClient { securityMgr: SecurityManager): YarnAllocator /** - * Shuts down the AM. Guaranteed to only be called once. + * Unregister the AM. Guaranteed to only be called once. * * @param status The final status of the AM. * @param diagnostics Diagnostics message to include in the final status. */ - def shutdown(status: FinalApplicationStatus, diagnostics: String = ""): Unit + def unregister(status: FinalApplicationStatus, diagnostics: String = ""): Unit /** Returns the attempt ID. 
*/ def getAttemptId(): ApplicationAttemptId diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala index b581790e15..8d4b96ed79 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala @@ -45,6 +45,7 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC private var amClient: AMRMClient[ContainerRequest] = _ private var uiHistoryAddress: String = _ + private var registered: Boolean = false override def register( conf: YarnConfiguration, @@ -59,13 +60,19 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC this.uiHistoryAddress = uiHistoryAddress logInfo("Registering the ApplicationMaster") - amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress) + synchronized { + amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress) + registered = true + } new YarnAllocationHandler(conf, sparkConf, amClient, getAttemptId(), args, preferredNodeLocations, securityMgr) } - override def shutdown(status: FinalApplicationStatus, diagnostics: String = "") = - amClient.unregisterApplicationMaster(status, diagnostics, uiHistoryAddress) + override def unregister(status: FinalApplicationStatus, diagnostics: String = "") = synchronized { + if (registered) { + amClient.unregisterApplicationMaster(status, diagnostics, uiHistoryAddress) + } + } override def getAttemptId() = { val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name()) -- cgit v1.2.3
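
A few notes on the patterns this patch relies on, sketched outside the Spark/YARN types. First, both YarnRMClientImpl variants now set a registered flag under a lock and turn unregister into a guarded no-op, so the AM never asks the RM to finish an application it never registered, and a late call from the shutdown hook is harmless. A minimal standalone sketch of that guard, assuming a hypothetical ResourceManagerClient trait in place of the real AMRMClient calls:

    // Sketch only: ResourceManagerClient is a hypothetical stand-in for the YARN client API.
    trait ResourceManagerClient {
      def registerApplicationMaster(uiAddress: String): Unit
      def unregisterApplicationMaster(status: String, diagnostics: String): Unit
    }

    class RMLifecycle(client: ResourceManagerClient) {
      private var registered = false

      def register(uiAddress: String): Unit = synchronized {
        client.registerApplicationMaster(uiAddress)
        registered = true // only flipped once the RM call has succeeded
      }

      def unregister(status: String, diagnostics: String = ""): Unit = synchronized {
        if (registered) { // never finish an app attempt that was never registered
          client.unregisterApplicationMaster(status, diagnostics)
          registered = false // a second call (e.g. from a shutdown hook) becomes a no-op
        }
      }
    }

In the patch itself the call-at-most-once guarantee lives one level up, in ApplicationMaster.unregister's unregistered flag; the sketch folds it into one class for brevity.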
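Second, setupSystemSecurityManager intercepts System.exit from the user class so the AM can record a real final status and exit code instead of the JVM simply disappearing. The mechanism is the standard java.lang.SecurityManager.checkExit hook; roughly, it looks like the sketch below, where handleExit is a hypothetical callback standing in for ApplicationMaster.finish(status, code, msg):

    object ExitInterceptor {
      // Sketch only: handleExit stands in for the AM's finish(status, code, msg).
      def install(handleExit: Int => Unit): Unit = {
        System.setSecurityManager(new java.lang.SecurityManager() {
          private var handled = false

          override def checkExit(status: Int): Unit = {
            if (!handled) {
              handled = true
              handleExit(status) // record SUCCEEDED for 0, FAILED plus the code otherwise
            }
            // Returning normally lets the exit proceed once the status has been recorded.
          }

          // Deliberately allow everything else: the default checkPermission would enforce
          // the Java policy and deny most operations the driver needs once a
          // SecurityManager is installed.
          override def checkPermission(perm: java.security.Permission): Unit = {}
        })
      }
    }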
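Third, the reworked shutdown hook only unregisters (and cleans up the staging directory) when the application either succeeded or has used its last attempt; in every other case the AM exits without unregistering so the ResourceManager will retry the attempt. The decision reduces to something like the following, with simplified stand-ins for the AM's fields:

    object ShutdownDecision {
      sealed trait FinalStatus
      case object Succeeded extends FinalStatus
      case object Failed extends FinalStatus
      case object Undefined extends FinalStatus

      // Unregistering tells the RM the application is finished for good, so only do it
      // when the run succeeded or there is no attempt left to retry; otherwise exit
      // without unregistering and let the RM schedule another attempt.
      def shouldUnregister(finalStatus: FinalStatus, attemptId: Int, maxAttempts: Int): Boolean =
        finalStatus == Succeeded || attemptId >= maxAttempts
    }

The EXIT_* constants added to object ApplicationMaster then give each failure cause a distinct AM exit code, where the previous code could only return 0 or 1.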