-rw-r--r--  core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala      | 11
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala    |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala                     | 22
-rw-r--r--  core/src/test/scala/org/apache/spark/util/UtilsSuite.scala                | 18
-rw-r--r--  docs/configuration.md                                                     |  3
-rw-r--r--  docs/running-on-yarn.md                                                   |  2
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala|  2
7 files changed, 42 insertions(+), 18 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 0926d05414..932ba16812 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -25,9 +25,10 @@ import scala.util.control.ControlThrowable
import com.codahale.metrics.{Gauge, MetricRegistry}
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{DYN_ALLOCATION_MAX_EXECUTORS, DYN_ALLOCATION_MIN_EXECUTORS}
import org.apache.spark.metrics.source.Source
import org.apache.spark.scheduler._
-import org.apache.spark.util.{Clock, SystemClock, ThreadUtils}
+import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
/**
* An agent that dynamically allocates and removes executors based on the workload.
@@ -87,11 +88,9 @@ private[spark] class ExecutorAllocationManager(
import ExecutorAllocationManager._
// Lower and upper bounds on the number of executors.
- private val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0)
- private val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors",
- Integer.MAX_VALUE)
- private val initialNumExecutors = conf.getInt("spark.dynamicAllocation.initialExecutors",
- minNumExecutors)
+ private val minNumExecutors = conf.get(DYN_ALLOCATION_MIN_EXECUTORS)
+ private val maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS)
+ private val initialNumExecutors = Utils.getDynamicAllocationInitialExecutors(conf)
// How long there must be backlogged tasks for before an addition is triggered (seconds)
private val schedulerBacklogTimeoutS = conf.getTimeAsSeconds(
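The hunk above swaps string-keyed conf.getInt lookups for typed config entries. For readers unfamiliar with Spark's internal config API, the imported entries are declared roughly as below. This is a sketch only: ConfigBuilder and the real definitions are private[spark] internals in org.apache.spark.internal.config (so this compiles only inside Spark's own packages), and the wrapper object name here is hypothetical.

```scala
import org.apache.spark.internal.config.ConfigBuilder

object DynAllocConfigSketch {
  // Typed replacement for conf.getInt("spark.dynamicAllocation.minExecutors", 0)
  val DYN_ALLOCATION_MIN_EXECUTORS =
    ConfigBuilder("spark.dynamicAllocation.minExecutors")
      .intConf
      .createWithDefault(0)

  // Typed replacement for conf.getInt("spark.dynamicAllocation.maxExecutors", Integer.MAX_VALUE)
  val DYN_ALLOCATION_MAX_EXECUTORS =
    ConfigBuilder("spark.dynamicAllocation.maxExecutors")
      .intConf
      .createWithDefault(Int.MaxValue)

  // initialExecutors falls back to minExecutors when unset, preserving the
  // old conf.getInt(..., minNumExecutors) default that this hunk removes.
  val DYN_ALLOCATION_INITIAL_EXECUTORS =
    ConfigBuilder("spark.dynamicAllocation.initialExecutors")
      .fallbackConf(DYN_ALLOCATION_MIN_EXECUTORS)
}
```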
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 206c130c76..f1761e7c1e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -550,6 +550,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, String]
| (Default: 1).
| --queue QUEUE_NAME The YARN queue to submit to (Default: "default").
| --num-executors NUM Number of executors to launch (Default: 2).
+ | If dynamic allocation is enabled, the initial number of
+ | executors will be at least NUM.
| --archives ARCHIVES Comma separated list of archives to be extracted into the
| working directory of each executor.
| --principal PRINCIPAL Principal to be used to login to KDC, while running on
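The new help text reflects the behavioral change: spark-submit maps --num-executors to spark.executor.instances, which no longer disables dynamic allocation but instead raises the initial executor count. A minimal illustration of the new semantics (assumes code compiled inside the org.apache.spark package, since Utils is private[spark]):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.util.Utils

val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "1")
  .set("spark.executor.instances", "8")  // what --num-executors 8 sets

// Before this patch, an explicit executor count switched dynamic
// allocation off; now the two settings coexist.
Utils.isDynamicAllocationEnabled(conf)            // true (previously false)
Utils.getDynamicAllocationInitialExecutors(conf)  // 8: NUM acts as a floor
```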
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 17d193b773..f77cc2f9b7 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -52,6 +52,7 @@ import org.slf4j.Logger
import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{DYN_ALLOCATION_INITIAL_EXECUTORS, DYN_ALLOCATION_MIN_EXECUTORS, EXECUTOR_INSTANCES}
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
@@ -2309,21 +2310,24 @@ private[spark] object Utils extends Logging {
}
/**
- * Return whether dynamic allocation is enabled in the given conf
- * Dynamic allocation and explicitly setting the number of executors are inherently
- * incompatible. In environments where dynamic allocation is turned on by default,
- * the latter should override the former (SPARK-9092).
+ * Return whether dynamic allocation is enabled in the given conf.
*/
def isDynamicAllocationEnabled(conf: SparkConf): Boolean = {
- val numExecutor = conf.getInt("spark.executor.instances", 0)
val dynamicAllocationEnabled = conf.getBoolean("spark.dynamicAllocation.enabled", false)
- if (numExecutor != 0 && dynamicAllocationEnabled) {
- logWarning("Dynamic Allocation and num executors both set, thus dynamic allocation disabled.")
- }
- numExecutor == 0 && dynamicAllocationEnabled &&
+ dynamicAllocationEnabled &&
(!isLocalMaster(conf) || conf.getBoolean("spark.dynamicAllocation.testing", false))
}
+ /**
+ * Return the initial number of executors for dynamic allocation.
+ */
+ def getDynamicAllocationInitialExecutors(conf: SparkConf): Int = {
+ Seq(
+ conf.get(DYN_ALLOCATION_MIN_EXECUTORS),
+ conf.get(DYN_ALLOCATION_INITIAL_EXECUTORS),
+ conf.get(EXECUTOR_INSTANCES).getOrElse(0)).max
+ }
+
def tryWithResource[R <: Closeable, T](createResource: => R)(f: R => T): T = {
val resource = createResource
try f.apply(resource) finally resource.close()
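The new getDynamicAllocationInitialExecutors resolves the initial target as the maximum of the three relevant settings. A dependency-free sketch of the same rule (the helper function and parameter names here are illustrative, not Spark's), with outcomes matching the new test added below:

```scala
// The resolution rule from Utils.getDynamicAllocationInitialExecutors:
// the initial target is the largest of the three settings.
def initialExecutors(
    minExecutors: Int,              // spark.dynamicAllocation.minExecutors
    initialExecutorsSetting: Int,   // spark.dynamicAllocation.initialExecutors
    executorInstances: Option[Int]  // spark.executor.instances (--num-executors)
): Int = {
  Seq(minExecutors, initialExecutorsSetting, executorInstances.getOrElse(0)).max
}

// initialExecutorsSetting defaults to minExecutors when unset (fallback conf).
initialExecutors(3, 3, Some(2))  // 3 -> minExecutors wins
initialExecutors(3, 3, Some(4))  // 4 -> --num-executors wins
initialExecutors(3, 5, Some(4))  // 5 -> initialExecutors wins
```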
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index a5363f0bfd..e3a8e83f3e 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -761,13 +761,29 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
assert(Utils.isDynamicAllocationEnabled(
conf.set("spark.dynamicAllocation.enabled", "true")) === true)
assert(Utils.isDynamicAllocationEnabled(
- conf.set("spark.executor.instances", "1")) === false)
+ conf.set("spark.executor.instances", "1")) === true)
assert(Utils.isDynamicAllocationEnabled(
conf.set("spark.executor.instances", "0")) === true)
assert(Utils.isDynamicAllocationEnabled(conf.set("spark.master", "local")) === false)
assert(Utils.isDynamicAllocationEnabled(conf.set("spark.dynamicAllocation.testing", "true")))
}
+ test("getDynamicAllocationInitialExecutors") {
+ val conf = new SparkConf()
+ assert(Utils.getDynamicAllocationInitialExecutors(conf) === 0)
+ assert(Utils.getDynamicAllocationInitialExecutors(
+ conf.set("spark.dynamicAllocation.minExecutors", "3")) === 3)
+ assert(Utils.getDynamicAllocationInitialExecutors( // should use minExecutors
+ conf.set("spark.executor.instances", "2")) === 3)
+ assert(Utils.getDynamicAllocationInitialExecutors( // should use executor.instances
+ conf.set("spark.executor.instances", "4")) === 4)
+ assert(Utils.getDynamicAllocationInitialExecutors( // should use executor.instances
+ conf.set("spark.dynamicAllocation.initialExecutors", "3")) === 4)
+ assert(Utils.getDynamicAllocationInitialExecutors( // should use initialExecutors
+ conf.set("spark.dynamicAllocation.initialExecutors", "5")) === 5)
+ }
+
+
test("encodeFileNameToURIRawPath") {
assert(Utils.encodeFileNameToURIRawPath("abc") === "abc")
assert(Utils.encodeFileNameToURIRawPath("abc xyz") === "abc%20xyz")
diff --git a/docs/configuration.md b/docs/configuration.md
index fbda91c109..cee59cf2aa 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1236,6 +1236,9 @@ Apart from these, the following properties are also available, and may be useful
<td><code>spark.dynamicAllocation.minExecutors</code></td>
<td>
Initial number of executors to run if dynamic allocation is enabled.
+ <br /><br />
+ If <code>--num-executors</code> (or <code>spark.executor.instances</code>) is set and larger than this value, it will
+ be used as the initial number of executors.
</td>
</tr>
<tr>
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 9833806716..dbd46cc48c 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -244,7 +244,7 @@ To use a custom metrics.properties for the application master and executors, upd
<td><code>spark.executor.instances</code></td>
<td><code>2</code></td>
<td>
- The number of executors. Note that this property is incompatible with <code>spark.dynamicAllocation.enabled</code>. If both <code>spark.dynamicAllocation.enabled</code> and <code>spark.executor.instances</code> are specified, dynamic allocation is turned off and the specified number of <code>spark.executor.instances</code> is used.
+ The number of executors for static allocation. With <code>spark.dynamicAllocation.enabled</code>, the initial set of executors will be at least this large.
</td>
</tr>
<tr>
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index de6cd94613..156a7a30ea 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -520,7 +520,7 @@ object YarnSparkHadoopUtil {
numExecutors: Int = DEFAULT_NUMBER_EXECUTORS): Int = {
if (Utils.isDynamicAllocationEnabled(conf)) {
val minNumExecutors = conf.get(DYN_ALLOCATION_MIN_EXECUTORS)
- val initialNumExecutors = conf.get(DYN_ALLOCATION_INITIAL_EXECUTORS)
+ val initialNumExecutors = Utils.getDynamicAllocationInitialExecutors(conf)
val maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS)
require(initialNumExecutors >= minNumExecutors && initialNumExecutors <= maxNumExecutors,
s"initial executor number $initialNumExecutors must be between min executor number " +