From b2047b55c5fc85de6b63276d8ab9610d2496e08b Mon Sep 17 00:00:00 2001
From: Sandy Ryza
Date: Mon, 2 Feb 2015 12:27:08 -0800
Subject: SPARK-4585. Spark dynamic executor allocation should use minExecutors
 as...

... initial number

Author: Sandy Ryza

Closes #4051 from sryza/sandy-spark-4585 and squashes the following commits:

d1dd039 [Sandy Ryza] Add spark.dynamicAllocation.initialNumExecutors and make min and max not required
b7c59dc [Sandy Ryza] SPARK-4585. Spark dynamic executor allocation should use minExecutors as initial number
---
 .../org/apache/spark/ExecutorAllocationManager.scala | 14 ++++++++------
 .../spark/ExecutorAllocationManagerSuite.scala       | 15 +++++++--------
 docs/configuration.md                                | 20 ++++++++++++++------
 docs/job-scheduling.md                               |  9 ++++-----
 .../apache/spark/deploy/yarn/ClientArguments.scala   | 17 +++++++++++++----
 5 files changed, 46 insertions(+), 29 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index b28da192c1..5d5288bb6e 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -49,6 +49,7 @@ import org.apache.spark.scheduler._
  *   spark.dynamicAllocation.enabled - Whether this feature is enabled
  *   spark.dynamicAllocation.minExecutors - Lower bound on the number of executors
  *   spark.dynamicAllocation.maxExecutors - Upper bound on the number of executors
+ *   spark.dynamicAllocation.initialExecutors - Number of executors to start with
  *
  *   spark.dynamicAllocation.schedulerBacklogTimeout (M) -
  *     If there are backlogged tasks for this duration, add new executors
@@ -70,9 +71,10 @@ private[spark] class ExecutorAllocationManager(
 
   import ExecutorAllocationManager._
 
-  // Lower and upper bounds on the number of executors. These are required.
-  private val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", -1)
-  private val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors", -1)
+  // Lower and upper bounds on the number of executors.
+  private val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0)
+  private val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors",
+    Integer.MAX_VALUE)
 
   // How long there must be backlogged tasks for before an addition is triggered
   private val schedulerBacklogTimeout = conf.getLong(
@@ -132,10 +134,10 @@ private[spark] class ExecutorAllocationManager(
    */
   private def validateSettings(): Unit = {
     if (minNumExecutors < 0 || maxNumExecutors < 0) {
-      throw new SparkException("spark.dynamicAllocation.{min/max}Executors must be set!")
+      throw new SparkException("spark.dynamicAllocation.{min/max}Executors must be positive!")
     }
-    if (minNumExecutors == 0 || maxNumExecutors == 0) {
-      throw new SparkException("spark.dynamicAllocation.{min/max}Executors cannot be 0!")
+    if (maxNumExecutors == 0) {
+      throw new SparkException("spark.dynamicAllocation.maxExecutors cannot be 0!")
     }
     if (minNumExecutors > maxNumExecutors) {
       throw new SparkException(s"spark.dynamicAllocation.minExecutors ($minNumExecutors) must " +
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 0e4df17c1b..57081ddd95 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -32,24 +32,23 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
   import ExecutorAllocationManagerSuite._
 
   test("verify min/max executors") {
-    // No min or max
     val conf = new SparkConf()
       .setMaster("local")
       .setAppName("test-executor-allocation-manager")
       .set("spark.dynamicAllocation.enabled", "true")
       .set("spark.dynamicAllocation.testing", "true")
-    intercept[SparkException] { new SparkContext(conf) }
-    SparkEnv.get.stop() // cleanup the created environment
-    SparkContext.clearActiveContext()
+    val sc0 = new SparkContext(conf)
+    assert(sc0.executorAllocationManager.isDefined)
+    sc0.stop()
 
-    // Only min
-    val conf1 = conf.clone().set("spark.dynamicAllocation.minExecutors", "1")
+    // Min < 0
+    val conf1 = conf.clone().set("spark.dynamicAllocation.minExecutors", "-1")
     intercept[SparkException] { new SparkContext(conf1) }
     SparkEnv.get.stop()
     SparkContext.clearActiveContext()
 
-    // Only max
-    val conf2 = conf.clone().set("spark.dynamicAllocation.maxExecutors", "2")
+    // Max < 0
+    val conf2 = conf.clone().set("spark.dynamicAllocation.maxExecutors", "-1")
     intercept[SparkException] { new SparkContext(conf2) }
     SparkEnv.get.stop()
     SparkContext.clearActiveContext()
diff --git a/docs/configuration.md b/docs/configuration.md
index e4e4b8d516..08c6befaf3 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1098,24 +1098,32 @@ Apart from these, the following properties are also available, and may be useful
     available on YARN mode. For more detail, see the description
     <a href="job-scheduling.html#dynamic-resource-allocation">here</a>.
     <br>
-    This requires the following configurations to be set:
+    This requires <code>spark.shuffle.service.enabled</code> to be set.
+    The following configurations are also relevant:
     <code>spark.dynamicAllocation.minExecutors</code>,
     <code>spark.dynamicAllocation.maxExecutors</code>, and
-    <code>spark.shuffle.service.enabled</code>
+    <code>spark.dynamicAllocation.initialExecutors</code>
   </td>
 </tr>
 <tr>
   <td><code>spark.dynamicAllocation.minExecutors</code></td>
-  <td>(none)</td>
+  <td>0</td>
   <td>
-    Lower bound for the number of executors if dynamic allocation is enabled (required).
+    Lower bound for the number of executors if dynamic allocation is enabled.
   </td>
 </tr>
 <tr>
   <td><code>spark.dynamicAllocation.maxExecutors</code></td>
-  <td>(none)</td>
+  <td>Integer.MAX_VALUE</td>
+  <td>
+    Upper bound for the number of executors if dynamic allocation is enabled.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.dynamicAllocation.initialExecutors</code></td>
+  <td><code>spark.dynamicAllocation.minExecutors</code></td>
   <td>
-    Upper bound for the number of executors if dynamic allocation is enabled (required).
+    Initial number of executors to run if dynamic allocation is enabled.
   </td>
 </tr>
 <tr>
diff --git a/docs/job-scheduling.md b/docs/job-scheduling.md
index a5425eb355..5295e351dd 100644
--- a/docs/job-scheduling.md
+++ b/docs/job-scheduling.md
@@ -77,11 +77,10 @@ scheduling while sharing cluster resources efficiently.
 ### Configuration and Setup
 
 All configurations used by this feature live under the `spark.dynamicAllocation.*` namespace.
-To enable this feature, your application must set `spark.dynamicAllocation.enabled` to `true` and
-provide lower and upper bounds for the number of executors through
-`spark.dynamicAllocation.minExecutors` and `spark.dynamicAllocation.maxExecutors`. Other relevant
-configurations are described on the [configurations page](configuration.html#dynamic-allocation)
-and in the subsequent sections in detail.
+To enable this feature, your application must set `spark.dynamicAllocation.enabled` to `true`.
+Other relevant configurations are described on the
+[configurations page](configuration.html#dynamic-allocation) and in the subsequent sections in
+detail.
 
 Additionally, your application must use an external shuffle service. The purpose of the service
 is to preserve the shuffle files written by executors so the executors can be safely removed (more
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index f96b245512..5eb2023802 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -75,14 +75,23 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
       .orElse(sparkConf.getOption("spark.yarn.dist.archives").map(p => Utils.resolveURIs(p)))
       .orElse(sys.env.get("SPARK_YARN_DIST_ARCHIVES"))
       .orNull
-    // If dynamic allocation is enabled, start at the max number of executors
+    // If dynamic allocation is enabled, start at the configured initial number of executors.
+    // Default to minExecutors if no initialExecutors is set.
     if (isDynamicAllocationEnabled) {
+      val minExecutorsConf = "spark.dynamicAllocation.minExecutors"
+      val initialExecutorsConf = "spark.dynamicAllocation.initialExecutors"
       val maxExecutorsConf = "spark.dynamicAllocation.maxExecutors"
-      if (!sparkConf.contains(maxExecutorsConf)) {
+      val minNumExecutors = sparkConf.getInt(minExecutorsConf, 0)
+      val initialNumExecutors = sparkConf.getInt(initialExecutorsConf, minNumExecutors)
+      val maxNumExecutors = sparkConf.getInt(maxExecutorsConf, Integer.MAX_VALUE)
+
+      // If defined, initial executors must be between min and max
+      if (initialNumExecutors < minNumExecutors || initialNumExecutors > maxNumExecutors) {
         throw new IllegalArgumentException(
-          s"$maxExecutorsConf must be set if dynamic allocation is enabled!")
+          s"$initialExecutorsConf must be between $minExecutorsConf and $maxNumExecutors!")
       }
-      numExecutors = sparkConf.get(maxExecutorsConf).toInt
+
+      numExecutors = initialNumExecutors
     }
   }
--
cgit v1.2.3
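
For illustration, here is a minimal sketch (not part of the patch) of how an application might use the configurations this change introduces. After this patch, only `spark.dynamicAllocation.enabled` is mandatory; min, initial, and max are optional, and the YARN client validates min <= initial <= max. The app name and executor counts below are arbitrary, and the external shuffle service is assumed to be available on the cluster:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object DynamicAllocationExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("dynamic-allocation-example") // hypothetical app name
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.shuffle.service.enabled", "true") // required by dynamic allocation
      .set("spark.dynamicAllocation.minExecutors", "2")     // optional, defaults to 0
      .set("spark.dynamicAllocation.initialExecutors", "5") // optional, defaults to minExecutors
      .set("spark.dynamicAllocation.maxExecutors", "20")    // optional, defaults to Integer.MAX_VALUE

    val sc = new SparkContext(conf)
    // On YARN, the application now starts with 5 executors and scales
    // between 2 and 20 based on the task backlog.
    sc.stop()
  }
}
```

Before this patch, the same application would have failed at startup unless both minExecutors and maxExecutors were set, and it would have launched at maxExecutors rather than at an initial count.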