aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatei Zaharia <matei.zaharia@gmail.com>2013-07-14 12:58:13 -0700
committerMatei Zaharia <matei.zaharia@gmail.com>2013-07-14 12:58:13 -0700
commitc7877d5e16e5441a9891d5eda67a8a062278b103 (patch)
treecfa08964badf3234507722b04b97f54ec5e2e855
parent10c05937bdf59abee2b9c6c3ee45ea747c5f6ee4 (diff)
parent00556a94c98577d1536d0c2b4606a9933069a584 (diff)
downloadspark-c7877d5e16e5441a9891d5eda67a8a062278b103.tar.gz
spark-c7877d5e16e5441a9891d5eda67a8a062278b103.tar.bz2
spark-c7877d5e16e5441a9891d5eda67a8a062278b103.zip
Merge pull request #689 from BlackNiuza/application_status
Bug fix: SPARK-796
-rw-r--r--core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala58
1 files changed, 40 insertions, 18 deletions
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala
index f19648ec68..6a0617cc06 100644
--- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala
+++ b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala
@@ -27,6 +27,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
private var yarnAllocator: YarnAllocationHandler = null
+ private var isFinished:Boolean = false
def run() {
@@ -68,10 +69,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
// Wait for the user class to Finish
userThread.join()
-
- // Finish the ApplicationMaster
- finishApplicationMaster()
- // TODO: Exit based on success/failure
+
System.exit(0)
}
@@ -124,17 +122,30 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
}
}
}
-
+
private def startUserClass(): Thread = {
logInfo("Starting the user JAR in a separate Thread")
val mainMethod = Class.forName(args.userClass, false, Thread.currentThread.getContextClassLoader)
.getMethod("main", classOf[Array[String]])
val t = new Thread {
override def run() {
- // Copy
- var mainArgs: Array[String] = new Array[String](args.userArgs.size())
- args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size())
- mainMethod.invoke(null, mainArgs)
+ var successed = false
+ try {
+ // Copy
+ var mainArgs: Array[String] = new Array[String](args.userArgs.size())
+ args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size())
+ mainMethod.invoke(null, mainArgs)
+ // some job script has "System.exit(0)" at the end, for example SparkPi, SparkLR
+ // userThread will stop here unless it has uncaught exception thrown out
+ // It need shutdown hook to set SUCCEEDED
+ successed = true
+ } finally {
+ if (successed) {
+ ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
+ } else {
+ ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.FAILED)
+ }
+ }
}
}
t.start()
@@ -179,7 +190,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
logInfo("All workers have launched.")
// Launch a progress reporter thread, else app will get killed after expiration (def: 10mins) timeout
- if (userThread.isAlive){
+ if (userThread.isAlive) {
// ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
@@ -197,7 +208,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
val t = new Thread {
override def run() {
- while (userThread.isAlive){
+ while (userThread.isAlive) {
val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning
if (missingWorkerCount > 0) {
logInfo("Allocating " + missingWorkerCount + " containers to make up for (potentially ?) lost containers")
@@ -235,14 +246,23 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
}
}
*/
-
- def finishApplicationMaster() {
+
+ def finishApplicationMaster(status: FinalApplicationStatus) {
+
+ synchronized {
+ if (isFinished) {
+ return
+ }
+ isFinished = true
+ }
+
+ logInfo("finishApplicationMaster with " + status)
val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
.asInstanceOf[FinishApplicationMasterRequest]
finishReq.setAppAttemptId(appAttemptId)
- // TODO: Check if the application has failed or succeeded
- finishReq.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED)
+ finishReq.setFinishApplicationStatus(status)
resourceManager.finishApplicationMaster(finishReq)
+
}
}
@@ -256,7 +276,7 @@ object ApplicationMaster {
private val ALLOCATOR_LOOP_WAIT_COUNT = 30
def incrementAllocatorLoop(by: Int) {
val count = yarnAllocatorLoop.getAndAdd(by)
- if (count >= ALLOCATOR_LOOP_WAIT_COUNT){
+ if (count >= ALLOCATOR_LOOP_WAIT_COUNT) {
yarnAllocatorLoop.synchronized {
// to wake threads off wait ...
yarnAllocatorLoop.notifyAll()
@@ -291,14 +311,16 @@ object ApplicationMaster {
logInfo("Invoking sc stop from shutdown hook")
sc.stop()
// best case ...
- for (master <- applicationMasters) master.finishApplicationMaster
+ for (master <- applicationMasters) {
+ master.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
+ }
}
} )
}
// Wait for initialization to complete and atleast 'some' nodes can get allocated
yarnAllocatorLoop.synchronized {
- while (yarnAllocatorLoop.get() <= ALLOCATOR_LOOP_WAIT_COUNT){
+ while (yarnAllocatorLoop.get() <= ALLOCATOR_LOOP_WAIT_COUNT) {
yarnAllocatorLoop.wait(1000L)
}
}