aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorBlackNiuza <shiyun.wxm@taobao.com>2013-07-09 15:18:28 +0800
committerBlackNiuza <shiyun.wxm@taobao.com>2013-07-09 15:18:28 +0800
commitc1d44be80580f0fad6bb1805bbcf74a34f536d8c (patch)
tree671a8f0a23231ecf3cef554d505099940674f463 /core
parent3c1317835e8100e3d8b2f0883ee66c81a2300652 (diff)
downloadspark-c1d44be80580f0fad6bb1805bbcf74a34f536d8c.tar.gz
spark-c1d44be80580f0fad6bb1805bbcf74a34f536d8c.tar.bz2
spark-c1d44be80580f0fad6bb1805bbcf74a34f536d8c.zip
Bug fix: SPARK-796
Diffstat (limited to 'core')
-rw-r--r--core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala49
1 files changed, 32 insertions, 17 deletions
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala
index f19648ec68..9bc692d480 100644
--- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala
+++ b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala
@@ -27,6 +27,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
private var yarnAllocator: YarnAllocationHandler = null
+ private var isFinished:Boolean = false
def run() {
@@ -68,10 +69,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
// Wait for the user class to Finish
userThread.join()
-
- // Finish the ApplicationMaster
- finishApplicationMaster()
- // TODO: Exit based on success/failure
+
System.exit(0)
}
@@ -131,10 +129,17 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
.getMethod("main", classOf[Array[String]])
val t = new Thread {
override def run() {
- // Copy
- var mainArgs: Array[String] = new Array[String](args.userArgs.size())
- args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size())
- mainMethod.invoke(null, mainArgs)
+ try{
+ // Copy
+ var mainArgs: Array[String] = new Array[String](args.userArgs.size())
+ args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size())
+ mainMethod.invoke(null, mainArgs)
+ ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
+ } catch {
+ case th: Throwable =>
+ ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.FAILED)
+ logError("Finish ApplicationMaster with ",th)
+ }
}
}
t.start()
@@ -235,14 +240,22 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
}
}
*/
-
- def finishApplicationMaster() {
- val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
- .asInstanceOf[FinishApplicationMasterRequest]
- finishReq.setAppAttemptId(appAttemptId)
- // TODO: Check if the application has failed or succeeded
- finishReq.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED)
- resourceManager.finishApplicationMaster(finishReq)
+
+ def finishApplicationMaster(status: FinalApplicationStatus) {
+
+ synchronized {
+ if(isFinished){
+ return
+ }
+ isFinished = true
+
+ logInfo("finishApplicationMaster with "+status)
+ val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
+ .asInstanceOf[FinishApplicationMasterRequest]
+ finishReq.setAppAttemptId(appAttemptId)
+ finishReq.setFinishApplicationStatus(status)
+ resourceManager.finishApplicationMaster(finishReq)
+ }
}
}
@@ -291,7 +304,9 @@ object ApplicationMaster {
logInfo("Invoking sc stop from shutdown hook")
sc.stop()
// best case ...
- for (master <- applicationMasters) master.finishApplicationMaster
+ for (master <- applicationMasters) {
+ master.finishApplicationMaster(FinalApplicationStatus.KILLED)
+ }
}
} )
}