diff options
author | BlackNiuza <shiyun.wxm@taobao.com> | 2013-07-09 15:18:28 +0800 |
---|---|---|
committer | BlackNiuza <shiyun.wxm@taobao.com> | 2013-07-09 15:18:28 +0800 |
commit | c1d44be80580f0fad6bb1805bbcf74a34f536d8c (patch) | |
tree | 671a8f0a23231ecf3cef554d505099940674f463 /core | |
parent | 3c1317835e8100e3d8b2f0883ee66c81a2300652 (diff) | |
download | spark-c1d44be80580f0fad6bb1805bbcf74a34f536d8c.tar.gz spark-c1d44be80580f0fad6bb1805bbcf74a34f536d8c.tar.bz2 spark-c1d44be80580f0fad6bb1805bbcf74a34f536d8c.zip |
Bug fix: SPARK-796
Diffstat (limited to 'core')
-rw-r--r-- | core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala | 49 |
1 files changed, 32 insertions, 17 deletions
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala index f19648ec68..9bc692d480 100644 --- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala +++ b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala @@ -27,6 +27,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e private val yarnConf: YarnConfiguration = new YarnConfiguration(conf) private var yarnAllocator: YarnAllocationHandler = null + private var isFinished:Boolean = false def run() { @@ -68,10 +69,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e // Wait for the user class to Finish userThread.join() - - // Finish the ApplicationMaster - finishApplicationMaster() - // TODO: Exit based on success/failure + System.exit(0) } @@ -131,10 +129,17 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e .getMethod("main", classOf[Array[String]]) val t = new Thread { override def run() { - // Copy - var mainArgs: Array[String] = new Array[String](args.userArgs.size()) - args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size()) - mainMethod.invoke(null, mainArgs) + try{ + // Copy + var mainArgs: Array[String] = new Array[String](args.userArgs.size()) + args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size()) + mainMethod.invoke(null, mainArgs) + ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED) + } catch { + case th: Throwable => + ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.FAILED) + logError("Finish ApplicationMaster with ",th) + } } } t.start() @@ -235,14 +240,22 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e } } */ - - def finishApplicationMaster() { - val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest]) - .asInstanceOf[FinishApplicationMasterRequest] - finishReq.setAppAttemptId(appAttemptId) - // TODO: Check if the application has failed or succeeded - finishReq.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED) - resourceManager.finishApplicationMaster(finishReq) + + def finishApplicationMaster(status: FinalApplicationStatus) { + + synchronized { + if(isFinished){ + return + } + isFinished = true + + logInfo("finishApplicationMaster with "+status) + val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest]) + .asInstanceOf[FinishApplicationMasterRequest] + finishReq.setAppAttemptId(appAttemptId) + finishReq.setFinishApplicationStatus(status) + resourceManager.finishApplicationMaster(finishReq) + } } } @@ -291,7 +304,9 @@ object ApplicationMaster { logInfo("Invoking sc stop from shutdown hook") sc.stop() // best case ... - for (master <- applicationMasters) master.finishApplicationMaster + for (master <- applicationMasters) { + master.finishApplicationMaster(FinalApplicationStatus.KILLED) + } } } ) } |