aboutsummaryrefslogtreecommitdiff
path: root/yarn/stable
diff options
context:
space:
mode:
authorThomas Graves <tgraves@apache.org>2014-08-19 09:40:31 -0500
committerThomas Graves <tgraves@apache.org>2014-08-19 09:40:31 -0500
commit7eb9cbc273d758522e787fcb2ef68ef65911475f (patch)
treebaea2c9bfa0b5afd89978971e00ec5587c844b09 /yarn/stable
parentcd0720ca77894d481fb73a8b5bb517013843cb1e (diff)
downloadspark-7eb9cbc273d758522e787fcb2ef68ef65911475f.tar.gz
spark-7eb9cbc273d758522e787fcb2ef68ef65911475f.tar.bz2
spark-7eb9cbc273d758522e787fcb2ef68ef65911475f.zip
[SPARK-3072] YARN - Exit when reach max number failed executors
In some cases on hadoop 2.x the spark application master doesn't properly exit and hangs around for 10 minutes after its really done. We should make sure it exits properly and stops the driver. Author: Thomas Graves <tgraves@apache.org> Closes #2022 from tgravescs/SPARK-3072 and squashes the following commits: 665701d [Thomas Graves] Exit when reach max number failed executors
Diffstat (limited to 'yarn/stable')
-rw-r--r--yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala16
-rw-r--r--yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala5
2 files changed, 15 insertions, 6 deletions
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 035356d390..9c2bcf17a8 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -247,13 +247,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
yarnAllocator.allocateResources()
// Exits the loop if the user thread exits.
- var iters = 0
- while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive) {
+ while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive
+ && !isFinished) {
checkNumExecutorsFailed()
allocateMissingExecutor()
yarnAllocator.allocateResources()
Thread.sleep(ApplicationMaster.ALLOCATE_HEARTBEAT_INTERVAL)
- iters += 1
}
}
logInfo("All executors have launched.")
@@ -271,8 +270,17 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
private def checkNumExecutorsFailed() {
if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
+ logInfo("max number of executor failures reached")
finishApplicationMaster(FinalApplicationStatus.FAILED,
"max number of executor failures reached")
+ // make sure to stop the user thread
+ val sparkContext = ApplicationMaster.sparkContextRef.get()
+ if (sparkContext != null) {
+ logInfo("Invoking sc stop from checkNumExecutorsFailed")
+ sparkContext.stop()
+ } else {
+ logError("sparkContext is null when should shutdown")
+ }
}
}
@@ -289,7 +297,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
val t = new Thread {
override def run() {
- while (userThread.isAlive) {
+ while (userThread.isAlive && !isFinished) {
checkNumExecutorsFailed()
allocateMissingExecutor()
logDebug("Sending progress")
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index fc7b8320d7..a7585748b7 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -217,7 +217,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
// Wait until all containers have launched
yarnAllocator.addResourceRequests(args.numExecutors)
yarnAllocator.allocateResources()
- while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) {
+ while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed) &&
+ !isFinished) {
checkNumExecutorsFailed()
allocateMissingExecutor()
yarnAllocator.allocateResources()
@@ -249,7 +250,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
val t = new Thread {
override def run() {
- while (!driverClosed) {
+ while (!driverClosed && !isFinished) {
checkNumExecutorsFailed()
allocateMissingExecutor()
logDebug("Sending progress")