aboutsummaryrefslogtreecommitdiff
path: root/yarn/src
diff options
context:
space:
mode:
Diffstat (limited to 'yarn/src')
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala17
1 files changed, 13 insertions, 4 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index f96b245512..5eb2023802 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -75,14 +75,23 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
.orElse(sparkConf.getOption("spark.yarn.dist.archives").map(p => Utils.resolveURIs(p)))
.orElse(sys.env.get("SPARK_YARN_DIST_ARCHIVES"))
.orNull
- // If dynamic allocation is enabled, start at the max number of executors
+ // If dynamic allocation is enabled, start at the configured initial number of executors.
+ // Default to minExecutors if no initialExecutors is set.
if (isDynamicAllocationEnabled) {
+ val minExecutorsConf = "spark.dynamicAllocation.minExecutors"
+ val initialExecutorsConf = "spark.dynamicAllocation.initialExecutors"
val maxExecutorsConf = "spark.dynamicAllocation.maxExecutors"
- if (!sparkConf.contains(maxExecutorsConf)) {
+ val minNumExecutors = sparkConf.getInt(minExecutorsConf, 0)
+ val initialNumExecutors = sparkConf.getInt(initialExecutorsConf, minNumExecutors)
+ val maxNumExecutors = sparkConf.getInt(maxExecutorsConf, Integer.MAX_VALUE)
+
+ // If defined, initial executors must be between min and max
+ if (initialNumExecutors < minNumExecutors || initialNumExecutors > maxNumExecutors) {
throw new IllegalArgumentException(
- s"$maxExecutorsConf must be set if dynamic allocation is enabled!")
+ s"$initialExecutorsConf must be between $minExecutorsConf and $maxNumExecutors!")
}
- numExecutors = sparkConf.get(maxExecutorsConf).toInt
+
+ numExecutors = initialNumExecutors
}
}