author    Niranjan Padmanabhan <niranjan.padmanabhan@cloudera.com>  2015-08-12 16:10:21 -0700
committer Marcelo Vanzin <vanzin@cloudera.com>  2015-08-12 16:10:21 -0700
commit    738f353988dbf02704bd63f5e35d94402c59ed79 (patch)
tree      1ef4619abe50dd2ebe51218afddeaa7051312cf9 /core
parent    a17384fa343628cec44437da5b80b9403ecd5838 (diff)
[SPARK-9092] Fixed incompatibility when both num-executors and dynamic allocation are set.

Now, dynamic allocation is set to false when num-executors is explicitly specified as an argument. Consequently, the executorAllocationManager is not initialized in the SparkContext.

Author: Niranjan Padmanabhan <niranjan.padmanabhan@cloudera.com>

Closes #7657 from neurons/SPARK-9092.
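In effect, an explicitly requested executor count now takes precedence over dynamic allocation. A minimal sketch of the resulting behavior, mirroring the test added in this patch (executorAllocationManager is private[spark], so this would run as a test inside the org.apache.spark package; the local master is for illustration only):

    import org.apache.spark.{SparkConf, SparkContext}

    // Both settings supplied: the explicit count wins and dynamic
    // allocation is effectively disabled (SPARK-9092).
    val conf = new SparkConf()
      .setAppName("example")
      .setMaster("local")
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.executor.instances", "6")
    val sc = new SparkContext(conf)
    assert(sc.executorAllocationManager.isEmpty)                   // no manager created
    assert(sc.getConf.getInt("spark.executor.instances", 0) == 6)  // count preserved
    sc.stop()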
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkConf.scala               | 19
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala            |  6
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala      |  4
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala              | 11
-rw-r--r--  core/src/test/scala/org/apache/spark/SparkContextSuite.scala       |  8
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala |  1
6 files changed, 45 insertions(+), 4 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 8ff154fb5e..b344b5e173 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -389,6 +389,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
val driverOptsKey = "spark.driver.extraJavaOptions"
val driverClassPathKey = "spark.driver.extraClassPath"
val driverLibraryPathKey = "spark.driver.extraLibraryPath"
+ val sparkExecutorInstances = "spark.executor.instances"
// Used by Yarn in 1.1 and before
sys.props.get("spark.driver.libraryPath").foreach { value =>
@@ -476,6 +477,24 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
}
}
}
+
+ if (!contains(sparkExecutorInstances)) {
+ sys.env.get("SPARK_WORKER_INSTANCES").foreach { value =>
+ val warning =
+ s"""
+ |SPARK_WORKER_INSTANCES was detected (set to '$value').
+ |This is deprecated in Spark 1.0+.
+ |
+ |Please instead use:
+ | - ./spark-submit with --num-executors to specify the number of executors
+ | - Or set SPARK_EXECUTOR_INSTANCES
+ | - spark.executor.instances to configure the number of instances in the spark config.
+ """.stripMargin
+ logWarning(warning)
+
+ set("spark.executor.instances", value)
+ }
+ }
}
/**
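The block added above honors the legacy SPARK_WORKER_INSTANCES environment variable only when spark.executor.instances has not been set explicitly. A standalone sketch of that precedence rule (a hypothetical helper, not code from the patch; the deprecation warning is elided):

    import org.apache.spark.SparkConf

    // Hypothetical illustration: an explicit spark.executor.instances
    // always wins over the deprecated SPARK_WORKER_INSTANCES env var.
    def applyLegacyWorkerInstances(conf: SparkConf): SparkConf = {
      if (!conf.contains("spark.executor.instances")) {
        sys.env.get("SPARK_WORKER_INSTANCES").foreach { value =>
          conf.set("spark.executor.instances", value)
        }
      }
      conf
    }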
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 6aafb4c564..207a0c1bff 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -528,7 +528,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}
// Optionally scale number of executors dynamically based on workload. Exposed for testing.
- val dynamicAllocationEnabled = _conf.getBoolean("spark.dynamicAllocation.enabled", false)
+ val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
+ if (!dynamicAllocationEnabled && _conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
+ logInfo("Dynamic Allocation and num executors both set, thus dynamic allocation disabled.")
+ }
+
_executorAllocationManager =
if (dynamicAllocationEnabled) {
Some(new ExecutorAllocationManager(this, listenerBus, _conf))
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 7ac6cbce4c..02fa3088ed 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -422,7 +422,8 @@ object SparkSubmit {
// Yarn client only
OptionAssigner(args.queue, YARN, CLIENT, sysProp = "spark.yarn.queue"),
- OptionAssigner(args.numExecutors, YARN, CLIENT, sysProp = "spark.executor.instances"),
+ OptionAssigner(args.numExecutors, YARN, ALL_DEPLOY_MODES,
+ sysProp = "spark.executor.instances"),
OptionAssigner(args.files, YARN, CLIENT, sysProp = "spark.yarn.dist.files"),
OptionAssigner(args.archives, YARN, CLIENT, sysProp = "spark.yarn.dist.archives"),
OptionAssigner(args.principal, YARN, CLIENT, sysProp = "spark.yarn.principal"),
@@ -433,7 +434,6 @@ object SparkSubmit {
OptionAssigner(args.driverMemory, YARN, CLUSTER, clOption = "--driver-memory"),
OptionAssigner(args.driverCores, YARN, CLUSTER, clOption = "--driver-cores"),
OptionAssigner(args.queue, YARN, CLUSTER, clOption = "--queue"),
- OptionAssigner(args.numExecutors, YARN, CLUSTER, clOption = "--num-executors"),
OptionAssigner(args.executorMemory, YARN, CLUSTER, clOption = "--executor-memory"),
OptionAssigner(args.executorCores, YARN, CLUSTER, clOption = "--executor-cores"),
OptionAssigner(args.files, YARN, CLUSTER, clOption = "--files"),
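Taken together, the two hunks above route --num-executors identically in both deploy modes: it becomes the spark.executor.instances property rather than a cluster-mode-only --num-executors child argument, so the explicit count is visible to the new Utils helper wherever the driver runs. A hypothetical miniature of that routing (not SparkSubmit's real OptionAssigner machinery):

    // Before: client mode set the property, cluster mode passed a child arg.
    // After: both modes set the property.
    def numExecutorsProp(numExecutors: Option[String]): Map[String, String] =
      numExecutors.map(n => Map("spark.executor.instances" -> n)).getOrElse(Map.empty)

    assert(numExecutorsProp(Some("6")) == Map("spark.executor.instances" -> "6"))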
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index c4012d0e83..a90d854136 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2286,6 +2286,17 @@ private[spark] object Utils extends Logging {
isInDirectory(parent, child.getParentFile)
}
+ /**
+ * Return whether dynamic allocation is enabled in the given conf
+ * Dynamic allocation and explicitly setting the number of executors are inherently
+ * incompatible. In environments where dynamic allocation is turned on by default,
+ * the latter should override the former (SPARK-9092).
+ */
+ def isDynamicAllocationEnabled(conf: SparkConf): Boolean = {
+ conf.contains("spark.dynamicAllocation.enabled") &&
+ conf.getInt("spark.executor.instances", 0) == 0
+ }
+
}
private [util] class SparkShutdownHookManager {
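A minimal usage sketch of the new helper (Utils is private[spark], so this would live in a Spark test; note that, as written above, the check is for the presence of the spark.dynamicAllocation.enabled key together with the absence of a positive explicit executor count):

    import org.apache.spark.SparkConf
    import org.apache.spark.util.Utils

    val withBoth = new SparkConf()
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.executor.instances", "6")
    assert(!Utils.isDynamicAllocationEnabled(withBoth))    // explicit count wins

    val dynamicOnly = new SparkConf()
      .set("spark.dynamicAllocation.enabled", "true")
    assert(Utils.isDynamicAllocationEnabled(dynamicOnly))  // no explicit count set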
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 5c57940fa5..d4f2ea8765 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -285,4 +285,12 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
}
}
+ test("No exception when both num-executors and dynamic allocation set.") {
+ noException should be thrownBy {
+ sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local")
+ .set("spark.dynamicAllocation.enabled", "true").set("spark.executor.instances", "6"))
+ assert(sc.executorAllocationManager.isEmpty)
+ assert(sc.getConf.getInt("spark.executor.instances", 0) === 6)
+ }
+ }
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 757e0ce3d2..2456c5d0d4 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -159,7 +159,6 @@ class SparkSubmitSuite
childArgsStr should include ("--executor-cores 5")
childArgsStr should include ("--arg arg1 --arg arg2")
childArgsStr should include ("--queue thequeue")
- childArgsStr should include ("--num-executors 6")
childArgsStr should include regex ("--jar .*thejar.jar")
childArgsStr should include regex ("--addJars .*one.jar,.*two.jar,.*three.jar")
childArgsStr should include regex ("--files .*file1.txt,.*file2.txt")