aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorjerryshao <sshao@hortonworks.com>2016-07-13 13:24:47 -0500
committerTom Graves <tgraves@yahoo-inc.com>2016-07-13 13:24:47 -0500
commitd8220c1e5e94abbdb9643672b918f0d748206db9 (patch)
tree4322602ced176a94531da134a32868ebd4971c83
parentf376c37268848dbb4b2fb57677e22ef2bf207b49 (diff)
downloadspark-d8220c1e5e94abbdb9643672b918f0d748206db9.tar.gz
spark-d8220c1e5e94abbdb9643672b918f0d748206db9.tar.bz2
spark-d8220c1e5e94abbdb9643672b918f0d748206db9.zip
[SPARK-16435][YARN][MINOR] Add warning log if initialExecutors is less than minExecutors
## What changes were proposed in this pull request? Currently if `spark.dynamicAllocation.initialExecutors` is less than `spark.dynamicAllocation.minExecutors`, Spark will automatically pick the minExecutors without any warning. While in 1.6 Spark will throw an exception if configured like this. So here propose to add a warning log if these parameters are configured incorrectly. ## How was this patch tested? Unit test added to verify the scenario. Author: jerryshao <sshao@hortonworks.com> Closes #14149 from jerryshao/SPARK-16435.
-rw-r--r--core/src/main/scala/org/apache/spark/util/Utils.scala19
-rw-r--r--core/src/test/scala/org/apache/spark/util/UtilsSuite.scala3
2 files changed, 21 insertions, 1 deletion
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 298e6243aa..2e4ec4c16c 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2342,10 +2342,27 @@ private[spark] object Utils extends Logging {
* Return the initial number of executors for dynamic allocation.
*/
def getDynamicAllocationInitialExecutors(conf: SparkConf): Int = {
- Seq(
+ if (conf.get(DYN_ALLOCATION_INITIAL_EXECUTORS) < conf.get(DYN_ALLOCATION_MIN_EXECUTORS)) {
+ logWarning(s"${DYN_ALLOCATION_INITIAL_EXECUTORS.key} less than " +
+ s"${DYN_ALLOCATION_MIN_EXECUTORS.key} is invalid, ignoring its setting, " +
+ "please update your configs.")
+ }
+
+ if (conf.get(EXECUTOR_INSTANCES).getOrElse(0) < conf.get(DYN_ALLOCATION_MIN_EXECUTORS)) {
+ logWarning(s"${EXECUTOR_INSTANCES.key} less than " +
+ s"${DYN_ALLOCATION_MIN_EXECUTORS.key} is invalid, ignoring its setting, " +
+ "please update your configs.")
+ }
+
+ val initialExecutors = Seq(
conf.get(DYN_ALLOCATION_MIN_EXECUTORS),
conf.get(DYN_ALLOCATION_INITIAL_EXECUTORS),
conf.get(EXECUTOR_INSTANCES).getOrElse(0)).max
+
+ logInfo(s"Using initial executors = $initialExecutors, max of " +
+ s"${DYN_ALLOCATION_INITIAL_EXECUTORS.key}, ${DYN_ALLOCATION_MIN_EXECUTORS.key} and " +
+ s"${EXECUTOR_INSTANCES.key}")
+ initialExecutors
}
def tryWithResource[R <: Closeable, T](createResource: => R)(f: R => T): T = {
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index f5d0fb00b7..30952a9458 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -782,6 +782,9 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
conf.set("spark.dynamicAllocation.initialExecutors", "3")) === 4)
assert(Utils.getDynamicAllocationInitialExecutors( // should use initialExecutors
conf.set("spark.dynamicAllocation.initialExecutors", "5")) === 5)
+ assert(Utils.getDynamicAllocationInitialExecutors( // should use minExecutors
+ conf.set("spark.dynamicAllocation.initialExecutors", "2")
+ .set("spark.executor.instances", "1")) === 3)
}