author     Sandy Ryza <sandy@cloudera.com>    2015-02-02 12:27:08 -0800
committer  Andrew Or <andrew@databricks.com>  2015-02-02 12:27:08 -0800
commit     b2047b55c5fc85de6b63276d8ab9610d2496e08b
tree       b62017de84f2893af77a00714988e72b75c257a8
parent     c081b21b1fe4fbad845088c4144da0bd2a8d89dc
SPARK-4585. Spark dynamic executor allocation should use minExecutors as initial number

Author: Sandy Ryza <sandy@cloudera.com>

Closes #4051 from sryza/sandy-spark-4585 and squashes the following commits:

d1dd039 [Sandy Ryza] Add spark.dynamicAllocation.initialNumExecutors and make min and max not required
b7c59dc [Sandy Ryza] SPARK-4585. Spark dynamic executor allocation should use minExecutors as initial number
-rw-r--r--  core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala      | 14
-rw-r--r--  core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala | 15
-rw-r--r--  docs/configuration.md                                                     | 20
-rw-r--r--  docs/job-scheduling.md                                                    |  9
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala    | 17
5 files changed, 46 insertions(+), 29 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index b28da192c1..5d5288bb6e 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -49,6 +49,7 @@ import org.apache.spark.scheduler._
* spark.dynamicAllocation.enabled - Whether this feature is enabled
* spark.dynamicAllocation.minExecutors - Lower bound on the number of executors
* spark.dynamicAllocation.maxExecutors - Upper bound on the number of executors
+ * spark.dynamicAllocation.initialExecutors - Number of executors to start with
*
* spark.dynamicAllocation.schedulerBacklogTimeout (M) -
* If there are backlogged tasks for this duration, add new executors
@@ -70,9 +71,10 @@ private[spark] class ExecutorAllocationManager(
import ExecutorAllocationManager._
- // Lower and upper bounds on the number of executors. These are required.
- private val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", -1)
- private val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors", -1)
+ // Lower and upper bounds on the number of executors.
+ private val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0)
+ private val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors",
+ Integer.MAX_VALUE)
// How long there must be backlogged tasks for before an addition is triggered
private val schedulerBacklogTimeout = conf.getLong(
@@ -132,10 +134,10 @@ private[spark] class ExecutorAllocationManager(
*/
private def validateSettings(): Unit = {
if (minNumExecutors < 0 || maxNumExecutors < 0) {
- throw new SparkException("spark.dynamicAllocation.{min/max}Executors must be set!")
+ throw new SparkException("spark.dynamicAllocation.{min/max}Executors must be positive!")
}
- if (minNumExecutors == 0 || maxNumExecutors == 0) {
- throw new SparkException("spark.dynamicAllocation.{min/max}Executors cannot be 0!")
+ if (maxNumExecutors == 0) {
+ throw new SparkException("spark.dynamicAllocation.maxExecutors cannot be 0!")
}
if (minNumExecutors > maxNumExecutors) {
throw new SparkException(s"spark.dynamicAllocation.minExecutors ($minNumExecutors) must " +
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 0e4df17c1b..57081ddd95 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -32,24 +32,23 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
import ExecutorAllocationManagerSuite._
test("verify min/max executors") {
- // No min or max
val conf = new SparkConf()
.setMaster("local")
.setAppName("test-executor-allocation-manager")
.set("spark.dynamicAllocation.enabled", "true")
.set("spark.dynamicAllocation.testing", "true")
- intercept[SparkException] { new SparkContext(conf) }
- SparkEnv.get.stop() // cleanup the created environment
- SparkContext.clearActiveContext()
+ val sc0 = new SparkContext(conf)
+ assert(sc0.executorAllocationManager.isDefined)
+ sc0.stop()
- // Only min
- val conf1 = conf.clone().set("spark.dynamicAllocation.minExecutors", "1")
+ // Min < 0
+ val conf1 = conf.clone().set("spark.dynamicAllocation.minExecutors", "-1")
intercept[SparkException] { new SparkContext(conf1) }
SparkEnv.get.stop()
SparkContext.clearActiveContext()
- // Only max
- val conf2 = conf.clone().set("spark.dynamicAllocation.maxExecutors", "2")
+ // Max < 0
+ val conf2 = conf.clone().set("spark.dynamicAllocation.maxExecutors", "-1")
intercept[SparkException] { new SparkContext(conf2) }
SparkEnv.get.stop()
SparkContext.clearActiveContext()
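
The suite now covers only negative min/max. The [min, max] range check on initialExecutors introduced by this commit runs on the YARN client path (ClientArguments.scala below), not in core, so core would accept a conf like the following. A hypothetical out-of-range configuration in the same conf-building style (not part of the commit):

    import org.apache.spark.SparkConf

    // Hypothetical: initial (10) falls outside [min, max] = [2, 4]; in this
    // commit only the YARN ClientArguments path rejects it.
    val conf3 = new SparkConf()
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.dynamicAllocation.minExecutors", "2")
      .set("spark.dynamicAllocation.maxExecutors", "4")
      .set("spark.dynamicAllocation.initialExecutors", "10")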
diff --git a/docs/configuration.md b/docs/configuration.md
index e4e4b8d516..08c6befaf3 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1098,24 +1098,32 @@ Apart from these, the following properties are also available, and may be useful
available on YARN mode. For more detail, see the description
<a href="job-scheduling.html#dynamic-resource-allocation">here</a>.
<br><br>
- This requires the following configurations to be set:
+ This requires <code>spark.shuffle.service.enabled</code> to be set.
+ The following configurations are also relevant:
<code>spark.dynamicAllocation.minExecutors</code>,
<code>spark.dynamicAllocation.maxExecutors</code>, and
- <code>spark.shuffle.service.enabled</code>
+ <code>spark.dynamicAllocation.initialExecutors</code>
</td>
</tr>
<tr>
<td><code>spark.dynamicAllocation.minExecutors</code></td>
- <td>(none)</td>
+ <td>0</td>
<td>
- Lower bound for the number of executors if dynamic allocation is enabled (required).
+ Lower bound for the number of executors if dynamic allocation is enabled.
</td>
</tr>
<tr>
<td><code>spark.dynamicAllocation.maxExecutors</code></td>
- <td>(none)</td>
+ <td>Integer.MAX_VALUE</td>
+ <td>
+ Upper bound for the number of executors if dynamic allocation is enabled.
+ </td>
+</tr>
+<tr>
+ <td><code>spark.dynamicAllocation.initialExecutors</code></td>
+ <td><code>spark.dynamicAllocation.minExecutors</code></td>
<td>
- Upper bound for the number of executors if dynamic allocation is enabled (required).
+ Initial number of executors to run if dynamic allocation is enabled.
</td>
</tr>
<tr>
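
Per the updated table, enabling dynamic allocation now requires only the enable flag plus the external shuffle service; all three executor-count settings may be omitted. A minimal sketch, assuming the application is launched through spark-submit (which supplies the master) against a cluster whose node managers run the shuffle service; the app name and override values are illustrative:

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("dynamic-allocation-example") // illustrative
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.shuffle.service.enabled", "true")
      // Optional overrides; the defaults are 0, minExecutors, and Integer.MAX_VALUE:
      .set("spark.dynamicAllocation.minExecutors", "1")
      .set("spark.dynamicAllocation.initialExecutors", "5")
      .set("spark.dynamicAllocation.maxExecutors", "20")
    val sc = new SparkContext(conf)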
diff --git a/docs/job-scheduling.md b/docs/job-scheduling.md
index a5425eb355..5295e351dd 100644
--- a/docs/job-scheduling.md
+++ b/docs/job-scheduling.md
@@ -77,11 +77,10 @@ scheduling while sharing cluster resources efficiently.
### Configuration and Setup
All configurations used by this feature live under the `spark.dynamicAllocation.*` namespace.
-To enable this feature, your application must set `spark.dynamicAllocation.enabled` to `true` and
-provide lower and upper bounds for the number of executors through
-`spark.dynamicAllocation.minExecutors` and `spark.dynamicAllocation.maxExecutors`. Other relevant
-configurations are described on the [configurations page](configuration.html#dynamic-allocation)
-and in the subsequent sections in detail.
+To enable this feature, your application must set `spark.dynamicAllocation.enabled` to `true`.
+Other relevant configurations are described on the
+[configurations page](configuration.html#dynamic-allocation) and in the subsequent sections in
+detail.
Additionally, your application must use an external shuffle service. The purpose of the service is
to preserve the shuffle files written by executors so the executors can be safely removed (more
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index f96b245512..5eb2023802 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -75,14 +75,23 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
.orElse(sparkConf.getOption("spark.yarn.dist.archives").map(p => Utils.resolveURIs(p)))
.orElse(sys.env.get("SPARK_YARN_DIST_ARCHIVES"))
.orNull
- // If dynamic allocation is enabled, start at the max number of executors
+ // If dynamic allocation is enabled, start at the configured initial number of executors.
+ // Default to minExecutors if no initialExecutors is set.
if (isDynamicAllocationEnabled) {
+ val minExecutorsConf = "spark.dynamicAllocation.minExecutors"
+ val initialExecutorsConf = "spark.dynamicAllocation.initialExecutors"
val maxExecutorsConf = "spark.dynamicAllocation.maxExecutors"
- if (!sparkConf.contains(maxExecutorsConf)) {
+ val minNumExecutors = sparkConf.getInt(minExecutorsConf, 0)
+ val initialNumExecutors = sparkConf.getInt(initialExecutorsConf, minNumExecutors)
+ val maxNumExecutors = sparkConf.getInt(maxExecutorsConf, Integer.MAX_VALUE)
+
+ // If defined, initial executors must be between min and max
+ if (initialNumExecutors < minNumExecutors || initialNumExecutors > maxNumExecutors) {
throw new IllegalArgumentException(
- s"$maxExecutorsConf must be set if dynamic allocation is enabled!")
+ s"$initialExecutorsConf must be between $minExecutorsConf and $maxNumExecutors!")
}
- numExecutors = sparkConf.get(maxExecutorsConf).toInt
+
+ numExecutors = initialNumExecutors
}
}
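
For illustration, here is the defaulting and range logic above extracted into a standalone helper; the helper name is hypothetical, and it throws IllegalArgumentException directly like the block it mirrors:

    import org.apache.spark.SparkConf

    // Hypothetical helper mirroring the ClientArguments logic above.
    def resolveInitialExecutors(conf: SparkConf): Int = {
      val min = conf.getInt("spark.dynamicAllocation.minExecutors", 0)
      val initial = conf.getInt("spark.dynamicAllocation.initialExecutors", min)
      val max = conf.getInt("spark.dynamicAllocation.maxExecutors", Integer.MAX_VALUE)
      if (initial < min || initial > max) {
        throw new IllegalArgumentException(
          s"spark.dynamicAllocation.initialExecutors must be between $min and $max!")
      }
      initial
    }

    // With nothing set, initial defaults to min, which defaults to 0.
    assert(resolveInitialExecutors(new SparkConf(loadDefaults = false)) == 0)

The design point of the hunk: a YARN application now starts at the configured initial (or minimum) executor count instead of the maximum, so it begins small and relies on the backlog heuristics to scale up as tasks queue.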