aboutsummaryrefslogtreecommitdiff
path: root/yarn/alpha
diff options
context:
space:
mode:
authorThomas Graves <tgraves@apache.org>2014-10-07 09:51:37 -0500
committerThomas Graves <tgraves@apache.org>2014-10-07 09:51:37 -0500
commit70e824f750aa8ed446eec104ba158b0503ba58a9 (patch)
treee58b05f03b1a42eb845c70025bba4fb17495daef /yarn/alpha
parent69c3f441a9b6e942d6c08afecd59a0349d61cc7b (diff)
downloadspark-70e824f750aa8ed446eec104ba158b0503ba58a9.tar.gz
spark-70e824f750aa8ed446eec104ba158b0503ba58a9.tar.bz2
spark-70e824f750aa8ed446eec104ba158b0503ba58a9.zip
[SPARK-3627] - [yarn] - fix exit code and final status reporting to RM
See the description and whats handled in the jira comment: https://issues.apache.org/jira/browse/SPARK-3627?focusedCommentId=14150013&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14150013 This does not handle yarn client mode reporting of the driver to the AM. I think that should be handled when we make it an unmanaged AM. Author: Thomas Graves <tgraves@apache.org> Closes #2577 from tgravescs/SPARK-3627 and squashes the following commits: 9c2efbf [Thomas Graves] review comments e8cc261 [Thomas Graves] fix accidental typo during fixing comment 24c98e3 [Thomas Graves] rework 85f1901 [Thomas Graves] Merge remote-tracking branch 'upstream/master' into SPARK-3627 fab166d [Thomas Graves] update based on review comments 32f4dfa [Thomas Graves] switch back f0b6519 [Thomas Graves] change order of cleanup staging dir d3cc800 [Thomas Graves] SPARK-3627 - yarn - fix exit code and final status reporting to RM
Diffstat (limited to 'yarn/alpha')
-rw-r--r--yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala26
1 files changed, 16 insertions, 10 deletions
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
index 9bd1719cb1..7faf55bc63 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
@@ -40,6 +40,7 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC
private var rpc: YarnRPC = null
private var resourceManager: AMRMProtocol = _
private var uiHistoryAddress: String = _
+ private var registered: Boolean = false
override def register(
conf: YarnConfiguration,
@@ -51,8 +52,11 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC
this.rpc = YarnRPC.create(conf)
this.uiHistoryAddress = uiHistoryAddress
- resourceManager = registerWithResourceManager(conf)
- registerApplicationMaster(uiAddress)
+ synchronized {
+ resourceManager = registerWithResourceManager(conf)
+ registerApplicationMaster(uiAddress)
+ registered = true
+ }
new YarnAllocationHandler(conf, sparkConf, resourceManager, getAttemptId(), args,
preferredNodeLocations, securityMgr)
@@ -66,14 +70,16 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC
appAttemptId
}
- override def shutdown(status: FinalApplicationStatus, diagnostics: String = "") = {
- val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
- .asInstanceOf[FinishApplicationMasterRequest]
- finishReq.setAppAttemptId(getAttemptId())
- finishReq.setFinishApplicationStatus(status)
- finishReq.setDiagnostics(diagnostics)
- finishReq.setTrackingUrl(uiHistoryAddress)
- resourceManager.finishApplicationMaster(finishReq)
+ override def unregister(status: FinalApplicationStatus, diagnostics: String = "") = synchronized {
+ if (registered) {
+ val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
+ .asInstanceOf[FinishApplicationMasterRequest]
+ finishReq.setAppAttemptId(getAttemptId())
+ finishReq.setFinishApplicationStatus(status)
+ finishReq.setDiagnostics(diagnostics)
+ finishReq.setTrackingUrl(uiHistoryAddress)
+ resourceManager.finishApplicationMaster(finishReq)
+ }
}
override def getAmIpFilterParams(conf: YarnConfiguration, proxyBase: String) = {