author    Niranjan Padmanabhan <niranjan.padmanabhan@cloudera.com>  2015-08-12 16:10:21 -0700
committer Marcelo Vanzin <vanzin@cloudera.com>                     2015-08-12 16:10:21 -0700
commit    738f353988dbf02704bd63f5e35d94402c59ed79
tree      1ef4619abe50dd2ebe51218afddeaa7051312cf9
parent    a17384fa343628cec44437da5b80b9403ecd5838
[SPARK-9092] Fixed incompatibility when both num-executors and dynamic allocation are set.

Dynamic allocation is now set to false when num-executors is explicitly
specified as an argument. Consequently, the executorAllocationManager is not
initialized in the SparkContext.

Author: Niranjan Padmanabhan <niranjan.padmanabhan@cloudera.com>

Closes #7657 from neurons/SPARK-9092.
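For orientation before reading the diff, the precedence rule this patch introduces can be sketched in a few lines of Scala. The predicate body mirrors the Utils.isDynamicAllocationEnabled helper added in this commit; the wrapper object and main method are hypothetical scaffolding for the example, not Spark code.

import org.apache.spark.SparkConf

// Illustrative sketch: an explicitly requested executor count disables
// dynamic allocation. The predicate mirrors Utils.isDynamicAllocationEnabled
// from this commit; the enclosing object is example scaffolding only.
object Spark9092Sketch {
  def isDynamicAllocationEnabled(conf: SparkConf): Boolean = {
    conf.contains("spark.dynamicAllocation.enabled") &&
      conf.getInt("spark.executor.instances", 0) == 0
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(false) // don't load defaults, for a clean demo
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.executor.instances", "6")
    // Both settings present: dynamic allocation loses, 6 executors win.
    println(isDynamicAllocationEnabled(conf)) // prints: false
  }
}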
 core/src/main/scala/org/apache/spark/SparkConf.scala                                    | 19
 core/src/main/scala/org/apache/spark/SparkContext.scala                                 |  6
 core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala                           |  4
 core/src/main/scala/org/apache/spark/util/Utils.scala                                   | 11
 core/src/test/scala/org/apache/spark/SparkContextSuite.scala                            |  8
 core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala                      |  1
 docs/running-on-yarn.md                                                                 |  2
 yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala                |  4
 yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala       |  5
 yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala                           |  5
 yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala                  |  8
 yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala                    |  9
 yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala |  3
 yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala               |  5
 14 files changed, 64 insertions(+), 26 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 8ff154fb5e..b344b5e173 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -389,6 +389,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
val driverOptsKey = "spark.driver.extraJavaOptions"
val driverClassPathKey = "spark.driver.extraClassPath"
val driverLibraryPathKey = "spark.driver.extraLibraryPath"
+ val sparkExecutorInstances = "spark.executor.instances"
// Used by Yarn in 1.1 and before
sys.props.get("spark.driver.libraryPath").foreach { value =>
@@ -476,6 +477,24 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
}
}
}
+
+ if (!contains(sparkExecutorInstances)) {
+ sys.env.get("SPARK_WORKER_INSTANCES").foreach { value =>
+ val warning =
+ s"""
+ |SPARK_WORKER_INSTANCES was detected (set to '$value').
+ |This is deprecated in Spark 1.0+.
+ |
+ |Please instead use:
+ | - ./spark-submit with --num-executors to specify the number of executors
+ | - Or set SPARK_EXECUTOR_INSTANCES
+ | - spark.executor.instances to configure the number of instances in the spark config.
+ """.stripMargin
+ logWarning(warning)
+
+ set("spark.executor.instances", value)
+ }
+ }
}
/**
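As a rough illustration of the SPARK_WORKER_INSTANCES fallback added above, here is a script-style sketch; the warning logging is elided and the standalone SparkConf setup is assumed for the example.

import org.apache.spark.SparkConf

// Sketch of the fallback: the deprecated SPARK_WORKER_INSTANCES env var is
// consulted only when spark.executor.instances has not been set explicitly,
// so user configuration always wins over the legacy variable.
val conf = new SparkConf()
if (!conf.contains("spark.executor.instances")) {
  sys.env.get("SPARK_WORKER_INSTANCES").foreach { value =>
    conf.set("spark.executor.instances", value)
  }
}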
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 6aafb4c564..207a0c1bff 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -528,7 +528,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}
// Optionally scale number of executors dynamically based on workload. Exposed for testing.
- val dynamicAllocationEnabled = _conf.getBoolean("spark.dynamicAllocation.enabled", false)
+ val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
+ if (!dynamicAllocationEnabled && _conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
+ logInfo("Dynamic Allocation and num executors both set, thus dynamic allocation disabled.")
+ }
+
_executorAllocationManager =
if (dynamicAllocationEnabled) {
Some(new ExecutorAllocationManager(this, listenerBus, _conf))
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 7ac6cbce4c..02fa3088ed 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -422,7 +422,8 @@ object SparkSubmit {
// Yarn client only
OptionAssigner(args.queue, YARN, CLIENT, sysProp = "spark.yarn.queue"),
- OptionAssigner(args.numExecutors, YARN, CLIENT, sysProp = "spark.executor.instances"),
+ OptionAssigner(args.numExecutors, YARN, ALL_DEPLOY_MODES,
+ sysProp = "spark.executor.instances"),
OptionAssigner(args.files, YARN, CLIENT, sysProp = "spark.yarn.dist.files"),
OptionAssigner(args.archives, YARN, CLIENT, sysProp = "spark.yarn.dist.archives"),
OptionAssigner(args.principal, YARN, CLIENT, sysProp = "spark.yarn.principal"),
@@ -433,7 +434,6 @@ object SparkSubmit {
OptionAssigner(args.driverMemory, YARN, CLUSTER, clOption = "--driver-memory"),
OptionAssigner(args.driverCores, YARN, CLUSTER, clOption = "--driver-cores"),
OptionAssigner(args.queue, YARN, CLUSTER, clOption = "--queue"),
- OptionAssigner(args.numExecutors, YARN, CLUSTER, clOption = "--num-executors"),
OptionAssigner(args.executorMemory, YARN, CLUSTER, clOption = "--executor-memory"),
OptionAssigner(args.executorCores, YARN, CLUSTER, clOption = "--executor-cores"),
OptionAssigner(args.files, YARN, CLUSTER, clOption = "--files"),
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index c4012d0e83..a90d854136 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2286,6 +2286,17 @@ private[spark] object Utils extends Logging {
isInDirectory(parent, child.getParentFile)
}
+ /**
+ * Return whether dynamic allocation is enabled in the given conf
+ * Dynamic allocation and explicitly setting the number of executors are inherently
+ * incompatible. In environments where dynamic allocation is turned on by default,
+ * the latter should override the former (SPARK-9092).
+ */
+ def isDynamicAllocationEnabled(conf: SparkConf): Boolean = {
+ conf.contains("spark.dynamicAllocation.enabled") &&
+ conf.getInt("spark.executor.instances", 0) == 0
+ }
+
}
private [util] class SparkShutdownHookManager {
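A typical call site for this new helper looks like the sketch below. It assumes code living inside the org.apache.spark package, since Utils is private[spark], and the literal 2 stands in for YarnSparkHadoopUtil.DEFAULT_NUMBER_EXECUTORS; the YarnAllocator hunk further down uses exactly this shape.

import org.apache.spark.SparkConf
import org.apache.spark.util.Utils

// Sketch of a call site: pick the initial executor target from whichever
// config key is in force. Compiles only within org.apache.spark, because
// Utils is private[spark].
def initialExecutorTarget(sparkConf: SparkConf): Int = {
  if (Utils.isDynamicAllocationEnabled(sparkConf)) {
    sparkConf.getInt("spark.dynamicAllocation.initialExecutors", 0)
  } else {
    sparkConf.getInt("spark.executor.instances", 2) // assumed YARN default
  }
}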
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 5c57940fa5..d4f2ea8765 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -285,4 +285,12 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
}
}
+ test("No exception when both num-executors and dynamic allocation set.") {
+ noException should be thrownBy {
+ sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local")
+ .set("spark.dynamicAllocation.enabled", "true").set("spark.executor.instances", "6"))
+ assert(sc.executorAllocationManager.isEmpty)
+ assert(sc.getConf.getInt("spark.executor.instances", 0) === 6)
+ }
+ }
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 757e0ce3d2..2456c5d0d4 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -159,7 +159,6 @@ class SparkSubmitSuite
childArgsStr should include ("--executor-cores 5")
childArgsStr should include ("--arg arg1 --arg arg2")
childArgsStr should include ("--queue thequeue")
- childArgsStr should include ("--num-executors 6")
childArgsStr should include regex ("--jar .*thejar.jar")
childArgsStr should include regex ("--addJars .*one.jar,.*two.jar,.*three.jar")
childArgsStr should include regex ("--files .*file1.txt,.*file2.txt")
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index cac08a91b9..ec32c419b7 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -199,7 +199,7 @@ If you need a reference to the proper location to put log files in the YARN so t
<td><code>spark.executor.instances</code></td>
<td>2</td>
<td>
- The number of executors. Note that this property is incompatible with <code>spark.dynamicAllocation.enabled</code>.
+ The number of executors. Note that this property is incompatible with <code>spark.dynamicAllocation.enabled</code>. If both <code>spark.dynamicAllocation.enabled</code> and <code>spark.executor.instances</code> are specified, dynamic allocation is turned off and the specified number of <code>spark.executor.instances</code> is used.
</td>
</tr>
<tr>
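The documented behavior is what the SparkContextSuite test above asserts; for reference, a minimal end-to-end sketch follows (local master and app name are chosen for demonstration; the executorAllocationManager accessor is private[spark], so the assertions only compile inside the Spark source tree).

import org.apache.spark.{SparkConf, SparkContext}

// Both settings supplied: dynamic allocation is turned off and the
// explicit count of 6 executors is used.
val conf = new SparkConf()
  .setAppName("precedence-demo") // hypothetical app name
  .setMaster("local")
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.executor.instances", "6")
val sc = new SparkContext(conf)
assert(sc.executorAllocationManager.isEmpty) // no dynamic allocation
assert(sc.getConf.getInt("spark.executor.instances", 0) == 6)
sc.stop()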
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 1d67b3ebb5..e19940d8d6 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -64,7 +64,8 @@ private[spark] class ApplicationMaster(
// Default to numExecutors * 2, with minimum of 3
private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
- sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
+ sparkConf.getInt("spark.yarn.max.worker.failures",
+ math.max(sparkConf.getInt("spark.executor.instances", 0) * 2, 3)))
@volatile private var exitCode = 0
@volatile private var unregistered = false
@@ -493,7 +494,6 @@ private[spark] class ApplicationMaster(
*/
private def startUserApplication(): Thread = {
logInfo("Starting the user application in a separate Thread")
- System.setProperty("spark.executor.instances", args.numExecutors.toString)
val classpath = Client.getUserClasspath(sparkConf)
val urls = classpath.map { entry =>
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
index 37f7937633..b08412414a 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
@@ -29,7 +29,6 @@ class ApplicationMasterArguments(val args: Array[String]) {
var userArgs: Seq[String] = Nil
var executorMemory = 1024
var executorCores = 1
- var numExecutors = DEFAULT_NUMBER_EXECUTORS
var propertiesFile: String = null
parseArgs(args.toList)
@@ -63,10 +62,6 @@ class ApplicationMasterArguments(val args: Array[String]) {
userArgsBuffer += value
args = tail
- case ("--num-workers" | "--num-executors") :: IntParam(value) :: tail =>
- numExecutors = value
- args = tail
-
case ("--worker-memory" | "--executor-memory") :: MemoryParam(value) :: tail =>
executorMemory = value
args = tail
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index b4ba3f0221..6d63ddaf15 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -751,7 +751,6 @@ private[spark] class Client(
userArgs ++ Seq(
"--executor-memory", args.executorMemory.toString + "m",
"--executor-cores", args.executorCores.toString,
- "--num-executors ", args.numExecutors.toString,
"--properties-file", buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
LOCALIZED_CONF_DIR, SPARK_CONF_FILE))
@@ -960,6 +959,10 @@ object Client extends Logging {
val sparkConf = new SparkConf
val args = new ClientArguments(argStrings, sparkConf)
+ // to maintain backwards-compatibility
+ if (!Utils.isDynamicAllocationEnabled(sparkConf)) {
+ sparkConf.setIfMissing("spark.executor.instances", args.numExecutors.toString)
+ }
new Client(args, sparkConf).run()
}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 20d63d40cf..4f42ffefa7 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -53,8 +53,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
private val amMemOverheadKey = "spark.yarn.am.memoryOverhead"
private val driverCoresKey = "spark.driver.cores"
private val amCoresKey = "spark.yarn.am.cores"
- private val isDynamicAllocationEnabled =
- sparkConf.getBoolean("spark.dynamicAllocation.enabled", false)
+ private val isDynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(sparkConf)
parseArgs(args.toList)
loadEnvironmentArgs()
@@ -196,11 +195,6 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
if (args(0) == "--num-workers") {
println("--num-workers is deprecated. Use --num-executors instead.")
}
- // Dynamic allocation is not compatible with this option
- if (isDynamicAllocationEnabled) {
- throw new IllegalArgumentException("Explicitly setting the number " +
- "of executors is not compatible with spark.dynamicAllocation.enabled!")
- }
numExecutors = value
args = tail
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 59caa787b6..ccf753e69f 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -21,6 +21,8 @@ import java.util.Collections
import java.util.concurrent._
import java.util.regex.Pattern
+import org.apache.spark.util.Utils
+
import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
@@ -86,7 +88,12 @@ private[yarn] class YarnAllocator(
private var executorIdCounter = 0
@volatile private var numExecutorsFailed = 0
- @volatile private var targetNumExecutors = args.numExecutors
+ @volatile private var targetNumExecutors =
+ if (Utils.isDynamicAllocationEnabled(sparkConf)) {
+ sparkConf.getInt("spark.dynamicAllocation.initialExecutors", 0)
+ } else {
+ sparkConf.getInt("spark.executor.instances", YarnSparkHadoopUtil.DEFAULT_NUMBER_EXECUTORS)
+ }
// Keep track of which container is running which executor to remove the executors later
// Visible for testing.
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index d225061fcd..d06d951404 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -81,8 +81,6 @@ private[spark] class YarnClientSchedulerBackend(
// List of (target Client argument, environment variable, Spark property)
val optionTuples =
List(
- ("--num-executors", "SPARK_WORKER_INSTANCES", "spark.executor.instances"),
- ("--num-executors", "SPARK_EXECUTOR_INSTANCES", "spark.executor.instances"),
("--executor-memory", "SPARK_WORKER_MEMORY", "spark.executor.memory"),
("--executor-memory", "SPARK_EXECUTOR_MEMORY", "spark.executor.memory"),
("--executor-cores", "SPARK_WORKER_CORES", "spark.executor.cores"),
@@ -92,7 +90,6 @@ private[spark] class YarnClientSchedulerBackend(
)
// Warn against the following deprecated environment variables: env var -> suggestion
val deprecatedEnvVars = Map(
- "SPARK_WORKER_INSTANCES" -> "SPARK_WORKER_INSTANCES or --num-executors through spark-submit",
"SPARK_WORKER_MEMORY" -> "SPARK_EXECUTOR_MEMORY or --executor-memory through spark-submit",
"SPARK_WORKER_CORES" -> "SPARK_EXECUTOR_CORES or --executor-cores through spark-submit")
optionTuples.foreach { case (optionName, envVar, sparkProp) =>
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 58318bf9bc..5d05f514ad 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -87,16 +87,17 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
def createAllocator(maxExecutors: Int = 5): YarnAllocator = {
val args = Array(
- "--num-executors", s"$maxExecutors",
"--executor-cores", "5",
"--executor-memory", "2048",
"--jar", "somejar.jar",
"--class", "SomeClass")
+ val sparkConfClone = sparkConf.clone()
+ sparkConfClone.set("spark.executor.instances", maxExecutors.toString)
new YarnAllocator(
"not used",
mock(classOf[RpcEndpointRef]),
conf,
- sparkConf,
+ sparkConfClone,
rmClient,
appAttemptId,
new ApplicationMasterArguments(args),