From 35894af8cc66056352a1e4c7ebff1b6ecb12b7b9 Mon Sep 17 00:00:00 2001
From: witgo
Date: Tue, 10 Jun 2014 10:34:57 -0500
Subject: [SPARK-1978] In some cases, spark-yarn does not automatically restart the failed container

Author: witgo

Closes #921 from witgo/allocateExecutors and squashes the following commits:

bc3aa66 [witgo] review commit
8800eba [witgo] Merge branch 'master' of https://github.com/apache/spark into allocateExecutors
32ac7af [witgo] review commit
056b8c7 [witgo] Merge branch 'master' of https://github.com/apache/spark into allocateExecutors
04c6f7e [witgo] Merge branch 'master' into allocateExecutors
aff827c [witgo] review commit
5c376e0 [witgo] Merge branch 'master' of https://github.com/apache/spark into allocateExecutors
1faf4f4 [witgo] Merge branch 'master' into allocateExecutors
3c464bd [witgo] add time limit to allocateExecutors
e00b656 [witgo] In some cases, yarn does not automatically restart the container
---
 .../spark/deploy/yarn/ApplicationMaster.scala | 39 ++++++++++++----------
 .../spark/deploy/yarn/ExecutorLauncher.scala  | 22 ++++++------
 2 files changed, 34 insertions(+), 27 deletions(-)

(limited to 'yarn')

diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index c1dfe3f53b..33a60d978c 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -252,15 +252,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
     try {
       logInfo("Allocating " + args.numExecutors + " executors.")
       // Wait until all containers have finished
-      // TODO: This is a bit ugly. Can we make it nicer?
-      // TODO: Handle container failure
       yarnAllocator.addResourceRequests(args.numExecutors)
+      yarnAllocator.allocateResources()
       // Exits the loop if the user thread exits.
       while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive) {
-        if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
-          finishApplicationMaster(FinalApplicationStatus.FAILED,
-            "max number of executor failures reached")
-        }
+        checkNumExecutorsFailed()
+        allocateMissingExecutor()
         yarnAllocator.allocateResources()
         ApplicationMaster.incrementAllocatorLoop(1)
         Thread.sleep(100)
@@ -289,23 +286,31 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
     }
   }
 
+  private def allocateMissingExecutor() {
+    val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning -
+      yarnAllocator.getNumPendingAllocate
+    if (missingExecutorCount > 0) {
+      logInfo("Allocating %d containers to make up for (potentially) lost containers".
+        format(missingExecutorCount))
+      yarnAllocator.addResourceRequests(missingExecutorCount)
+    }
+  }
+
+  private def checkNumExecutorsFailed() {
+    if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
+      finishApplicationMaster(FinalApplicationStatus.FAILED,
+        "max number of executor failures reached")
+    }
+  }
+
   private def launchReporterThread(_sleepTime: Long): Thread = {
     val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime
 
     val t = new Thread {
       override def run() {
         while (userThread.isAlive) {
-          if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
-            finishApplicationMaster(FinalApplicationStatus.FAILED,
-              "max number of executor failures reached")
-          }
-          val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning -
-            yarnAllocator.getNumPendingAllocate
-          if (missingExecutorCount > 0) {
-            logInfo("Allocating %d containers to make up for (potentially) lost containers".
-              format(missingExecutorCount))
-            yarnAllocator.addResourceRequests(missingExecutorCount)
-          }
+          checkNumExecutorsFailed()
+          allocateMissingExecutor()
           sendProgress()
           Thread.sleep(sleepTime)
         }
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index a4ce8766d3..d93e5bb022 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -200,17 +200,25 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
 
     logInfo("Allocating " + args.numExecutors + " executors.")
     // Wait until all containers have finished
-    // TODO: This is a bit ugly. Can we make it nicer?
-    // TODO: Handle container failure
-
     yarnAllocator.addResourceRequests(args.numExecutors)
+    yarnAllocator.allocateResources()
     while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) {
+      allocateMissingExecutor()
       yarnAllocator.allocateResources()
       Thread.sleep(100)
     }
 
     logInfo("All executors have launched.")
+  }
 
+  private def allocateMissingExecutor() {
+    val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning -
+      yarnAllocator.getNumPendingAllocate
+    if (missingExecutorCount > 0) {
+      logInfo("Allocating %d containers to make up for (potentially) lost containers".
+        format(missingExecutorCount))
+      yarnAllocator.addResourceRequests(missingExecutorCount)
+    }
   }
 
   // TODO: We might want to extend this to allocate more containers in case they die !
@@ -220,13 +228,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     val t = new Thread {
       override def run() {
         while (!driverClosed) {
-          val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning -
-            yarnAllocator.getNumPendingAllocate
-          if (missingExecutorCount > 0) {
-            logInfo("Allocating %d containers to make up for (potentially) lost containers".
-              format(missingExecutorCount))
-            yarnAllocator.addResourceRequests(missingExecutorCount)
-          }
+          allocateMissingExecutor()
           sendProgress()
           Thread.sleep(sleepTime)
        }
--
cgit v1.2.3
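A note for readers following the change: both files converge on the same pattern, a polling loop that (1) aborts once getNumExecutorsFailed crosses maxNumExecutorFailures and (2) tops up resource requests whenever running + pending falls short of the requested total, so containers lost after the initial request are re-requested. The standalone Scala sketch below illustrates that pattern under stated assumptions: the YarnAllocator trait, AllocationLoop class, and maxFailures parameter are hypothetical stand-ins mirroring only the calls visible in the diff, not Spark's actual YarnAllocationHandler API.

// Minimal sketch of the allocation loop introduced by this patch.
// The YarnAllocator trait below is a hypothetical stand-in for illustration.
trait YarnAllocator {
  def addResourceRequests(count: Int): Unit   // ask the ResourceManager for `count` more containers
  def allocateResources(): Unit               // poll the ResourceManager for granted containers
  def getNumExecutorsRunning: Int
  def getNumPendingAllocate: Int
  def getNumExecutorsFailed: Int
}

class AllocationLoop(allocator: YarnAllocator, numExecutors: Int, maxFailures: Int) {

  // Re-request containers that are neither running nor pending, i.e. containers
  // that were lost or never granted. This is the core of the fix: without it a
  // failed container is never replaced once the initial requests are consumed.
  private def allocateMissingExecutor(): Unit = {
    val missing = numExecutors - allocator.getNumExecutorsRunning - allocator.getNumPendingAllocate
    if (missing > 0) {
      println(s"Allocating $missing containers to make up for (potentially) lost containers")
      allocator.addResourceRequests(missing)
    }
  }

  // Abort once too many executors have failed instead of retrying forever.
  private def checkNumExecutorsFailed(): Unit = {
    if (allocator.getNumExecutorsFailed >= maxFailures) {
      sys.error("max number of executor failures reached")
    }
  }

  // Drive the loop until the requested number of executors is running,
  // or `keepGoing` (e.g. "the user thread is alive") turns false.
  def run(keepGoing: => Boolean): Unit = {
    allocator.addResourceRequests(numExecutors)
    allocator.allocateResources()
    while (allocator.getNumExecutorsRunning < numExecutors && keepGoing) {
      checkNumExecutorsFailed()
      allocateMissingExecutor()
      allocator.allocateResources()
      Thread.sleep(100)
    }
  }
}

The same loop body is what the reporter threads in both files run after the initial allocation completes, which is why the patch factors it into the shared allocateMissingExecutor and checkNumExecutorsFailed helpers.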