diff options
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/util/Utils.scala | 19 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/util/UtilsSuite.scala | 3 |
2 files changed, 21 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 298e6243aa..2e4ec4c16c 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -2342,10 +2342,27 @@ private[spark] object Utils extends Logging { * Return the initial number of executors for dynamic allocation. */ def getDynamicAllocationInitialExecutors(conf: SparkConf): Int = { - Seq( + if (conf.get(DYN_ALLOCATION_INITIAL_EXECUTORS) < conf.get(DYN_ALLOCATION_MIN_EXECUTORS)) { + logWarning(s"${DYN_ALLOCATION_INITIAL_EXECUTORS.key} less than " + + s"${DYN_ALLOCATION_MIN_EXECUTORS.key} is invalid, ignoring its setting, " + + "please update your configs.") + } + + if (conf.get(EXECUTOR_INSTANCES).getOrElse(0) < conf.get(DYN_ALLOCATION_MIN_EXECUTORS)) { + logWarning(s"${EXECUTOR_INSTANCES.key} less than " + + s"${DYN_ALLOCATION_MIN_EXECUTORS.key} is invalid, ignoring its setting, " + + "please update your configs.") + } + + val initialExecutors = Seq( conf.get(DYN_ALLOCATION_MIN_EXECUTORS), conf.get(DYN_ALLOCATION_INITIAL_EXECUTORS), conf.get(EXECUTOR_INSTANCES).getOrElse(0)).max + + logInfo(s"Using initial executors = $initialExecutors, max of " + + s"${DYN_ALLOCATION_INITIAL_EXECUTORS.key}, ${DYN_ALLOCATION_MIN_EXECUTORS.key} and " + + s"${EXECUTOR_INSTANCES.key}") + initialExecutors } def tryWithResource[R <: Closeable, T](createResource: => R)(f: R => T): T = { diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index f5d0fb00b7..30952a9458 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -782,6 +782,9 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { conf.set("spark.dynamicAllocation.initialExecutors", "3")) === 4) assert(Utils.getDynamicAllocationInitialExecutors( // should use initialExecutors conf.set("spark.dynamicAllocation.initialExecutors", "5")) === 5) + assert(Utils.getDynamicAllocationInitialExecutors( // should use minExecutors + conf.set("spark.dynamicAllocation.initialExecutors", "2") + .set("spark.executor.instances", "1")) === 3) } |