aboutsummaryrefslogtreecommitdiff
path: root/yarn/alpha
diff options
context:
space:
mode:
Diffstat (limited to 'yarn/alpha')
-rw-r--r--yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala34
1 files changed, 22 insertions, 12 deletions
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 2e46d750c4..560e5de358 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -65,6 +65,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures",
math.max(args.numWorkers * 2, 3))
+ private var registered = false
+
def run() {
// Setup the directories so things go to yarn approved directories rather
// then user specified and /tmp.
@@ -110,7 +112,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
waitForSparkContextInitialized()
// Do this after spark master is up and SparkContext is created so that we can register UI Url
- val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
+ synchronized {
+ if (!isFinished) {
+ registerApplicationMaster()
+ registered = true
+ }
+ }
// Allocate all containers
allocateWorkers()
@@ -208,7 +215,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
var count = 0
val waitTime = 10000L
val numTries = sparkConf.getInt("spark.yarn.ApplicationMaster.waitTries", 10)
- while (ApplicationMaster.sparkContextRef.get() == null && count < numTries) {
+ while (ApplicationMaster.sparkContextRef.get() == null && count < numTries
+ && !isFinished) {
logInfo("Waiting for spark context initialization ... " + count)
count = count + 1
ApplicationMaster.sparkContextRef.wait(waitTime)
@@ -341,17 +349,19 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
return
}
isFinished = true
+
+ logInfo("finishApplicationMaster with " + status)
+ if (registered) {
+ val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
+ .asInstanceOf[FinishApplicationMasterRequest]
+ finishReq.setAppAttemptId(appAttemptId)
+ finishReq.setFinishApplicationStatus(status)
+ finishReq.setDiagnostics(diagnostics)
+ // Set tracking url to empty since we don't have a history server.
+ finishReq.setTrackingUrl("")
+ resourceManager.finishApplicationMaster(finishReq)
+ }
}
-
- logInfo("finishApplicationMaster with " + status)
- val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
- .asInstanceOf[FinishApplicationMasterRequest]
- finishReq.setAppAttemptId(appAttemptId)
- finishReq.setFinishApplicationStatus(status)
- finishReq.setDiagnostics(diagnostics)
- // Set tracking url to empty since we don't have a history server.
- finishReq.setTrackingUrl("")
- resourceManager.finishApplicationMaster(finishReq)
}
/**