about summary refs log tree commit diff
path: root/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
diff options
context:
space:
mode:
Diffstat (limited to 'yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala')
-rw-r--r-- yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 33
1 files changed, 22 insertions, 11 deletions
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 62b5c3bc5f..46a01f5a9a 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -267,12 +267,10 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
// TODO: This is a bit ugly. Can we make it nicer?
// TODO: Handle container failure
- // Exists the loop if the user thread exits.
- while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive) {
- if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
- finishApplicationMaster(FinalApplicationStatus.FAILED,
- "max number of executor failures reached")
- }
+ // Exits the loop if the user thread exits.
+ while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive
+ && !isFinished) {
+ checkNumExecutorsFailed()
yarnAllocator.allocateContainers(
math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
Thread.sleep(ApplicationMaster.ALLOCATE_HEARTBEAT_INTERVAL)
@@ -303,11 +301,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
val t = new Thread {
override def run() {
- while (userThread.isAlive) {
- if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
- finishApplicationMaster(FinalApplicationStatus.FAILED,
- "max number of executor failures reached")
- }
+ while (userThread.isAlive && !isFinished) {
+ checkNumExecutorsFailed()
val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning
if (missingExecutorCount > 0) {
logInfo("Allocating %d containers to make up for (potentially) lost containers".
@@ -327,6 +322,22 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
t
}
+ private def checkNumExecutorsFailed() {
+ if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
+ logInfo("max number of executor failures reached")
+ finishApplicationMaster(FinalApplicationStatus.FAILED,
+ "max number of executor failures reached")
+ // make sure to stop the user thread
+ val sparkContext = ApplicationMaster.sparkContextRef.get()
+ if (sparkContext != null) {
+ logInfo("Invoking sc stop from checkNumExecutorsFailed")
+ sparkContext.stop()
+ } else {
+ logError("sparkContext is null when should shutdown")
+ }
+ }
+ }
+
private def sendProgress() {
logDebug("Sending progress")
// Simulated with an allocate request with no nodes requested ...