-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala      | 19
-rw-r--r--  core/src/test/scala/org/apache/spark/util/UtilsSuite.scala |  3
2 files changed, 21 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 298e6243aa..2e4ec4c16c 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2342,10 +2342,27 @@ private[spark] object Utils extends Logging {
* Return the initial number of executors for dynamic allocation.
*/
def getDynamicAllocationInitialExecutors(conf: SparkConf): Int = {
- Seq(
+ if (conf.get(DYN_ALLOCATION_INITIAL_EXECUTORS) < conf.get(DYN_ALLOCATION_MIN_EXECUTORS)) {
+ logWarning(s"${DYN_ALLOCATION_INITIAL_EXECUTORS.key} less than " +
+ s"${DYN_ALLOCATION_MIN_EXECUTORS.key} is invalid, ignoring its setting, " +
+ "please update your configs.")
+ }
+
+ if (conf.get(EXECUTOR_INSTANCES).getOrElse(0) < conf.get(DYN_ALLOCATION_MIN_EXECUTORS)) {
+ logWarning(s"${EXECUTOR_INSTANCES.key} less than " +
+ s"${DYN_ALLOCATION_MIN_EXECUTORS.key} is invalid, ignoring its setting, " +
+ "please update your configs.")
+ }
+
+ val initialExecutors = Seq(
conf.get(DYN_ALLOCATION_MIN_EXECUTORS),
conf.get(DYN_ALLOCATION_INITIAL_EXECUTORS),
conf.get(EXECUTOR_INSTANCES).getOrElse(0)).max
+
+ logInfo(s"Using initial executors = $initialExecutors, max of " +
+ s"${DYN_ALLOCATION_INITIAL_EXECUTORS.key}, ${DYN_ALLOCATION_MIN_EXECUTORS.key} and " +
+ s"${EXECUTOR_INSTANCES.key}")
+ initialExecutors
}

def tryWithResource[R <: Closeable, T](createResource: => R)(f: R => T): T = {
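
For reference, a minimal standalone Scala sketch of the resolution rule this hunk introduces, kept outside Spark's Utils/SparkConf machinery: the initial executor count is the maximum of minExecutors, initialExecutors, and spark.executor.instances (treated as 0 when unset), and a value below minExecutors only triggers a warning rather than an error. The DynAllocSettings case class and resolveInitialExecutors name below are illustrative stand-ins, not Spark APIs.

object InitialExecutorsSketch {
  // Illustrative stand-in for the three Spark settings read in the patch above.
  final case class DynAllocSettings(
      minExecutors: Int,               // spark.dynamicAllocation.minExecutors
      initialExecutors: Int,           // spark.dynamicAllocation.initialExecutors
      executorInstances: Option[Int])  // spark.executor.instances (may be unset)

  def resolveInitialExecutors(s: DynAllocSettings): Int = {
    // As in the patch, values below minExecutors are only warned about, never rejected.
    if (s.initialExecutors < s.minExecutors) {
      println(s"WARN: initialExecutors (${s.initialExecutors}) is less than " +
        s"minExecutors (${s.minExecutors}); it will be ignored")
    }
    if (s.executorInstances.getOrElse(0) < s.minExecutors) {
      println(s"WARN: executor instances (${s.executorInstances.getOrElse(0)}) is less than " +
        s"minExecutors (${s.minExecutors}); it will be ignored")
    }
    // Same rule as the patch: take the maximum of the three values.
    Seq(s.minExecutors, s.initialExecutors, s.executorInstances.getOrElse(0)).max
  }
}
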
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index f5d0fb00b7..30952a9458 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -782,6 +782,9 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
conf.set("spark.dynamicAllocation.initialExecutors", "3")) === 4)
assert(Utils.getDynamicAllocationInitialExecutors( // should use initialExecutors
conf.set("spark.dynamicAllocation.initialExecutors", "5")) === 5)
+ assert(Utils.getDynamicAllocationInitialExecutors( // should use minExecutors
+ conf.set("spark.dynamicAllocation.initialExecutors", "2")
+ .set("spark.executor.instances", "1")) === 3)
}
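
The added assertion exercises the case where both spark.dynamicAllocation.initialExecutors (2) and spark.executor.instances (1) fall below spark.dynamicAllocation.minExecutors (3, as set earlier in this test), so the minimum wins and both new warnings fire. The same scenario run against the illustrative sketch after the Utils.scala hunk, rather than Spark's test harness, would look like:

// Hypothetical check mirroring the new UtilsSuite assertion.
val settings = InitialExecutorsSketch.DynAllocSettings(
  minExecutors = 3,             // spark.dynamicAllocation.minExecutors = 3
  initialExecutors = 2,         // spark.dynamicAllocation.initialExecutors = 2
  executorInstances = Some(1))  // spark.executor.instances = 1
assert(InitialExecutorsSketch.resolveInitialExecutors(settings) == 3)
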