From 8fdd48959c93b9cf809f03549e2ae6c4687d1fcd Mon Sep 17 00:00:00 2001
From: WangTaoTheTonic
Date: Wed, 7 Jan 2015 08:14:39 -0600
Subject: [SPARK-2165][YARN]add support for setting maxAppAttempts in the
 ApplicationSubmissionContext

https://issues.apache.org/jira/browse/SPARK-2165

I still have two questions:
* If this config is not set, should we use YARN's corresponding value or a
  default value (like 2) on the Spark side?
* Is this config name the best choice, or should it be
  "spark.yarn.am.maxAttempts"?

Author: WangTaoTheTonic

Closes #3878 from WangTaoTheTonic/SPARK-2165 and squashes the following commits:

1416c83 [WangTaoTheTonic] use the name spark.yarn.maxAppAttempts
202ac85 [WangTaoTheTonic] rephrase some
afdfc99 [WangTaoTheTonic] more detailed description
91562c6 [WangTaoTheTonic] add support for setting maxAppAttempts in the ApplicationSubmissionContext
---
 .../scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala     | 2 +-
 yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala      | 5 +++++
 .../src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala | 7 +++++--
 3 files changed, 11 insertions(+), 3 deletions(-)

diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 618db7f908..902bdda598 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -102,7 +102,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
         logInfo("Invoking sc stop from shutdown hook")
         sc.stop()
       }
-      val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
+      val maxAppAttempts = client.getMaxRegAttempts(sparkConf, yarnConf)
       val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
 
       if (!finished) {
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index addaddb711..a2c3f918a1 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -98,6 +98,11 @@ private[spark] class Client(
     appContext.setQueue(args.amQueue)
     appContext.setAMContainerSpec(containerContext)
     appContext.setApplicationType("SPARK")
+    sparkConf.getOption("spark.yarn.maxAppAttempts").map(_.toInt) match {
+      case Some(v) => appContext.setMaxAppAttempts(v)
+      case None => logDebug("spark.yarn.maxAppAttempts is not set. " +
+          "Cluster's default value will be used.")
+    }
     val capability = Records.newRecord(classOf[Resource])
     capability.setMemory(args.amMemory + amMemoryOverhead)
     appContext.setResource(capability)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
index bf4e15908b..e183efccbb 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
@@ -120,7 +120,10 @@ private[spark] class YarnRMClient(args: ApplicationMasterArguments) extends Logg
   }
 
   /** Returns the maximum number of attempts to register the AM. */
-  def getMaxRegAttempts(conf: YarnConfiguration): Int =
-    conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)
+  def getMaxRegAttempts(sparkConf: SparkConf, yarnConf: YarnConfiguration): Int = {
+    sparkConf.getOption("spark.yarn.maxAppAttempts").map(_.toInt).getOrElse(
+      yarnConf.getInt(
+        YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS))
+  }
 }
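
For reference, a minimal sketch of how an application might use the new setting once this patch is applied. The key `spark.yarn.maxAppAttempts` and the fallback to YARN's `yarn.resourcemanager.am.max-attempts` come from the diff above; the object name, app name, and the value 4 are illustrative assumptions, not part of the patch.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical example: cap this application's YARN AM attempts at 4.
// "spark.yarn.maxAppAttempts" is the key introduced by this patch; the
// app name and the value 4 are arbitrary illustrations.
object MaxAppAttemptsExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("MaxAppAttemptsExample")
      // Written into the ApplicationSubmissionContext by Client.scala above.
      // If unset, the cluster default (yarn.resourcemanager.am.max-attempts)
      // applies instead, as the logDebug branch notes.
      .set("spark.yarn.maxAppAttempts", "4")

    val sc = new SparkContext(conf)
    try {
      // ... application logic ...
    } finally {
      sc.stop()
    }
  }
}
```

The same setting should also be reachable at submit time, e.g. `spark-submit --conf spark.yarn.maxAppAttempts=4 ...`. Note that YARN is generally understood to cap per-application values at the ResourceManager's global `yarn.resourcemanager.am.max-attempts`, so a Spark-side value above that ceiling is unlikely to take effect.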