author     witgo <witgo@qq.com>                  2014-06-10 10:34:57 -0500
committer  Thomas Graves <tgraves@apache.org>    2014-06-10 11:32:05 -0500
commit     35894af8cc66056352a1e4c7ebff1b6ecb12b7b9 (patch)
tree       ebf3c60ca26195f2a26caa13b8fe06361c3c5141 /yarn
parent     89caa40e360573288cbc4275c02f6394d081c129 (diff)
[SPARK-1978] In some cases, spark-yarn does not automatically restart the failed container
Author: witgo <witgo@qq.com>

Closes #921 from witgo/allocateExecutors and squashes the following commits:

bc3aa66 [witgo] review commit
8800eba [witgo] Merge branch 'master' of https://github.com/apache/spark into allocateExecutors
32ac7af [witgo] review commit
056b8c7 [witgo] Merge branch 'master' of https://github.com/apache/spark into allocateExecutors
04c6f7e [witgo] Merge branch 'master' into allocateExecutors
aff827c [witgo] review commit
5c376e0 [witgo] Merge branch 'master' of https://github.com/apache/spark into allocateExecutors
1faf4f4 [witgo] Merge branch 'master' into allocateExecutors
3c464bd [witgo] add time limit to allocateExecutors
e00b656 [witgo] In some cases, yarn does not automatically restart the container
Diffstat (limited to 'yarn')
-rw-r--r--  yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala  39
-rw-r--r--  yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala   22
2 files changed, 34 insertions(+), 27 deletions(-)
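
The patch closes the restart gap by factoring two recurring checks into helpers, checkNumExecutorsFailed() and allocateMissingExecutor(), and calling both from the initial allocation loop and from the long-lived reporter thread: executors that are neither running nor pending are re-requested, so a container lost by YARN is eventually replaced. The following minimal sketch distills that pattern; SimpleYarnAllocator and AllocationLoop are hypothetical stand-ins for illustration only, though the method names mirror those in the diff below.

// Illustrative sketch only: SimpleYarnAllocator and AllocationLoop are
// hypothetical stand-ins, not Spark's actual types.
trait SimpleYarnAllocator {
  def getNumExecutorsRunning: Int
  def getNumExecutorsFailed: Int
  def getNumPendingAllocate: Int
  def addResourceRequests(count: Int): Unit
  def allocateResources(): Unit
}

class AllocationLoop(
    allocator: SimpleYarnAllocator,
    targetExecutors: Int,
    maxFailures: Int) {

  // Re-request executors that are neither running nor pending, so a
  // container lost by YARN is eventually replaced.
  def allocateMissingExecutor(): Unit = {
    val missing = targetExecutors - allocator.getNumExecutorsRunning -
      allocator.getNumPendingAllocate
    if (missing > 0) {
      allocator.addResourceRequests(missing)
    }
  }

  // Fail fast once too many executors have died.
  def checkNumExecutorsFailed(): Unit = {
    if (allocator.getNumExecutorsFailed >= maxFailures) {
      sys.error("max number of executor failures reached")
    }
  }

  // Initial allocation: request everything up front, then poll until the
  // target is reached, topping up for lost containers on every iteration.
  def awaitAllExecutors(isAlive: => Boolean): Unit = {
    allocator.addResourceRequests(targetExecutors)
    allocator.allocateResources()
    while (allocator.getNumExecutorsRunning < targetExecutors && isAlive) {
      checkNumExecutorsFailed()
      allocateMissingExecutor()
      allocator.allocateResources()
      Thread.sleep(100)
    }
  }
}

The pending count is what keeps the loop from over-requesting: containers already asked for but not yet granted are subtracted before topping up.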
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index c1dfe3f53b..33a60d978c 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -252,15 +252,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
     try {
       logInfo("Allocating " + args.numExecutors + " executors.")
       // Wait until all containers have finished
-      // TODO: This is a bit ugly. Can we make it nicer?
-      // TODO: Handle container failure
       yarnAllocator.addResourceRequests(args.numExecutors)
+      yarnAllocator.allocateResources()
       // Exits the loop if the user thread exits.
       while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive) {
-        if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
-          finishApplicationMaster(FinalApplicationStatus.FAILED,
-            "max number of executor failures reached")
-        }
+        checkNumExecutorsFailed()
+        allocateMissingExecutor()
         yarnAllocator.allocateResources()
         ApplicationMaster.incrementAllocatorLoop(1)
         Thread.sleep(100)
@@ -289,23 +286,31 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
     }
   }
 
+  private def allocateMissingExecutor() {
+    val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning -
+      yarnAllocator.getNumPendingAllocate
+    if (missingExecutorCount > 0) {
+      logInfo("Allocating %d containers to make up for (potentially) lost containers".
+        format(missingExecutorCount))
+      yarnAllocator.addResourceRequests(missingExecutorCount)
+    }
+  }
+
+  private def checkNumExecutorsFailed() {
+    if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
+      finishApplicationMaster(FinalApplicationStatus.FAILED,
+        "max number of executor failures reached")
+    }
+  }
+
   private def launchReporterThread(_sleepTime: Long): Thread = {
     val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime
 
     val t = new Thread {
       override def run() {
         while (userThread.isAlive) {
-          if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
-            finishApplicationMaster(FinalApplicationStatus.FAILED,
-              "max number of executor failures reached")
-          }
-          val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning -
-            yarnAllocator.getNumPendingAllocate
-          if (missingExecutorCount > 0) {
-            logInfo("Allocating %d containers to make up for (potentially) lost containers".
-              format(missingExecutorCount))
-            yarnAllocator.addResourceRequests(missingExecutorCount)
-          }
+          checkNumExecutorsFailed()
+          allocateMissingExecutor()
           sendProgress()
           Thread.sleep(sleepTime)
         }
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index a4ce8766d3..d93e5bb022 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -200,17 +200,25 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
logInfo("Allocating " + args.numExecutors + " executors.")
// Wait until all containers have finished
- // TODO: This is a bit ugly. Can we make it nicer?
- // TODO: Handle container failure
-
yarnAllocator.addResourceRequests(args.numExecutors)
+ yarnAllocator.allocateResources()
while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) {
+ allocateMissingExecutor()
yarnAllocator.allocateResources()
Thread.sleep(100)
}
logInfo("All executors have launched.")
+ }
+ private def allocateMissingExecutor() {
+ val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning -
+ yarnAllocator.getNumPendingAllocate
+ if (missingExecutorCount > 0) {
+ logInfo("Allocating %d containers to make up for (potentially) lost containers".
+ format(missingExecutorCount))
+ yarnAllocator.addResourceRequests(missingExecutorCount)
+ }
}
// TODO: We might want to extend this to allocate more containers in case they die !
@@ -220,13 +228,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     val t = new Thread {
       override def run() {
         while (!driverClosed) {
-          val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning -
-            yarnAllocator.getNumPendingAllocate
-          if (missingExecutorCount > 0) {
-            logInfo("Allocating %d containers to make up for (potentially) lost containers".
-              format(missingExecutorCount))
-            yarnAllocator.addResourceRequests(missingExecutorCount)
-          }
+          allocateMissingExecutor()
           sendProgress()
           Thread.sleep(sleepTime)
         }
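
The same two checks also run between progress reports for the lifetime of the application, which makes the recovery continuous rather than a one-shot startup behavior. A hypothetical sketch of that reporter loop, reusing the AllocationLoop stand-in from the earlier sketch:

// Hypothetical reporter loop reusing the AllocationLoop sketch above:
// while the application is alive, keep checking for failures and
// re-requesting lost containers between progress reports.
object ReporterThread {
  def launch(loop: AllocationLoop, sleepTime: Long)(isAlive: => Boolean): Thread = {
    val interval = math.max(0L, sleepTime) // mirrors the _sleepTime <= 0 guard in the diff
    val t = new Thread {
      override def run(): Unit = {
        while (isAlive) {
          loop.checkNumExecutorsFailed()
          loop.allocateMissingExecutor()
          // a progress report to the ResourceManager would go here,
          // in the spot sendProgress() occupies in the diff
          Thread.sleep(interval)
        }
      }
    }
    t.setDaemon(true) // assumption for the sketch: reporting should not keep the JVM alive
    t.start()
    t
  }
}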