 core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala      | 14
 core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala | 15
 docs/configuration.md                                                     | 20
 docs/job-scheduling.md                                                    |  9
 yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala    | 17
 5 files changed, 46 insertions(+), 29 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index b28da192c1..5d5288bb6e 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -49,6 +49,7 @@ import org.apache.spark.scheduler._
* spark.dynamicAllocation.enabled - Whether this feature is enabled
* spark.dynamicAllocation.minExecutors - Lower bound on the number of executors
* spark.dynamicAllocation.maxExecutors - Upper bound on the number of executors
+ * spark.dynamicAllocation.initialExecutors - Number of executors to start with
*
* spark.dynamicAllocation.schedulerBacklogTimeout (M) -
* If there are backlogged tasks for this duration, add new executors
@@ -70,9 +71,10 @@ private[spark] class ExecutorAllocationManager(
import ExecutorAllocationManager._
- // Lower and upper bounds on the number of executors. These are required.
- private val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", -1)
- private val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors", -1)
+ // Lower and upper bounds on the number of executors.
+ private val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0)
+ private val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors",
+ Integer.MAX_VALUE)
// How long there must be backlogged tasks for before an addition is triggered
private val schedulerBacklogTimeout = conf.getLong(
@@ -132,10 +134,10 @@ private[spark] class ExecutorAllocationManager(
*/
private def validateSettings(): Unit = {
if (minNumExecutors < 0 || maxNumExecutors < 0) {
- throw new SparkException("spark.dynamicAllocation.{min/max}Executors must be set!")
+ throw new SparkException("spark.dynamicAllocation.{min/max}Executors must be positive!")
}
- if (minNumExecutors == 0 || maxNumExecutors == 0) {
- throw new SparkException("spark.dynamicAllocation.{min/max}Executors cannot be 0!")
+ if (maxNumExecutors == 0) {
+ throw new SparkException("spark.dynamicAllocation.maxExecutors cannot be 0!")
}
if (minNumExecutors > maxNumExecutors) {
throw new SparkException(s"spark.dynamicAllocation.minExecutors ($minNumExecutors) must " +
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 0e4df17c1b..57081ddd95 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -32,24 +32,23 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
import ExecutorAllocationManagerSuite._
test("verify min/max executors") {
- // No min or max
val conf = new SparkConf()
.setMaster("local")
.setAppName("test-executor-allocation-manager")
.set("spark.dynamicAllocation.enabled", "true")
.set("spark.dynamicAllocation.testing", "true")
- intercept[SparkException] { new SparkContext(conf) }
- SparkEnv.get.stop() // cleanup the created environment
- SparkContext.clearActiveContext()
+ val sc0 = new SparkContext(conf)
+ assert(sc0.executorAllocationManager.isDefined)
+ sc0.stop()
- // Only min
- val conf1 = conf.clone().set("spark.dynamicAllocation.minExecutors", "1")
+ // Min < 0
+ val conf1 = conf.clone().set("spark.dynamicAllocation.minExecutors", "-1")
intercept[SparkException] { new SparkContext(conf1) }
SparkEnv.get.stop()
SparkContext.clearActiveContext()
- // Only max
- val conf2 = conf.clone().set("spark.dynamicAllocation.maxExecutors", "2")
+ // Max < 0
+ val conf2 = conf.clone().set("spark.dynamicAllocation.maxExecutors", "-1")
intercept[SparkException] { new SparkContext(conf2) }
SparkEnv.get.stop()
SparkContext.clearActiveContext()
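One boundary the suite could also exercise in the same style is min > max; a hypothetical extra case (not part of this commit):

```scala
// Hypothetical: minExecutors greater than maxExecutors should also be rejected.
val conf3 = conf.clone()
  .set("spark.dynamicAllocation.minExecutors", "2")
  .set("spark.dynamicAllocation.maxExecutors", "1")
intercept[SparkException] { new SparkContext(conf3) }
SparkEnv.get.stop()
SparkContext.clearActiveContext()
```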
diff --git a/docs/configuration.md b/docs/configuration.md
index e4e4b8d516..08c6befaf3 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1098,24 +1098,32 @@ Apart from these, the following properties are also available, and may be useful
available on YARN mode. For more detail, see the description
<a href="job-scheduling.html#dynamic-resource-allocation">here</a>.
<br><br>
- This requires the following configurations to be set:
+ This requires <code>spark.shuffle.service.enabled</code> to be set to <code>true</code>.
+ The following configurations are also relevant:
<code>spark.dynamicAllocation.minExecutors</code>,
<code>spark.dynamicAllocation.maxExecutors</code>, and
- <code>spark.shuffle.service.enabled</code>
+ <code>spark.dynamicAllocation.initialExecutors</code>
</td>
</tr>
<tr>
<td><code>spark.dynamicAllocation.minExecutors</code></td>
- <td>(none)</td>
+ <td>0</td>
<td>
- Lower bound for the number of executors if dynamic allocation is enabled (required).
+ Lower bound for the number of executors if dynamic allocation is enabled.
</td>
</tr>
<tr>
<td><code>spark.dynamicAllocation.maxExecutors</code></td>
- <td>(none)</td>
+ <td>Integer.MAX_VALUE</td>
+ <td>
+ Upper bound for the number of executors if dynamic allocation is enabled.
+ </td>
+</tr>
+<tr>
+ <td><code>spark.dynamicAllocation.initialExecutors</code></td>
+ <td><code>spark.dynamicAllocation.minExecutors</code></td>
<td>
- Upper bound for the number of executors if dynamic allocation is enabled (required).
+ Initial number of executors to run if dynamic allocation is enabled.
</td>
</tr>
<tr>
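With these defaults, a typical setup pins only the bounds it cares about. A sketch with illustrative values (any key omitted falls back to the defaults documented above):

```scala
import org.apache.spark.SparkConf

// Illustrative values only: minExecutors defaults to 0, maxExecutors to
// Integer.MAX_VALUE, and initialExecutors falls back to minExecutors.
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "2")
  .set("spark.dynamicAllocation.initialExecutors", "4")
  .set("spark.dynamicAllocation.maxExecutors", "20")
```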
diff --git a/docs/job-scheduling.md b/docs/job-scheduling.md
index a5425eb355..5295e351dd 100644
--- a/docs/job-scheduling.md
+++ b/docs/job-scheduling.md
@@ -77,11 +77,10 @@ scheduling while sharing cluster resources efficiently.
### Configuration and Setup
All configurations used by this feature live under the `spark.dynamicAllocation.*` namespace.
-To enable this feature, your application must set `spark.dynamicAllocation.enabled` to `true` and
-provide lower and upper bounds for the number of executors through
-`spark.dynamicAllocation.minExecutors` and `spark.dynamicAllocation.maxExecutors`. Other relevant
-configurations are described on the [configurations page](configuration.html#dynamic-allocation)
-and in the subsequent sections in detail.
+To enable this feature, your application must set `spark.dynamicAllocation.enabled` to `true`.
+Other relevant configurations are described on the
+[configurations page](configuration.html#dynamic-allocation) and in the subsequent sections in
+detail.
Additionally, your application must use an external shuffle service. The purpose of the service is
to preserve the shuffle files written by executors so the executors can be safely removed (more
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index f96b245512..5eb2023802 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -75,14 +75,23 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
.orElse(sparkConf.getOption("spark.yarn.dist.archives").map(p => Utils.resolveURIs(p)))
.orElse(sys.env.get("SPARK_YARN_DIST_ARCHIVES"))
.orNull
- // If dynamic allocation is enabled, start at the max number of executors
+ // If dynamic allocation is enabled, start at the configured initial number of executors.
+ // Default to minExecutors if initialExecutors is not set.
if (isDynamicAllocationEnabled) {
+ val minExecutorsConf = "spark.dynamicAllocation.minExecutors"
+ val initialExecutorsConf = "spark.dynamicAllocation.initialExecutors"
val maxExecutorsConf = "spark.dynamicAllocation.maxExecutors"
- if (!sparkConf.contains(maxExecutorsConf)) {
+ val minNumExecutors = sparkConf.getInt(minExecutorsConf, 0)
+ val initialNumExecutors = sparkConf.getInt(initialExecutorsConf, minNumExecutors)
+ val maxNumExecutors = sparkConf.getInt(maxExecutorsConf, Integer.MAX_VALUE)
+
+ // Initial executors must be between min and max; it defaults to min when unset
+ if (initialNumExecutors < minNumExecutors || initialNumExecutors > maxNumExecutors) {
throw new IllegalArgumentException(
- s"$maxExecutorsConf must be set if dynamic allocation is enabled!")
+ s"$initialExecutorsConf must be between $minExecutorsConf and $maxNumExecutors!")
}
- numExecutors = sparkConf.get(maxExecutorsConf).toInt
+
+ numExecutors = initialNumExecutors
}
}
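In isolation, the YARN-side change reduces to a fallback chain plus a range check; a sketch with ad hoc names (not ClientArguments itself):

```scala
import org.apache.spark.SparkConf

// Fallback chain: initialExecutors -> minExecutors -> 0, then verify the
// result lies within [minExecutors, maxExecutors] before seeding numExecutors.
def resolveInitialExecutors(conf: SparkConf): Int = {
  val min = conf.getInt("spark.dynamicAllocation.minExecutors", 0)
  val max = conf.getInt("spark.dynamicAllocation.maxExecutors", Integer.MAX_VALUE)
  val initial = conf.getInt("spark.dynamicAllocation.initialExecutors", min)
  require(initial >= min && initial <= max,
    s"spark.dynamicAllocation.initialExecutors ($initial) must be between " +
    s"minExecutors ($min) and maxExecutors ($max)!")
  initial
}
```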