From 323a83c5235f9289cd9526491d62365df96a429b Mon Sep 17 00:00:00 2001
From: GuoQiang Li
Date: Thu, 24 Jul 2014 14:46:10 -0500
Subject: [SPARK-2037]: yarn client mode doesn't support spark.yarn.max.executor.failures

Author: GuoQiang Li

Closes #1180 from witgo/SPARK-2037 and squashes the following commits:

3d52411 [GuoQiang Li] review commit
7058f4d [GuoQiang Li] Correctly stop SparkContext
6d0561f [GuoQiang Li] Fix: yarn client mode doesn't support spark.yarn.max.executor.failures
---
 .../spark/deploy/yarn/ExecutorLauncher.scala       | 80 ++++++++++++++--------
 .../cluster/YarnClientSchedulerBackend.scala       | 28 ++++++++
 .../spark/deploy/yarn/ExecutorLauncher.scala       | 45 +++++++++---
 3 files changed, 115 insertions(+), 38 deletions(-)

diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index d232c18d2f..184e2ad6c8 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -28,7 +28,6 @@ import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 import akka.actor._
 import akka.remote._
-import akka.actor.Terminated
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.util.{Utils, AkkaUtils}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
@@ -57,10 +56,17 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
   private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
 
   private var yarnAllocator: YarnAllocationHandler = _
-  private var driverClosed:Boolean = false
+
+  private var driverClosed: Boolean = false
+  private var isFinished: Boolean = false
+  private var registered: Boolean = false
+
+  // Default to numExecutors * 2, with minimum of 3
+  private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
+    sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
 
   val securityManager = new SecurityManager(sparkConf)
-  val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
+  val actorSystem: ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
     conf = sparkConf, securityManager = securityManager)._1
   var actor: ActorRef = _
 
@@ -97,23 +103,26 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
 
     appAttemptId = getApplicationAttemptId()
     resourceManager = registerWithResourceManager()
-    val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
-
-    // Compute number of threads for akka
-    val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
-
-    if (minimumMemory > 0) {
-      val mem = args.executorMemory + sparkConf.getInt("spark.yarn.executor.memoryOverhead",
-        YarnAllocationHandler.MEMORY_OVERHEAD)
-      val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
-
-      if (numCore > 0) {
-        // do not override - hits https://issues.apache.org/jira/browse/HADOOP-8406
-        // TODO: Uncomment when hadoop is on a version which has this fixed.
-        // args.workerCores = numCore
+    synchronized {
+      if (!isFinished) {
+        val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
+        // Compute number of threads for akka
+        val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
+
+        if (minimumMemory > 0) {
+          val mem = args.executorMemory + sparkConf.getInt("spark.yarn.executor.memoryOverhead",
+            YarnAllocationHandler.MEMORY_OVERHEAD)
+          val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
+
+          if (numCore > 0) {
+            // do not override - hits https://issues.apache.org/jira/browse/HADOOP-8406
+            // TODO: Uncomment when hadoop is on a version which has this fixed.
+            // args.workerCores = numCore
+          }
+        }
+        registered = true
       }
     }
-
     waitForSparkMaster()
     addAmIpFilter()
     // Allocate all containers
@@ -243,11 +252,17 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) {
       yarnAllocator.allocateContainers(
         math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
+      checkNumExecutorsFailed()
       Thread.sleep(100)
     }
 
     logInfo("All executors have launched.")
-
+  }
 
+  private def checkNumExecutorsFailed() {
+    if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
+      finishApplicationMaster(FinalApplicationStatus.FAILED,
+        "max number of executor failures reached")
+    }
   }
   // TODO: We might want to extend this to allocate more containers in case they die !
@@ -257,6 +272,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     val t = new Thread {
       override def run() {
         while (!driverClosed) {
+          checkNumExecutorsFailed()
           val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning
           if (missingExecutorCount > 0) {
             logInfo("Allocating " + missingExecutorCount +
@@ -282,15 +298,23 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     yarnAllocator.allocateContainers(0)
   }
 
-  def finishApplicationMaster(status: FinalApplicationStatus) {
-
-    logInfo("finish ApplicationMaster with " + status)
-    val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
-      .asInstanceOf[FinishApplicationMasterRequest]
-    finishReq.setAppAttemptId(appAttemptId)
-    finishReq.setFinishApplicationStatus(status)
-    finishReq.setTrackingUrl(sparkConf.get("spark.driver.appUIHistoryAddress", ""))
-    resourceManager.finishApplicationMaster(finishReq)
+  def finishApplicationMaster(status: FinalApplicationStatus, appMessage: String = "") {
+    synchronized {
+      if (isFinished) {
+        return
+      }
+      logInfo("Unregistering ApplicationMaster with " + status)
+      if (registered) {
+        val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
+          .asInstanceOf[FinishApplicationMasterRequest]
+        finishReq.setAppAttemptId(appAttemptId)
+        finishReq.setFinishApplicationStatus(status)
+        finishReq.setTrackingUrl(sparkConf.get("spark.yarn.historyServer.address", ""))
+        finishReq.setDiagnostics(appMessage)
+        resourceManager.finishApplicationMaster(finishReq)
+      }
+      isFinished = true
+    }
   }
 
 }
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 77b91f8e26..f8fb96b312 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -37,6 +37,8 @@ private[spark] class YarnClientSchedulerBackend(
 
   var client: Client = null
   var appId: ApplicationId = null
+  var checkerThread: Thread = null
+  var stopping: Boolean = false
 
   private[spark] def addArg(optionName: String, envVar: String, sysProp: String,
       arrayBuf: ArrayBuffer[String]) {
@@ -86,6 +88,7 @@ private[spark] class YarnClientSchedulerBackend(
     client = new Client(args, conf)
     appId = client.runApp()
     waitForApp()
+    checkerThread = yarnApplicationStateCheckerThread()
   }
 
   def waitForApp() {
@@ -116,7 +119,32 @@ private[spark] class YarnClientSchedulerBackend(
     }
   }
 
+  private def yarnApplicationStateCheckerThread(): Thread = {
+    val t = new Thread {
+      override def run() {
+        while (!stopping) {
+          val report = client.getApplicationReport(appId)
+          val state = report.getYarnApplicationState()
+          if (state == YarnApplicationState.FINISHED || state == YarnApplicationState.KILLED
+            || state == YarnApplicationState.FAILED) {
+            logError(s"Yarn application already ended: $state")
+            sc.stop()
+            stopping = true
+          }
+          Thread.sleep(1000L)
+        }
+        checkerThread = null
+        Thread.currentThread().interrupt()
+      }
+    }
+    t.setName("Yarn Application State Checker")
+    t.setDaemon(true)
+    t.start()
+    t
+  }
+
   override def stop() {
+    stopping = true
     super.stop()
     client.stop
     logInfo("Stopped")
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index 7158d9442a..fc7b8320d7 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -19,15 +19,12 @@ package org.apache.spark.deploy.yarn
 import java.net.Socket
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.net.NetUtils
-import org.apache.hadoop.yarn.api._
+import org.apache.hadoop.yarn.api.ApplicationConstants
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.api.protocolrecords._
 import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 import akka.actor._
 import akka.remote._
-import akka.actor.Terminated
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.util.{Utils, AkkaUtils}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
@@ -57,10 +54,16 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
   private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
 
   private var yarnAllocator: YarnAllocationHandler = _
-  private var driverClosed:Boolean = false
+  private var driverClosed: Boolean = false
+  private var isFinished: Boolean = false
+  private var registered: Boolean = false
 
   private var amClient: AMRMClient[ContainerRequest] = _
 
+  // Default to numExecutors * 2, with minimum of 3
+  private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
+    sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
+
   val securityManager = new SecurityManager(sparkConf)
   val actorSystem: ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
     conf = sparkConf, securityManager = securityManager)._1
@@ -101,7 +104,12 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     amClient.start()
 
     appAttemptId = ApplicationMaster.getApplicationAttemptId()
-    registerApplicationMaster()
+    synchronized {
+      if (!isFinished) {
+        registerApplicationMaster()
+        registered = true
+      }
+    }
 
     waitForSparkMaster()
     addAmIpFilter()
@@ -210,6 +218,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     yarnAllocator.addResourceRequests(args.numExecutors)
     yarnAllocator.allocateResources()
     while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) {
+      checkNumExecutorsFailed()
       allocateMissingExecutor()
       yarnAllocator.allocateResources()
       Thread.sleep(100)
@@ -228,12 +237,20 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     }
   }
 
+  private def checkNumExecutorsFailed() {
+    if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
+      finishApplicationMaster(FinalApplicationStatus.FAILED,
+        "max number of executor failures reached")
+    }
+  }
+
   private def launchReporterThread(_sleepTime: Long): Thread = {
     val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime
 
     val t = new Thread {
       override def run() {
         while (!driverClosed) {
+          checkNumExecutorsFailed()
           allocateMissingExecutor()
           logDebug("Sending progress")
           yarnAllocator.allocateResources()
@@ -248,10 +265,18 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     t
   }
 
-  def finishApplicationMaster(status: FinalApplicationStatus) {
-    logInfo("Unregistering ApplicationMaster with " + status)
-    val trackingUrl = sparkConf.get("spark.driver.appUIHistoryAddress", "")
-    amClient.unregisterApplicationMaster(status, "" /* appMessage */ , trackingUrl)
+  def finishApplicationMaster(status: FinalApplicationStatus, appMessage: String = "") {
+    synchronized {
+      if (isFinished) {
+        return
+      }
+      logInfo("Unregistering ApplicationMaster with " + status)
+      if (registered) {
+        val trackingUrl = sparkConf.get("spark.yarn.historyServer.address", "")
+        amClient.unregisterApplicationMaster(status, appMessage, trackingUrl)
+      }
+      isFinished = true
+    }
  }
 
 }
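
The behavioral core of the change is the same in both launchers: resolve the failure cap from the new spark.yarn.max.executor.failures key, fall back to the deprecated spark.yarn.max.worker.failures key, and otherwise default to twice the requested executors with a floor of 3. A minimal standalone sketch of that resolution order, assuming only spark-core on the classpath (the MaxFailuresResolution object and the numExecutors values here are illustrative, not part of the patch):

    import org.apache.spark.SparkConf

    object MaxFailuresResolution {
      // Same fallback chain the patch installs in both ExecutorLaunchers:
      // new key first, then the deprecated "worker" key, then max(2 * n, 3).
      def maxNumExecutorFailures(conf: SparkConf, numExecutors: Int): Int =
        conf.getInt("spark.yarn.max.executor.failures",
          conf.getInt("spark.yarn.max.worker.failures", math.max(numExecutors * 2, 3)))

      def main(args: Array[String]): Unit = {
        val conf = new SparkConf(loadDefaults = false)
        println(maxNumExecutorFailures(conf, 4))   // 8: derived default
        conf.set("spark.yarn.max.worker.failures", "10")
        println(maxNumExecutorFailures(conf, 4))   // 10: deprecated key honored
        conf.set("spark.yarn.max.executor.failures", "6")
        println(maxNumExecutorFailures(conf, 4))   // 6: new key wins
      }
    }

Because checkNumExecutorsFailed() runs both in the initial allocation loop and in the reporter thread, the AM notices a crossed threshold within one polling interval regardless of which phase the application is in.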
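
finishApplicationMaster can now be reached from several paths (the allocation loop, the reporter thread, driver shutdown), so the patch guards it with a synchronized isFinished/registered pair: finish at most once, and only unregister from the ResourceManager if registration actually completed. A dependency-free sketch of that guard pattern, where FinishGuard and doUnregister are invented stand-ins for the launcher and its RM call:

    // Idempotent finish: at most one unregister, and only after a
    // successful registration, no matter how many threads call finish().
    class FinishGuard(doUnregister: String => Unit) {
      private var isFinished = false
      private var registered = false

      def markRegistered(): Unit = synchronized {
        if (!isFinished) {
          registered = true
        }
      }

      def finish(appMessage: String = ""): Unit = synchronized {
        if (!isFinished) {
          if (registered) {
            doUnregister(appMessage)
          }
          isFinished = true
        }
      }
    }

Registration itself happens under the same lock in the patch (the synchronized block around registerApplicationMaster()), which closes the race where a failure-triggered finish lands between registration starting and the registered flag being set.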
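
On the client side, YarnClientSchedulerBackend previously had no way to notice that YARN had killed or failed the application, so a client-mode driver could hang indefinitely. The new checker thread polls the application report once a second and stops the SparkContext on the first terminal state. A generic sketch of that poller, with StatePoller, fetchState, isTerminal, and onTerminal as invented stand-ins for the backend, client.getApplicationReport(appId), the FINISHED/KILLED/FAILED test, and sc.stop():

    // Daemon thread that polls a state source once a second and fires a
    // callback the first time a terminal state is observed.
    class StatePoller[S](fetchState: () => S, isTerminal: S => Boolean,
        onTerminal: S => Unit) {
      @volatile private var stopping = false

      def start(): Thread = {
        val t = new Thread {
          override def run(): Unit = {
            while (!stopping) {
              val state = fetchState()
              if (isTerminal(state)) {
                onTerminal(state)
                stopping = true
              }
              Thread.sleep(1000L)
            }
          }
        }
        t.setName("Yarn Application State Checker")
        t.setDaemon(true)
        t.start()
        t
      }

      def stop(): Unit = { stopping = true }
    }

One deliberate difference: the sketch marks stopping as @volatile, whereas the patch uses a plain var written by stop() on the driver thread and read in the checker thread. The daemon flag keeps the JVM from hanging on the thread either way, but a volatile flag makes the cross-thread stop signal reliable.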