about summary refs log tree commit diff
path: root/core
diff options
context:
space:
mode:
authorSandy Ryza <sandy@cloudera.com>2015-02-02 12:27:08 -0800
committerAndrew Or <andrew@databricks.com>2015-02-02 12:27:08 -0800
commitb2047b55c5fc85de6b63276d8ab9610d2496e08b (patch)
treeb62017de84f2893af77a00714988e72b75c257a8 /core
parentc081b21b1fe4fbad845088c4144da0bd2a8d89dc (diff)
downloadspark-b2047b55c5fc85de6b63276d8ab9610d2496e08b.tar.gz
spark-b2047b55c5fc85de6b63276d8ab9610d2496e08b.tar.bz2
spark-b2047b55c5fc85de6b63276d8ab9610d2496e08b.zip
SPARK-4585. Spark dynamic executor allocation should use minExecutors as...
... initial number Author: Sandy Ryza <sandy@cloudera.com> Closes #4051 from sryza/sandy-spark-4585 and squashes the following commits: d1dd039 [Sandy Ryza] Add spark.dynamicAllocation.initialNumExecutors and make min and max not required b7c59dc [Sandy Ryza] SPARK-4585. Spark dynamic executor allocation should use minExecutors as initial number
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala14
-rw-r--r--core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala15
2 files changed, 15 insertions, 14 deletions
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index b28da192c1..5d5288bb6e 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -49,6 +49,7 @@ import org.apache.spark.scheduler._
* spark.dynamicAllocation.enabled - Whether this feature is enabled
* spark.dynamicAllocation.minExecutors - Lower bound on the number of executors
* spark.dynamicAllocation.maxExecutors - Upper bound on the number of executors
+ * spark.dynamicAllocation.initialExecutors - Number of executors to start with
*
* spark.dynamicAllocation.schedulerBacklogTimeout (M) -
* If there are backlogged tasks for this duration, add new executors
@@ -70,9 +71,10 @@ private[spark] class ExecutorAllocationManager(
import ExecutorAllocationManager._
- // Lower and upper bounds on the number of executors. These are required.
- private val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", -1)
- private val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors", -1)
+ // Lower and upper bounds on the number of executors.
+ private val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0)
+ private val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors",
+ Integer.MAX_VALUE)
// How long there must be backlogged tasks for before an addition is triggered
private val schedulerBacklogTimeout = conf.getLong(
@@ -132,10 +134,10 @@ private[spark] class ExecutorAllocationManager(
*/
private def validateSettings(): Unit = {
if (minNumExecutors < 0 || maxNumExecutors < 0) {
- throw new SparkException("spark.dynamicAllocation.{min/max}Executors must be set!")
+ throw new SparkException("spark.dynamicAllocation.{min/max}Executors must be positive!")
}
- if (minNumExecutors == 0 || maxNumExecutors == 0) {
- throw new SparkException("spark.dynamicAllocation.{min/max}Executors cannot be 0!")
+ if (maxNumExecutors == 0) {
+ throw new SparkException("spark.dynamicAllocation.maxExecutors cannot be 0!")
}
if (minNumExecutors > maxNumExecutors) {
throw new SparkException(s"spark.dynamicAllocation.minExecutors ($minNumExecutors) must " +
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 0e4df17c1b..57081ddd95 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -32,24 +32,23 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
import ExecutorAllocationManagerSuite._
test("verify min/max executors") {
- // No min or max
val conf = new SparkConf()
.setMaster("local")
.setAppName("test-executor-allocation-manager")
.set("spark.dynamicAllocation.enabled", "true")
.set("spark.dynamicAllocation.testing", "true")
- intercept[SparkException] { new SparkContext(conf) }
- SparkEnv.get.stop() // cleanup the created environment
- SparkContext.clearActiveContext()
+ val sc0 = new SparkContext(conf)
+ assert(sc0.executorAllocationManager.isDefined)
+ sc0.stop()
- // Only min
- val conf1 = conf.clone().set("spark.dynamicAllocation.minExecutors", "1")
+ // Min < 0
+ val conf1 = conf.clone().set("spark.dynamicAllocation.minExecutors", "-1")
intercept[SparkException] { new SparkContext(conf1) }
SparkEnv.get.stop()
SparkContext.clearActiveContext()
- // Only max
- val conf2 = conf.clone().set("spark.dynamicAllocation.maxExecutors", "2")
+ // Max < 0
+ val conf2 = conf.clone().set("spark.dynamicAllocation.maxExecutors", "-1")
intercept[SparkException] { new SparkContext(conf2) }
SparkEnv.get.stop()
SparkContext.clearActiveContext()