author    Ryan Blue <blue@apache.org>          2016-06-23 14:03:46 -0500
committer Tom Graves <tgraves@yahoo-inc.com>   2016-06-23 14:03:46 -0500
commit    738f134bf4bf07bafb17e7066cf1a36e315872c2 (patch)
tree      6a1b71c2743206fe22c82ed44df56d80f6611741 /core
parent    a410814c87b120cb5cfbf095b1bd94b1de862844 (diff)
[SPARK-13723][YARN] Change behavior of --num-executors with dynamic allocation.
## What changes were proposed in this pull request?

This changes the behavior of --num-executors and spark.executor.instances when using dynamic allocation. Instead of turning dynamic allocation off, the explicit value is used as the initial number of executors. This change was discussed on [SPARK-13723](https://issues.apache.org/jira/browse/SPARK-13723). I highly recommend making this change while we can still alter the behavior for 2.0.0. In practice, the 1.x behavior surprises users (it is not obvious that setting --num-executors disables dynamic allocation) and wastes cluster resources, because users rarely notice the log message.

## How was this patch tested?

This patch updates existing tests and adds a test for Utils.getDynamicAllocationInitialExecutors.

Author: Ryan Blue <blue@apache.org>

Closes #13338 from rdblue/SPARK-13723-num-executors-with-dynamic-allocation.
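The rule the patch implements is easy to state: under dynamic allocation, the initial executor count becomes the maximum of `spark.dynamicAllocation.minExecutors`, `spark.dynamicAllocation.initialExecutors`, and `spark.executor.instances`. A minimal standalone sketch of that rule (the object name is hypothetical, and it uses plain string-keyed lookups rather than the typed config entries the patch itself uses):

```scala
import org.apache.spark.SparkConf

// Sketch of the new rule: --num-executors (spark.executor.instances)
// becomes a floor on the initial executor count rather than a switch
// that silently turns dynamic allocation off.
object InitialExecutorsSketch {
  def initialExecutors(conf: SparkConf): Int =
    Seq(
      conf.getInt("spark.dynamicAllocation.minExecutors", 0),
      conf.getInt("spark.dynamicAllocation.initialExecutors", 0),
      conf.getInt("spark.executor.instances", 0)
    ).max
}
```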
Diffstat (limited to 'core')

 core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala   | 11
 core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala |  2
 core/src/main/scala/org/apache/spark/util/Utils.scala                  | 22
 core/src/test/scala/org/apache/spark/util/UtilsSuite.scala             | 18
 4 files changed, 37 insertions(+), 16 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 0926d05414..932ba16812 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -25,9 +25,10 @@ import scala.util.control.ControlThrowable
 import com.codahale.metrics.{Gauge, MetricRegistry}
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{DYN_ALLOCATION_MAX_EXECUTORS, DYN_ALLOCATION_MIN_EXECUTORS}
 import org.apache.spark.metrics.source.Source
 import org.apache.spark.scheduler._
-import org.apache.spark.util.{Clock, SystemClock, ThreadUtils}
+import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
 
 /**
  * An agent that dynamically allocates and removes executors based on the workload.
@@ -87,11 +88,9 @@ private[spark] class ExecutorAllocationManager(
   import ExecutorAllocationManager._
 
   // Lower and upper bounds on the number of executors.
-  private val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0)
-  private val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors",
-    Integer.MAX_VALUE)
-  private val initialNumExecutors = conf.getInt("spark.dynamicAllocation.initialExecutors",
-    minNumExecutors)
+  private val minNumExecutors = conf.get(DYN_ALLOCATION_MIN_EXECUTORS)
+  private val maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS)
+  private val initialNumExecutors = Utils.getDynamicAllocationInitialExecutors(conf)
 
   // How long there must be backlogged tasks for before an addition is triggered (seconds)
   private val schedulerBacklogTimeoutS = conf.getTimeAsSeconds(
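The `DYN_ALLOCATION_*` constants come from `org.apache.spark.internal.config` and are not part of this diff. As a rough sketch of their shape (the exact builder chain and defaults here are assumptions, the object name is hypothetical, and `ConfigBuilder` is `private[spark]`, so this only compiles inside the `org.apache.spark` namespace):

```scala
package org.apache.spark.sketch

import org.apache.spark.internal.config.ConfigBuilder

// Hypothetical reconstruction of the typed entries used above: each
// pairs a key with a type and a default, so call sites get an Int
// back via conf.get(entry) instead of re-parsing strings everywhere.
object DynAllocEntriesSketch {
  val DYN_ALLOCATION_MIN_EXECUTORS =
    ConfigBuilder("spark.dynamicAllocation.minExecutors")
      .intConf
      .createWithDefault(0)

  val DYN_ALLOCATION_MAX_EXECUTORS =
    ConfigBuilder("spark.dynamicAllocation.maxExecutors")
      .intConf
      .createWithDefault(Int.MaxValue)
}
```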
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 206c130c76..f1761e7c1e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -550,6 +550,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
         |                              (Default: 1).
         |  --queue QUEUE_NAME          The YARN queue to submit to (Default: "default").
         |  --num-executors NUM         Number of executors to launch (Default: 2).
+        |                              If dynamic allocation is enabled, the initial number of
+        |                              executors will be at least NUM.
         |  --archives ARCHIVES         Comma separated list of archives to be extracted into the
         |                              working directory of each executor.
         |  --principal PRINCIPAL       Principal to be used to login to KDC, while running on
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 17d193b773..f77cc2f9b7 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -52,6 +52,7 @@ import org.slf4j.Logger
 
 import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{DYN_ALLOCATION_INITIAL_EXECUTORS, DYN_ALLOCATION_MIN_EXECUTORS, EXECUTOR_INSTANCES}
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
@@ -2309,21 +2310,24 @@
   }
 
   /**
-   * Return whether dynamic allocation is enabled in the given conf
-   * Dynamic allocation and explicitly setting the number of executors are inherently
-   * incompatible. In environments where dynamic allocation is turned on by default,
-   * the latter should override the former (SPARK-9092).
+   * Return whether dynamic allocation is enabled in the given conf.
    */
   def isDynamicAllocationEnabled(conf: SparkConf): Boolean = {
-    val numExecutor = conf.getInt("spark.executor.instances", 0)
     val dynamicAllocationEnabled = conf.getBoolean("spark.dynamicAllocation.enabled", false)
-    if (numExecutor != 0 && dynamicAllocationEnabled) {
-      logWarning("Dynamic Allocation and num executors both set, thus dynamic allocation disabled.")
-    }
-    numExecutor == 0 && dynamicAllocationEnabled &&
+    dynamicAllocationEnabled &&
       (!isLocalMaster(conf) || conf.getBoolean("spark.dynamicAllocation.testing", false))
   }
 
+  /**
+   * Return the initial number of executors for dynamic allocation.
+   */
+  def getDynamicAllocationInitialExecutors(conf: SparkConf): Int = {
+    Seq(
+      conf.get(DYN_ALLOCATION_MIN_EXECUTORS),
+      conf.get(DYN_ALLOCATION_INITIAL_EXECUTORS),
+      conf.get(EXECUTOR_INSTANCES).getOrElse(0)).max
+  }
+
   def tryWithResource[R <: Closeable, T](createResource: => R)(f: R => T): T = {
     val resource = createResource
     try f.apply(resource) finally resource.close()
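The `isDynamicAllocationEnabled` change above is behavioral, not cosmetic: previously a non-zero `spark.executor.instances` silently disabled dynamic allocation with only a log warning, whereas now the two settings compose. A small sketch of the net effect (the object name is hypothetical, and `Utils` is `private[spark]`, hence the `org.apache.spark` package):

```scala
package org.apache.spark.sketch

import org.apache.spark.SparkConf
import org.apache.spark.util.Utils

// Equivalent to submitting with --num-executors 4 while
// spark.dynamicAllocation.enabled=true.
object NumExecutorsBehaviorSketch extends App {
  val conf = new SparkConf()
    .set("spark.dynamicAllocation.enabled", "true")
    .set("spark.executor.instances", "4")

  // Before this patch this returned false: a non-zero instance count
  // silently disabled dynamic allocation (the SPARK-9092 behavior).
  assert(Utils.isDynamicAllocationEnabled(conf))

  // The explicit count now seeds (floors) the initial executor count.
  assert(Utils.getDynamicAllocationInitialExecutors(conf) == 4)
}
```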
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index a5363f0bfd..e3a8e83f3e 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -761,13 +761,29 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
     assert(Utils.isDynamicAllocationEnabled(
       conf.set("spark.dynamicAllocation.enabled", "true")) === true)
     assert(Utils.isDynamicAllocationEnabled(
-      conf.set("spark.executor.instances", "1")) === false)
+      conf.set("spark.executor.instances", "1")) === true)
     assert(Utils.isDynamicAllocationEnabled(
       conf.set("spark.executor.instances", "0")) === true)
     assert(Utils.isDynamicAllocationEnabled(conf.set("spark.master", "local")) === false)
     assert(Utils.isDynamicAllocationEnabled(conf.set("spark.dynamicAllocation.testing", "true")))
   }
 
+  test("getDynamicAllocationInitialExecutors") {
+    val conf = new SparkConf()
+    assert(Utils.getDynamicAllocationInitialExecutors(conf) === 0)
+    assert(Utils.getDynamicAllocationInitialExecutors(
+      conf.set("spark.dynamicAllocation.minExecutors", "3")) === 3)
+    assert(Utils.getDynamicAllocationInitialExecutors( // should use minExecutors
+      conf.set("spark.executor.instances", "2")) === 3)
+    assert(Utils.getDynamicAllocationInitialExecutors( // should use executor.instances
+      conf.set("spark.executor.instances", "4")) === 4)
+    assert(Utils.getDynamicAllocationInitialExecutors( // should use executor.instances
+      conf.set("spark.dynamicAllocation.initialExecutors", "3")) === 4)
+    assert(Utils.getDynamicAllocationInitialExecutors( // should use initialExecutors
+      conf.set("spark.dynamicAllocation.initialExecutors", "5")) === 5)
+  }
+
+
   test("encodeFileNameToURIRawPath") {
     assert(Utils.encodeFileNameToURIRawPath("abc") === "abc")
     assert(Utils.encodeFileNameToURIRawPath("abc xyz") === "abc%20xyz")