diff options
-rw-r--r-- | core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala | 31 |
1 files changed, 19 insertions, 12 deletions
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala index 9bc692d480..776db201f9 100644 --- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala +++ b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala @@ -122,23 +122,26 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e } } } - + private def startUserClass(): Thread = { logInfo("Starting the user JAR in a separate Thread") val mainMethod = Class.forName(args.userClass, false, Thread.currentThread.getContextClassLoader) .getMethod("main", classOf[Array[String]]) val t = new Thread { override def run() { + var successed = false try{ // Copy var mainArgs: Array[String] = new Array[String](args.userArgs.size()) args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size()) mainMethod.invoke(null, mainArgs) - ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED) - } catch { - case th: Throwable => + successed = true + } finally { + if(successed){ + ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED) + }else{ ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.FAILED) - logError("Finish ApplicationMaster with ",th) + } } } } @@ -248,14 +251,15 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e return } isFinished = true - - logInfo("finishApplicationMaster with "+status) - val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest]) - .asInstanceOf[FinishApplicationMasterRequest] - finishReq.setAppAttemptId(appAttemptId) - finishReq.setFinishApplicationStatus(status) - resourceManager.finishApplicationMaster(finishReq) } + + logInfo("finishApplicationMaster with " + status) + val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest]) + .asInstanceOf[FinishApplicationMasterRequest] + finishReq.setAppAttemptId(appAttemptId) + finishReq.setFinishApplicationStatus(status) + resourceManager.finishApplicationMaster(finishReq) + } } @@ -304,6 +308,9 @@ object ApplicationMaster { logInfo("Invoking sc stop from shutdown hook") sc.stop() // best case ... + // due to the sparkContext is stopped and ApplicationMaster is down, + // the status of registered masters should be set KILLED better than FAILED. + // need discussion for (master <- applicationMasters) { master.finishApplicationMaster(FinalApplicationStatus.KILLED) } |