aboutsummaryrefslogtreecommitdiff
path: root/yarn/src
diff options
context:
space:
mode:
authorSandy Ryza <sandy@cloudera.com>2015-02-02 12:27:08 -0800
committerAndrew Or <andrew@databricks.com>2015-02-02 12:27:08 -0800
commitb2047b55c5fc85de6b63276d8ab9610d2496e08b (patch)
treeb62017de84f2893af77a00714988e72b75c257a8 /yarn/src
parentc081b21b1fe4fbad845088c4144da0bd2a8d89dc (diff)
downloadspark-b2047b55c5fc85de6b63276d8ab9610d2496e08b.tar.gz
spark-b2047b55c5fc85de6b63276d8ab9610d2496e08b.tar.bz2
spark-b2047b55c5fc85de6b63276d8ab9610d2496e08b.zip
SPARK-4585. Spark dynamic executor allocation should use minExecutors as...
... initial number Author: Sandy Ryza <sandy@cloudera.com> Closes #4051 from sryza/sandy-spark-4585 and squashes the following commits: d1dd039 [Sandy Ryza] Add spark.dynamicAllocation.initialNumExecutors and make min and max not required b7c59dc [Sandy Ryza] SPARK-4585. Spark dynamic executor allocation should use minExecutors as initial number
Diffstat (limited to 'yarn/src')
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala17
1 files changed, 13 insertions, 4 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index f96b245512..5eb2023802 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -75,14 +75,23 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
.orElse(sparkConf.getOption("spark.yarn.dist.archives").map(p => Utils.resolveURIs(p)))
.orElse(sys.env.get("SPARK_YARN_DIST_ARCHIVES"))
.orNull
- // If dynamic allocation is enabled, start at the max number of executors
+ // If dynamic allocation is enabled, start at the configured initial number of executors.
+ // Default to minExecutors if no initialExecutors is set.
if (isDynamicAllocationEnabled) {
+ val minExecutorsConf = "spark.dynamicAllocation.minExecutors"
+ val initialExecutorsConf = "spark.dynamicAllocation.initialExecutors"
val maxExecutorsConf = "spark.dynamicAllocation.maxExecutors"
- if (!sparkConf.contains(maxExecutorsConf)) {
+ val minNumExecutors = sparkConf.getInt(minExecutorsConf, 0)
+ val initialNumExecutors = sparkConf.getInt(initialExecutorsConf, minNumExecutors)
+ val maxNumExecutors = sparkConf.getInt(maxExecutorsConf, Integer.MAX_VALUE)
+
+ // If defined, initial executors must be between min and max
+ if (initialNumExecutors < minNumExecutors || initialNumExecutors > maxNumExecutors) {
throw new IllegalArgumentException(
- s"$maxExecutorsConf must be set if dynamic allocation is enabled!")
+ s"$initialExecutorsConf must be between $minExecutorsConf and $maxNumExecutors!")
}
- numExecutors = sparkConf.get(maxExecutorsConf).toInt
+
+ numExecutors = initialNumExecutors
}
}