author     BlackNiuza <shiyun.wxm@taobao.com>  2013-07-09 20:03:01 +0800
committer  BlackNiuza <shiyun.wxm@taobao.com>  2013-07-09 20:03:01 +0800
commit     aaa7b081df760a29ce5cdcd51d6b71422cba68d5 (patch)
tree       44e1ae99a8966c4413cdacded19806c3c9d73964 /core
parent     c1d44be80580f0fad6bb1805bbcf74a34f536d8c (diff)
Adjust the code according to mridulm's comments
Diffstat (limited to 'core')
-rw-r--r--  core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala | 31
1 file changed, 19 insertions(+), 12 deletions(-)
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala
index 9bc692d480..776db201f9 100644
--- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala
+++ b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala
@@ -122,23 +122,26 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
}
}
}
-
+
private def startUserClass(): Thread = {
logInfo("Starting the user JAR in a separate Thread")
val mainMethod = Class.forName(args.userClass, false, Thread.currentThread.getContextClassLoader)
.getMethod("main", classOf[Array[String]])
val t = new Thread {
override def run() {
+ var succeeded = false
try{
// Copy
var mainArgs: Array[String] = new Array[String](args.userArgs.size())
args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size())
mainMethod.invoke(null, mainArgs)
- ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
- } catch {
- case th: Throwable =>
+ succeeded = true
+ } finally {
+ if (succeeded) {
+ ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
+ } else {
ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.FAILED)
- logError("Finish ApplicationMaster with ",th)
+ }
}
}
}
@@ -248,14 +251,15 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
return
}
isFinished = true
-
- logInfo("finishApplicationMaster with "+status)
- val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
- .asInstanceOf[FinishApplicationMasterRequest]
- finishReq.setAppAttemptId(appAttemptId)
- finishReq.setFinishApplicationStatus(status)
- resourceManager.finishApplicationMaster(finishReq)
}
+
+ logInfo("finishApplicationMaster with " + status)
+ val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
+ .asInstanceOf[FinishApplicationMasterRequest]
+ finishReq.setAppAttemptId(appAttemptId)
+ finishReq.setFinishApplicationStatus(status)
+ resourceManager.finishApplicationMaster(finishReq)
+
}
}
@@ -304,6 +308,9 @@ object ApplicationMaster {
logInfo("Invoking sc stop from shutdown hook")
sc.stop()
// best case ...
+ // Since the SparkContext has been stopped and the ApplicationMaster is going down,
+ // the registered masters should be marked KILLED rather than FAILED.
+ // This needs further discussion.
for (master <- applicationMasters) {
master.finishApplicationMaster(FinalApplicationStatus.KILLED)
}
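
The pattern this patch introduces in startUserClass -- tracking success with a local flag and reporting the final application status from a finally block, while keeping the idempotency check under synchronized and moving the ResourceManager call outside the lock -- can be illustrated with a minimal, self-contained Scala sketch. FinalStatus, report, and runUserClass below are hypothetical stand-ins for FinalApplicationStatus, finishApplicationMaster, and startUserClass; this is a sketch of the control flow under those assumptions, not the actual ApplicationMaster code.

    object FinishStatusSketch {

      // Hypothetical stand-in for YARN's FinalApplicationStatus.
      sealed trait FinalStatus
      case object Succeeded extends FinalStatus
      case object Failed extends FinalStatus

      private var isFinished = false

      // Stand-in for finishApplicationMaster: the synchronized check keeps the
      // call idempotent, and (as in the second hunk) the actual reporting work
      // happens outside the lock.
      def report(status: FinalStatus): Unit = {
        val firstCall = synchronized {
          if (isFinished) false else { isFinished = true; true }
        }
        if (firstCall) {
          println(s"finishApplicationMaster with $status")
        }
      }

      // Stand-in for startUserClass: run the user code on its own thread and
      // decide the final status from a success flag in a finally block, so that
      // exactly one status is reported whether or not the user code throws.
      def runUserClass(userMain: () => Unit): Thread = {
        val t = new Thread {
          override def run(): Unit = {
            var succeeded = false
            try {
              userMain()
              succeeded = true
            } finally {
              report(if (succeeded) Succeeded else Failed)
            }
          }
        }
        t.start()
        t
      }

      def main(args: Array[String]): Unit = {
        runUserClass(() => println("user class ran")).join()
      }
    }

The comment added in the last hunk suggests that when the SparkContext is stopped from the shutdown hook, KILLED is a more accurate final status than FAILED; in a sketch like this that would simply be a third FinalStatus case passed to report.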