diff options
-rw-r--r-- | yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 39 | ||||
-rw-r--r-- | yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala | 22 |
2 files changed, 34 insertions, 27 deletions
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index c1dfe3f53b..33a60d978c 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -252,15 +252,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, try { logInfo("Allocating " + args.numExecutors + " executors.") // Wait until all containers have finished - // TODO: This is a bit ugly. Can we make it nicer? - // TODO: Handle container failure yarnAllocator.addResourceRequests(args.numExecutors) + yarnAllocator.allocateResources() // Exits the loop if the user thread exits. while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive) { - if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) { - finishApplicationMaster(FinalApplicationStatus.FAILED, - "max number of executor failures reached") - } + checkNumExecutorsFailed() + allocateMissingExecutor() yarnAllocator.allocateResources() ApplicationMaster.incrementAllocatorLoop(1) Thread.sleep(100) @@ -289,23 +286,31 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, } } + private def allocateMissingExecutor() { + val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning - + yarnAllocator.getNumPendingAllocate + if (missingExecutorCount > 0) { + logInfo("Allocating %d containers to make up for (potentially) lost containers". + format(missingExecutorCount)) + yarnAllocator.addResourceRequests(missingExecutorCount) + } + } + + private def checkNumExecutorsFailed() { + if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) { + finishApplicationMaster(FinalApplicationStatus.FAILED, + "max number of executor failures reached") + } + } + private def launchReporterThread(_sleepTime: Long): Thread = { val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime val t = new Thread { override def run() { while (userThread.isAlive) { - if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) { - finishApplicationMaster(FinalApplicationStatus.FAILED, - "max number of executor failures reached") - } - val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning - - yarnAllocator.getNumPendingAllocate - if (missingExecutorCount > 0) { - logInfo("Allocating %d containers to make up for (potentially) lost containers". - format(missingExecutorCount)) - yarnAllocator.addResourceRequests(missingExecutorCount) - } + checkNumExecutorsFailed() + allocateMissingExecutor() sendProgress() Thread.sleep(sleepTime) } diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala index a4ce8766d3..d93e5bb022 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala @@ -200,17 +200,25 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp logInfo("Allocating " + args.numExecutors + " executors.") // Wait until all containers have finished - // TODO: This is a bit ugly. Can we make it nicer? - // TODO: Handle container failure - yarnAllocator.addResourceRequests(args.numExecutors) + yarnAllocator.allocateResources() while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) { + allocateMissingExecutor() yarnAllocator.allocateResources() Thread.sleep(100) } logInfo("All executors have launched.") + } + private def allocateMissingExecutor() { + val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning - + yarnAllocator.getNumPendingAllocate + if (missingExecutorCount > 0) { + logInfo("Allocating %d containers to make up for (potentially) lost containers". + format(missingExecutorCount)) + yarnAllocator.addResourceRequests(missingExecutorCount) + } } // TODO: We might want to extend this to allocate more containers in case they die ! @@ -220,13 +228,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp val t = new Thread { override def run() { while (!driverClosed) { - val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning - - yarnAllocator.getNumPendingAllocate - if (missingExecutorCount > 0) { - logInfo("Allocating %d containers to make up for (potentially) lost containers". - format(missingExecutorCount)) - yarnAllocator.addResourceRequests(missingExecutorCount) - } + allocateMissingExecutor() sendProgress() Thread.sleep(sleepTime) } |