 docs/running-on-yarn.md                                                   | 8 ++++++++
 yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala  | 2 +-
 yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala             | 5 +++++
 yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala       | 7 +++++--
 4 files changed, 19 insertions(+), 3 deletions(-)
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index da1c8e8aa8..183698ffe9 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -149,6 +149,14 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
In cluster mode, use spark.driver.extraJavaOptions instead.
</td>
</tr>
+<tr>
+ <td><code>spark.yarn.maxAppAttempts</code></td>
+  <td><code>yarn.resourcemanager.am.max-attempts</code> in YARN</td>
+  <td>
+    The maximum number of attempts that will be made to submit the application.
+    It should be no larger than the global maximum number of attempts in the YARN configuration.
+ </td>
+</tr>
</table>
# Launching Spark on YARN
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 618db7f908..902bdda598 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -102,7 +102,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
logInfo("Invoking sc stop from shutdown hook")
sc.stop()
}
- val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
+ val maxAppAttempts = client.getMaxRegAttempts(sparkConf, yarnConf)
val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
if (!finished) {
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index addaddb711..a2c3f918a1 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -98,6 +98,11 @@ private[spark] class Client(
appContext.setQueue(args.amQueue)
appContext.setAMContainerSpec(containerContext)
appContext.setApplicationType("SPARK")
+ sparkConf.getOption("spark.yarn.maxAppAttempts").map(_.toInt) match {
+ case Some(v) => appContext.setMaxAppAttempts(v)
+ case None => logDebug("spark.yarn.maxAppAttempts is not set. " +
+ "Cluster's default value will be used.")
+ }
val capability = Records.newRecord(classOf[Resource])
capability.setMemory(args.amMemory + amMemoryOverhead)
appContext.setResource(capability)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
index bf4e15908b..e183efccbb 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
@@ -120,7 +120,10 @@ private[spark] class YarnRMClient(args: ApplicationMasterArguments) extends Logg
}
/** Returns the maximum number of attempts to register the AM. */
- def getMaxRegAttempts(conf: YarnConfiguration): Int =
- conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)
+ def getMaxRegAttempts(sparkConf: SparkConf, yarnConf: YarnConfiguration): Int = {
+ sparkConf.getOption("spark.yarn.maxAppAttempts").map(_.toInt).getOrElse(
+ yarnConf.getInt(
+ YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS))
+ }
}
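
For illustration only (not part of the commit): a minimal sketch of how the new property could be set from user code. The property name comes from the patch above; the application name and attempt count are example assumptions, and the value should not exceed YARN's global yarn.resourcemanager.am.max-attempts.

    import org.apache.spark.SparkConf

    // Illustrative values; only the property key is taken from the patch.
    val conf = new SparkConf()
      .setAppName("ExampleYarnApp")
      .set("spark.yarn.maxAppAttempts", "2")  // overrides the cluster default for this application

    // Equivalently, on the command line:
    //   spark-submit --master yarn --conf spark.yarn.maxAppAttempts=2 ...

If the property is left unset, Client logs a debug message and the cluster's default (yarn.resourcemanager.am.max-attempts) applies, as shown in the Client.scala and YarnRMClient.scala hunks above.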