diff options
author | Sandy Ryza <sandy@cloudera.com> | 2015-02-02 12:27:08 -0800 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2015-02-02 12:27:08 -0800 |
commit | b2047b55c5fc85de6b63276d8ab9610d2496e08b (patch) | |
tree | b62017de84f2893af77a00714988e72b75c257a8 /yarn/src/main | |
parent | c081b21b1fe4fbad845088c4144da0bd2a8d89dc (diff) | |
download | spark-b2047b55c5fc85de6b63276d8ab9610d2496e08b.tar.gz spark-b2047b55c5fc85de6b63276d8ab9610d2496e08b.tar.bz2 spark-b2047b55c5fc85de6b63276d8ab9610d2496e08b.zip |
SPARK-4585. Spark dynamic executor allocation should use minExecutors as...
... initial number
Author: Sandy Ryza <sandy@cloudera.com>
Closes #4051 from sryza/sandy-spark-4585 and squashes the following commits:
d1dd039 [Sandy Ryza] Add spark.dynamicAllocation.initialNumExecutors and make min and max not required
b7c59dc [Sandy Ryza] SPARK-4585. Spark dynamic executor allocation should use minExecutors as initial number
Diffstat (limited to 'yarn/src/main')
-rw-r--r-- | yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala | 17 |
1 files changed, 13 insertions, 4 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index f96b245512..5eb2023802 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -75,14 +75,23 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) .orElse(sparkConf.getOption("spark.yarn.dist.archives").map(p => Utils.resolveURIs(p))) .orElse(sys.env.get("SPARK_YARN_DIST_ARCHIVES")) .orNull - // If dynamic allocation is enabled, start at the max number of executors + // If dynamic allocation is enabled, start at the configured initial number of executors. + // Default to minExecutors if no initialExecutors is set. if (isDynamicAllocationEnabled) { + val minExecutorsConf = "spark.dynamicAllocation.minExecutors" + val initialExecutorsConf = "spark.dynamicAllocation.initialExecutors" val maxExecutorsConf = "spark.dynamicAllocation.maxExecutors" - if (!sparkConf.contains(maxExecutorsConf)) { + val minNumExecutors = sparkConf.getInt(minExecutorsConf, 0) + val initialNumExecutors = sparkConf.getInt(initialExecutorsConf, minNumExecutors) + val maxNumExecutors = sparkConf.getInt(maxExecutorsConf, Integer.MAX_VALUE) + + // If defined, initial executors must be between min and max + if (initialNumExecutors < minNumExecutors || initialNumExecutors > maxNumExecutors) { throw new IllegalArgumentException( - s"$maxExecutorsConf must be set if dynamic allocation is enabled!") + s"$initialExecutorsConf must be between $minExecutorsConf and $maxNumExecutors!") } - numExecutors = sparkConf.get(maxExecutorsConf).toInt + + numExecutors = initialNumExecutors } } |