diff options
author | GuoQiang Li <witgo@qq.com> | 2014-07-24 14:46:10 -0500 |
---|---|---|
committer | Thomas Graves <tgraves@apache.org> | 2014-07-24 14:46:10 -0500 |
commit | 323a83c5235f9289cd9526491d62365df96a429b (patch) | |
tree | 159b8b0f8cd00bfca3ddca0dfd672198a33534d4 /yarn/alpha/src | |
parent | c960b5051853f336fb01ea3f16567b9958baa1b6 (diff) | |
download | spark-323a83c5235f9289cd9526491d62365df96a429b.tar.gz spark-323a83c5235f9289cd9526491d62365df96a429b.tar.bz2 spark-323a83c5235f9289cd9526491d62365df96a429b.zip |
[SPARK-2037]: yarn client mode doesn't support spark.yarn.max.executor.failures
Author: GuoQiang Li <witgo@qq.com>
Closes #1180 from witgo/SPARK-2037 and squashes the following commits:
3d52411 [GuoQiang Li] review commit
7058f4d [GuoQiang Li] Correctly stop SparkContext
6d0561f [GuoQiang Li] Fix: yarn client mode doesn't support spark.yarn.max.executor.failures
Diffstat (limited to 'yarn/alpha/src')
-rw-r--r-- | yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala | 80 |
1 files changed, 52 insertions, 28 deletions
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala index d232c18d2f..184e2ad6c8 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala @@ -28,7 +28,6 @@ import org.apache.hadoop.yarn.ipc.YarnRPC import org.apache.hadoop.yarn.util.{ConverterUtils, Records} import akka.actor._ import akka.remote._ -import akka.actor.Terminated import org.apache.spark.{Logging, SecurityManager, SparkConf} import org.apache.spark.util.{Utils, AkkaUtils} import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend @@ -57,10 +56,17 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp private val yarnConf: YarnConfiguration = new YarnConfiguration(conf) private var yarnAllocator: YarnAllocationHandler = _ - private var driverClosed:Boolean = false + + private var driverClosed: Boolean = false + private var isFinished: Boolean = false + private var registered: Boolean = false + + // Default to numExecutors * 2, with minimum of 3 + private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures", + sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3))) val securityManager = new SecurityManager(sparkConf) - val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0, + val actorSystem: ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0, conf = sparkConf, securityManager = securityManager)._1 var actor: ActorRef = _ @@ -97,23 +103,26 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp appAttemptId = getApplicationAttemptId() resourceManager = registerWithResourceManager() - val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster() - - // Compute number of threads for akka - val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory() - - if (minimumMemory > 0) { - val mem = args.executorMemory + sparkConf.getInt("spark.yarn.executor.memoryOverhead", - YarnAllocationHandler.MEMORY_OVERHEAD) - val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0) - - if (numCore > 0) { - // do not override - hits https://issues.apache.org/jira/browse/HADOOP-8406 - // TODO: Uncomment when hadoop is on a version which has this fixed. - // args.workerCores = numCore + synchronized { + if (!isFinished) { + val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster() + // Compute number of threads for akka + val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory() + + if (minimumMemory > 0) { + val mem = args.executorMemory + sparkConf.getInt("spark.yarn.executor.memoryOverhead", + YarnAllocationHandler.MEMORY_OVERHEAD) + val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0) + + if (numCore > 0) { + // do not override - hits https://issues.apache.org/jira/browse/HADOOP-8406 + // TODO: Uncomment when hadoop is on a version which has this fixed. + // args.workerCores = numCore + } + } + registered = true } } - waitForSparkMaster() addAmIpFilter() // Allocate all containers @@ -243,11 +252,17 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) { yarnAllocator.allocateContainers( math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0)) + checkNumExecutorsFailed() Thread.sleep(100) } logInfo("All executors have launched.") - + } + private def checkNumExecutorsFailed() { + if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) { + finishApplicationMaster(FinalApplicationStatus.FAILED, + "max number of executor failures reached") + } } // TODO: We might want to extend this to allocate more containers in case they die ! @@ -257,6 +272,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp val t = new Thread { override def run() { while (!driverClosed) { + checkNumExecutorsFailed() val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning if (missingExecutorCount > 0) { logInfo("Allocating " + missingExecutorCount + @@ -282,15 +298,23 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp yarnAllocator.allocateContainers(0) } - def finishApplicationMaster(status: FinalApplicationStatus) { - - logInfo("finish ApplicationMaster with " + status) - val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest]) - .asInstanceOf[FinishApplicationMasterRequest] - finishReq.setAppAttemptId(appAttemptId) - finishReq.setFinishApplicationStatus(status) - finishReq.setTrackingUrl(sparkConf.get("spark.driver.appUIHistoryAddress", "")) - resourceManager.finishApplicationMaster(finishReq) + def finishApplicationMaster(status: FinalApplicationStatus, appMessage: String = "") { + synchronized { + if (isFinished) { + return + } + logInfo("Unregistering ApplicationMaster with " + status) + if (registered) { + val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest]) + .asInstanceOf[FinishApplicationMasterRequest] + finishReq.setAppAttemptId(appAttemptId) + finishReq.setFinishApplicationStatus(status) + finishReq.setTrackingUrl(sparkConf.get("spark.yarn.historyServer.address", "")) + finishReq.setDiagnostics(appMessage) + resourceManager.finishApplicationMaster(finishReq) + } + isFinished = true + } } } |