diff options
author | Thomas Graves <tgraves@apache.org> | 2014-10-07 09:51:37 -0500 |
---|---|---|
committer | Thomas Graves <tgraves@apache.org> | 2014-10-07 09:51:37 -0500 |
commit | 70e824f750aa8ed446eec104ba158b0503ba58a9 (patch) | |
tree | e58b05f03b1a42eb845c70025bba4fb17495daef /yarn/alpha | |
parent | 69c3f441a9b6e942d6c08afecd59a0349d61cc7b (diff) | |
download | spark-70e824f750aa8ed446eec104ba158b0503ba58a9.tar.gz spark-70e824f750aa8ed446eec104ba158b0503ba58a9.tar.bz2 spark-70e824f750aa8ed446eec104ba158b0503ba58a9.zip |
[SPARK-3627] - [yarn] - fix exit code and final status reporting to RM
See the description and whats handled in the jira comment: https://issues.apache.org/jira/browse/SPARK-3627?focusedCommentId=14150013&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14150013
This does not handle yarn client mode reporting of the driver to the AM. I think that should be handled when we make it an unmanaged AM.
Author: Thomas Graves <tgraves@apache.org>
Closes #2577 from tgravescs/SPARK-3627 and squashes the following commits:
9c2efbf [Thomas Graves] review comments
e8cc261 [Thomas Graves] fix accidental typo during fixing comment
24c98e3 [Thomas Graves] rework
85f1901 [Thomas Graves] Merge remote-tracking branch 'upstream/master' into SPARK-3627
fab166d [Thomas Graves] update based on review comments
32f4dfa [Thomas Graves] switch back
f0b6519 [Thomas Graves] change order of cleanup staging dir
d3cc800 [Thomas Graves] SPARK-3627 - yarn - fix exit code and final status reporting to RM
Diffstat (limited to 'yarn/alpha')
-rw-r--r-- | yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala | 26 |
1 files changed, 16 insertions, 10 deletions
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala index 9bd1719cb1..7faf55bc63 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala @@ -40,6 +40,7 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC private var rpc: YarnRPC = null private var resourceManager: AMRMProtocol = _ private var uiHistoryAddress: String = _ + private var registered: Boolean = false override def register( conf: YarnConfiguration, @@ -51,8 +52,11 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC this.rpc = YarnRPC.create(conf) this.uiHistoryAddress = uiHistoryAddress - resourceManager = registerWithResourceManager(conf) - registerApplicationMaster(uiAddress) + synchronized { + resourceManager = registerWithResourceManager(conf) + registerApplicationMaster(uiAddress) + registered = true + } new YarnAllocationHandler(conf, sparkConf, resourceManager, getAttemptId(), args, preferredNodeLocations, securityMgr) @@ -66,14 +70,16 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC appAttemptId } - override def shutdown(status: FinalApplicationStatus, diagnostics: String = "") = { - val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest]) - .asInstanceOf[FinishApplicationMasterRequest] - finishReq.setAppAttemptId(getAttemptId()) - finishReq.setFinishApplicationStatus(status) - finishReq.setDiagnostics(diagnostics) - finishReq.setTrackingUrl(uiHistoryAddress) - resourceManager.finishApplicationMaster(finishReq) + override def unregister(status: FinalApplicationStatus, diagnostics: String = "") = synchronized { + if (registered) { + val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest]) + .asInstanceOf[FinishApplicationMasterRequest] + finishReq.setAppAttemptId(getAttemptId()) + finishReq.setFinishApplicationStatus(status) + finishReq.setDiagnostics(diagnostics) + finishReq.setTrackingUrl(uiHistoryAddress) + resourceManager.finishApplicationMaster(finishReq) + } } override def getAmIpFilterParams(conf: YarnConfiguration, proxyBase: String) = { |