author     Niranjan Padmanabhan <niranjan.padmanabhan@cloudera.com>   2015-08-12 16:10:21 -0700
committer  Marcelo Vanzin <vanzin@cloudera.com>                       2015-08-12 16:10:21 -0700
commit     738f353988dbf02704bd63f5e35d94402c59ed79 (patch)
tree       1ef4619abe50dd2ebe51218afddeaa7051312cf9 /yarn
parent     a17384fa343628cec44437da5b80b9403ecd5838 (diff)
[SPARK-9092] Fixed incompatibility when both num-executors and dynamic allocation are set

Now, dynamic allocation is set to false when num-executors is explicitly specified as an argument. Consequently, the executorAllocationManager is not initialized in the SparkContext.

Author: Niranjan Padmanabhan <niranjan.padmanabhan@cloudera.com>

Closes #7657 from neurons/SPARK-9092.
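In short: an explicitly configured executor count now takes precedence, and dynamic allocation effectively stays off. A minimal sketch of that precedence, assuming only the property keys and defaults visible in the diff below (the helper name resolveAllocation is hypothetical, not part of the patch):

    import org.apache.spark.SparkConf

    // Hypothetical helper summarizing the intended precedence: an explicit
    // executor count wins, so dynamic allocation reads as disabled and no
    // ExecutorAllocationManager is created.
    def resolveAllocation(conf: SparkConf): (Boolean, Int) = {
      val explicitCount = conf.contains("spark.executor.instances")
      val dynamicEnabled =
        conf.getBoolean("spark.dynamicAllocation.enabled", false) && !explicitCount
      val startingExecutors =
        if (dynamicEnabled) conf.getInt("spark.dynamicAllocation.initialExecutors", 0)
        else conf.getInt("spark.executor.instances", 2)
      (dynamicEnabled, startingExecutors)
    }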
Diffstat (limited to 'yarn')
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala                 | 4
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala        | 5
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala                            | 5
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala                   | 8
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala                     | 9
-rw-r--r--  yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala  | 3
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala                | 5
7 files changed, 18 insertions, 21 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 1d67b3ebb5..e19940d8d6 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -64,7 +64,8 @@ private[spark] class ApplicationMaster(
// Default to numExecutors * 2, with minimum of 3
private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
- sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
+ sparkConf.getInt("spark.yarn.max.worker.failures",
+ math.max(sparkConf.getInt("spark.executor.instances", 0) * 2, 3)))
@volatile private var exitCode = 0
@volatile private var unregistered = false
@@ -493,7 +494,6 @@ private[spark] class ApplicationMaster(
*/
private def startUserApplication(): Thread = {
logInfo("Starting the user application in a separate Thread")
- System.setProperty("spark.executor.instances", args.numExecutors.toString)
val classpath = Client.getUserClasspath(sparkConf)
val urls = classpath.map { entry =>
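Pulled out of the diff context, the ApplicationMaster's new failure-threshold default reads roughly as follows; this is a standalone sketch for readability, not the patched method itself:

    import org.apache.spark.SparkConf

    // Twice the static executor count, now read from the conf rather than
    // from the AM's CLI arguments, with a floor of 3 so small jobs still
    // tolerate a couple of executor failures.
    def defaultMaxExecutorFailures(sparkConf: SparkConf): Int =
      sparkConf.getInt("spark.yarn.max.executor.failures",
        sparkConf.getInt("spark.yarn.max.worker.failures",
          math.max(sparkConf.getInt("spark.executor.instances", 0) * 2, 3)))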
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
index 37f7937633..b08412414a 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
@@ -29,7 +29,6 @@ class ApplicationMasterArguments(val args: Array[String]) {
var userArgs: Seq[String] = Nil
var executorMemory = 1024
var executorCores = 1
- var numExecutors = DEFAULT_NUMBER_EXECUTORS
var propertiesFile: String = null
parseArgs(args.toList)
@@ -63,10 +62,6 @@ class ApplicationMasterArguments(val args: Array[String]) {
userArgsBuffer += value
args = tail
- case ("--num-workers" | "--num-executors") :: IntParam(value) :: tail =>
- numExecutors = value
- args = tail
-
case ("--worker-memory" | "--executor-memory") :: MemoryParam(value) :: tail =>
executorMemory = value
args = tail
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index b4ba3f0221..6d63ddaf15 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -751,7 +751,6 @@ private[spark] class Client(
userArgs ++ Seq(
"--executor-memory", args.executorMemory.toString + "m",
"--executor-cores", args.executorCores.toString,
- "--num-executors ", args.numExecutors.toString,
"--properties-file", buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
LOCALIZED_CONF_DIR, SPARK_CONF_FILE))
@@ -960,6 +959,10 @@ object Client extends Logging {
val sparkConf = new SparkConf
val args = new ClientArguments(argStrings, sparkConf)
+ // to maintain backwards-compatibility
+ if (!Utils.isDynamicAllocationEnabled(sparkConf)) {
+ sparkConf.setIfMissing("spark.executor.instances", args.numExecutors.toString)
+ }
new Client(args, sparkConf).run()
}
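The diffstat is limited to yarn/, so the core-side helper is not shown here. Judging by how Client now uses it, Utils.isDynamicAllocationEnabled presumably folds the explicit executor count into the check, roughly along these lines (a reconstruction, not the actual core diff):

    import org.apache.spark.SparkConf

    // Reconstructed reading of the core-side helper: dynamic allocation
    // counts as enabled only when the flag is on AND no static executor
    // count has been set.
    def isDynamicAllocationEnabled(conf: SparkConf): Boolean =
      conf.getBoolean("spark.dynamicAllocation.enabled", false) &&
        conf.getInt("spark.executor.instances", 0) == 0

Note also that setIfMissing keeps any spark.executor.instances value already present in the conf (for example from a properties file), so the --num-executors argument only fills the gap when nothing else set the property.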
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 20d63d40cf..4f42ffefa7 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -53,8 +53,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
private val amMemOverheadKey = "spark.yarn.am.memoryOverhead"
private val driverCoresKey = "spark.driver.cores"
private val amCoresKey = "spark.yarn.am.cores"
- private val isDynamicAllocationEnabled =
- sparkConf.getBoolean("spark.dynamicAllocation.enabled", false)
+ private val isDynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(sparkConf)
parseArgs(args.toList)
loadEnvironmentArgs()
@@ -196,11 +195,6 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
if (args(0) == "--num-workers") {
println("--num-workers is deprecated. Use --num-executors instead.")
}
- // Dynamic allocation is not compatible with this option
- if (isDynamicAllocationEnabled) {
- throw new IllegalArgumentException("Explicitly setting the number " +
- "of executors is not compatible with spark.dynamicAllocation.enabled!")
- }
numExecutors = value
args = tail
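With that helper in place, the hard error removed above becomes redundant: passing --num-executors simply makes dynamic allocation read as disabled rather than aborting argument parsing. Illustratively, assuming the reconstructed helper behaves as sketched earlier:

    import org.apache.spark.SparkConf
    import org.apache.spark.util.Utils

    val conf = new SparkConf()
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.executor.instances", "8")
    // Before the patch this combination failed fast with
    // IllegalArgumentException; now the explicit count quietly wins.
    assert(!Utils.isDynamicAllocationEnabled(conf))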
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 59caa787b6..ccf753e69f 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -21,6 +21,8 @@ import java.util.Collections
import java.util.concurrent._
import java.util.regex.Pattern
+import org.apache.spark.util.Utils
+
import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
@@ -86,7 +88,12 @@ private[yarn] class YarnAllocator(
private var executorIdCounter = 0
@volatile private var numExecutorsFailed = 0
- @volatile private var targetNumExecutors = args.numExecutors
+ @volatile private var targetNumExecutors =
+ if (Utils.isDynamicAllocationEnabled(sparkConf)) {
+ sparkConf.getInt("spark.dynamicAllocation.initialExecutors", 0)
+ } else {
+ sparkConf.getInt("spark.executor.instances", YarnSparkHadoopUtil.DEFAULT_NUMBER_EXECUTORS)
+ }
// Keep track of which container is running which executor to remove the executors later
// Visible for testing.
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index d225061fcd..d06d951404 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -81,8 +81,6 @@ private[spark] class YarnClientSchedulerBackend(
// List of (target Client argument, environment variable, Spark property)
val optionTuples =
List(
- ("--num-executors", "SPARK_WORKER_INSTANCES", "spark.executor.instances"),
- ("--num-executors", "SPARK_EXECUTOR_INSTANCES", "spark.executor.instances"),
("--executor-memory", "SPARK_WORKER_MEMORY", "spark.executor.memory"),
("--executor-memory", "SPARK_EXECUTOR_MEMORY", "spark.executor.memory"),
("--executor-cores", "SPARK_WORKER_CORES", "spark.executor.cores"),
@@ -92,7 +90,6 @@ private[spark] class YarnClientSchedulerBackend(
)
// Warn against the following deprecated environment variables: env var -> suggestion
val deprecatedEnvVars = Map(
- "SPARK_WORKER_INSTANCES" -> "SPARK_WORKER_INSTANCES or --num-executors through spark-submit",
"SPARK_WORKER_MEMORY" -> "SPARK_EXECUTOR_MEMORY or --executor-memory through spark-submit",
"SPARK_WORKER_CORES" -> "SPARK_EXECUTOR_CORES or --executor-cores through spark-submit")
optionTuples.foreach { case (optionName, envVar, sparkProp) =>
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 58318bf9bc..5d05f514ad 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -87,16 +87,17 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
def createAllocator(maxExecutors: Int = 5): YarnAllocator = {
val args = Array(
- "--num-executors", s"$maxExecutors",
"--executor-cores", "5",
"--executor-memory", "2048",
"--jar", "somejar.jar",
"--class", "SomeClass")
+ val sparkConfClone = sparkConf.clone()
+ sparkConfClone.set("spark.executor.instances", maxExecutors.toString)
new YarnAllocator(
"not used",
mock(classOf[RpcEndpointRef]),
conf,
- sparkConf,
+ sparkConfClone,
rmClient,
appAttemptId,
new ApplicationMasterArguments(args),
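Since the allocator now reads the executor count from the conf rather than from --num-executors, the suite pins it through a cloned conf. SparkConf.clone() returns an independent copy, so the per-test override cannot leak into other tests; a small illustration:

    import org.apache.spark.SparkConf

    val base = new SparkConf()
    val perTest = base.clone().set("spark.executor.instances", "5")
    // The clone is independent: the base conf is untouched.
    assert(!base.contains("spark.executor.instances"))
    assert(perTest.get("spark.executor.instances") == "5")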