aboutsummaryrefslogtreecommitdiff
path: root/yarn
diff options
context:
space:
mode:
authorWangTaoTheTonic <barneystinson@aliyun.com>2015-01-07 08:14:39 -0600
committerThomas Graves <tgraves@apache.org>2015-01-07 08:14:39 -0600
commit8fdd48959c93b9cf809f03549e2ae6c4687d1fcd (patch)
tree2e20bce18b10a53ca05b99e42ecab3ff19c1bd29 /yarn
parent5fde66163fe460d6f64b145047f76cc4ee33601a (diff)
downloadspark-8fdd48959c93b9cf809f03549e2ae6c4687d1fcd.tar.gz
spark-8fdd48959c93b9cf809f03549e2ae6c4687d1fcd.tar.bz2
spark-8fdd48959c93b9cf809f03549e2ae6c4687d1fcd.zip
[SPARK-2165][YARN]add support for setting maxAppAttempts in the ApplicationSubmissionContext
...xt https://issues.apache.org/jira/browse/SPARK-2165 I still have 2 questions: * If this config is not set, we should use yarn's corresponding value or a default value(like 2) on spark side? * Is the config name best? Or "spark.yarn.am.maxAttempts"? Author: WangTaoTheTonic <barneystinson@aliyun.com> Closes #3878 from WangTaoTheTonic/SPARK-2165 and squashes the following commits: 1416c83 [WangTaoTheTonic] use the name spark.yarn.maxAppAttempts 202ac85 [WangTaoTheTonic] rephrase some afdfc99 [WangTaoTheTonic] more detailed description 91562c6 [WangTaoTheTonic] add support for setting maxAppAttempts in the ApplicationSubmissionContext
Diffstat (limited to 'yarn')
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala2
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala5
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala7
3 files changed, 11 insertions, 3 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 618db7f908..902bdda598 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -102,7 +102,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
logInfo("Invoking sc stop from shutdown hook")
sc.stop()
}
- val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
+ val maxAppAttempts = client.getMaxRegAttempts(sparkConf, yarnConf)
val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
if (!finished) {
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index addaddb711..a2c3f918a1 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -98,6 +98,11 @@ private[spark] class Client(
appContext.setQueue(args.amQueue)
appContext.setAMContainerSpec(containerContext)
appContext.setApplicationType("SPARK")
+ sparkConf.getOption("spark.yarn.maxAppAttempts").map(_.toInt) match {
+ case Some(v) => appContext.setMaxAppAttempts(v)
+ case None => logDebug("spark.yarn.maxAppAttempts is not set. " +
+ "Cluster's default value will be used.")
+ }
val capability = Records.newRecord(classOf[Resource])
capability.setMemory(args.amMemory + amMemoryOverhead)
appContext.setResource(capability)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
index bf4e15908b..e183efccbb 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
@@ -120,7 +120,10 @@ private[spark] class YarnRMClient(args: ApplicationMasterArguments) extends Logg
}
/** Returns the maximum number of attempts to register the AM. */
- def getMaxRegAttempts(conf: YarnConfiguration): Int =
- conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)
+ def getMaxRegAttempts(sparkConf: SparkConf, yarnConf: YarnConfiguration): Int = {
+ sparkConf.getOption("spark.yarn.maxAppAttempts").map(_.toInt).getOrElse(
+ yarnConf.getInt(
+ YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS))
+ }
}