author      Sandy Ryza <sandy@cloudera.com>        2014-02-28 09:40:47 -0600
committer   Thomas Graves <tgraves@apache.org>     2014-02-28 09:40:47 -0600
commit      5f419bf9f433e8f057237f1d5bfed9f5f4e9427c (patch)
tree        796cda45fed79d96e6364a42880f4abd1f08e430 /yarn/alpha
parent      edf8a56ab7eaee1f7c3b4579eb10464984d31d7a (diff)
SPARK-1032. If Yarn app fails before registering, app master stays around long after

This reopens https://github.com/apache/incubator-spark/pull/648 against the new repo.

Author: Sandy Ryza <sandy@cloudera.com>

Closes #28 from sryza/sandy-spark-1032 and squashes the following commits:

5953f50 [Sandy Ryza] SPARK-1032. If Yarn app fails before registering, app master stays around long after
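The change is a guard pattern: registration and shutdown are serialized on the ApplicationMaster's own lock, the AM records whether registration ever happened, and it only reports completion back to the ResourceManager if it did. Below is a minimal, self-contained sketch of that pattern; RegistrationGuardSketch and the two println stand-ins are hypothetical placeholders, not Spark or YARN API.

object RegistrationGuardSketch {
  private var isFinished = false
  private var registered = false

  // Hypothetical stand-in for registerApplicationMaster().
  private def registerWithResourceManager(): Unit = println("AM registered")

  // Hypothetical stand-in for resourceManager.finishApplicationMaster(finishReq).
  private def unregisterFromResourceManager(status: String): Unit =
    println("AM unregistered with status " + status)

  def run(): Unit = synchronized {
    // Skip registration entirely if the app has already finished.
    if (!isFinished) {
      registerWithResourceManager()
      registered = true
    }
  }

  def finish(status: String): Unit = synchronized {
    if (!isFinished) {
      isFinished = true
      // Only report completion to the RM if registration actually happened.
      if (registered) unregisterFromResourceManager(status)
    }
  }

  def main(args: Array[String]): Unit = {
    finish("FAILED")   // the failure arrives before registration...
    run()              // ...so run() never registers and nothing is left dangling
  }
}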
Diffstat (limited to 'yarn/alpha')
-rw-r--r--  yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala  34
1 file changed, 22 insertions(+), 12 deletions(-)
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 2e46d750c4..560e5de358 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -65,6 +65,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
   private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures",
     math.max(args.numWorkers * 2, 3))
 
+  private var registered = false
+
   def run() {
     // Setup the directories so things go to yarn approved directories rather
     // then user specified and /tmp.
@@ -110,7 +112,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
     waitForSparkContextInitialized()
 
     // Do this after spark master is up and SparkContext is created so that we can register UI Url
-    val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
+    synchronized {
+      if (!isFinished) {
+        registerApplicationMaster()
+        registered = true
+      }
+    }
 
     // Allocate all containers
     allocateWorkers()
@@ -208,7 +215,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
         var count = 0
         val waitTime = 10000L
         val numTries = sparkConf.getInt("spark.yarn.ApplicationMaster.waitTries", 10)
-        while (ApplicationMaster.sparkContextRef.get() == null && count < numTries) {
+        while (ApplicationMaster.sparkContextRef.get() == null && count < numTries
+            && !isFinished) {
           logInfo("Waiting for spark context initialization ... " + count)
           count = count + 1
           ApplicationMaster.sparkContextRef.wait(waitTime)
@@ -341,17 +349,19 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
         return
       }
       isFinished = true
+
+      logInfo("finishApplicationMaster with " + status)
+      if (registered) {
+        val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
+          .asInstanceOf[FinishApplicationMasterRequest]
+        finishReq.setAppAttemptId(appAttemptId)
+        finishReq.setFinishApplicationStatus(status)
+        finishReq.setDiagnostics(diagnostics)
+        // Set tracking url to empty since we don't have a history server.
+        finishReq.setTrackingUrl("")
+        resourceManager.finishApplicationMaster(finishReq)
+      }
     }
-
-    logInfo("finishApplicationMaster with " + status)
-    val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
-      .asInstanceOf[FinishApplicationMasterRequest]
-    finishReq.setAppAttemptId(appAttemptId)
-    finishReq.setFinishApplicationStatus(status)
-    finishReq.setDiagnostics(diagnostics)
-    // Set tracking url to empty since we don't have a history server.
-    finishReq.setTrackingUrl("")
-    resourceManager.finishApplicationMaster(finishReq)
   }
 
   /**
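The wait-loop hunk above has the same motivation: waitForSparkContextInitialized() now also gives up once the AM has been marked finished, rather than sleeping through the remaining retries. A rough, self-contained sketch of that loop shape follows; the object name, the 100 ms wait, and the Boolean result are illustrative choices, not Spark's actual members.

import java.util.concurrent.atomic.AtomicReference

object WaitLoopSketch {
  // Stand-in for ApplicationMaster.sparkContextRef; something else would publish into it.
  private val sparkContextRef = new AtomicReference[AnyRef](null)
  // Stand-in for the isFinished flag set by finishApplicationMaster().
  @volatile private var isFinished = false

  def waitForSparkContextInitialized(waitTime: Long = 100L, numTries: Int = 10): Boolean = {
    var count = 0
    sparkContextRef.synchronized {
      // Keep waiting only while there is no context, retries remain, and the AM is not finished.
      while (sparkContextRef.get() == null && count < numTries && !isFinished) {
        println("Waiting for spark context initialization ... " + count)
        count += 1
        sparkContextRef.wait(waitTime)   // the monitor is released while waiting
      }
    }
    sparkContextRef.get() != null
  }

  def main(args: Array[String]): Unit = {
    // With nothing publishing a context, the loop gives up after numTries * waitTime ms.
    println("context ready: " + waitForSparkContextInitialized())
  }
}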