-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala        3
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala                  30
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala               2
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala 4
4 files changed, 29 insertions(+), 10 deletions(-)
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
index 5c54e34003..104db4f65f 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
@@ -18,6 +18,7 @@
package org.apache.spark.deploy.yarn
import org.apache.spark.util.{MemoryParam, IntParam}
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
import collection.mutable.ArrayBuffer
class ApplicationMasterArguments(val args: Array[String]) {
@@ -26,7 +27,7 @@ class ApplicationMasterArguments(val args: Array[String]) {
var userArgs: Seq[String] = Seq[String]()
var executorMemory = 1024
var executorCores = 1
- var numExecutors = ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS
+ var numExecutors = DEFAULT_NUMBER_EXECUTORS
parseArgs(args.toList)
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index a12f82d2fb..4d859450ef 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -20,8 +20,8 @@ package org.apache.spark.deploy.yarn
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.SparkConf
-import org.apache.spark.util.{Utils, IntParam, MemoryParam}
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
+import org.apache.spark.util.{Utils, IntParam, MemoryParam}
// TODO: Add code and support for ensuring that yarn resource 'tasks' are location aware !
private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) {
@@ -33,23 +33,25 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
var userArgs: Seq[String] = Seq[String]()
var executorMemory = 1024 // MB
var executorCores = 1
- var numExecutors = 2
+ var numExecutors = DEFAULT_NUMBER_EXECUTORS
var amQueue = sparkConf.get("spark.yarn.queue", "default")
var amMemory: Int = 512 // MB
var appName: String = "Spark"
var priority = 0
- parseArgs(args.toList)
- loadEnvironmentArgs()
-
// Additional memory to allocate to containers
// For now, use driver's memory overhead as our AM container's memory overhead
- val amMemoryOverhead = sparkConf.getInt("spark.yarn.driver.memoryOverhead",
+ val amMemoryOverhead = sparkConf.getInt("spark.yarn.driver.memoryOverhead",
math.max((MEMORY_OVERHEAD_FACTOR * amMemory).toInt, MEMORY_OVERHEAD_MIN))
- val executorMemoryOverhead = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
+ val executorMemoryOverhead = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN))
+ private val isDynamicAllocationEnabled =
+ sparkConf.getBoolean("spark.dynamicAllocation.enabled", false)
+
+ parseArgs(args.toList)
+ loadEnvironmentArgs()
validateArgs()
/** Load any default arguments provided through environment variables and Spark properties. */
@@ -64,6 +66,15 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
.orElse(sys.env.get("SPARK_YARN_DIST_ARCHIVES"))
.orElse(sparkConf.getOption("spark.yarn.dist.archives").map(p => Utils.resolveURIs(p)))
.orNull
+ // If dynamic allocation is enabled, start at the max number of executors
+ if (isDynamicAllocationEnabled) {
+ val maxExecutorsConf = "spark.dynamicAllocation.maxExecutors"
+ if (!sparkConf.contains(maxExecutorsConf)) {
+ throw new IllegalArgumentException(
+ s"$maxExecutorsConf must be set if dynamic allocation is enabled!")
+ }
+ numExecutors = sparkConf.get(maxExecutorsConf).toInt
+ }
}
/**
@@ -113,6 +124,11 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
if (args(0) == "--num-workers") {
println("--num-workers is deprecated. Use --num-executors instead.")
}
+ // Dynamic allocation is not compatible with this option
+ if (isDynamicAllocationEnabled) {
+ throw new IllegalArgumentException("Explicitly setting the number " +
+ "of executors is not compatible with spark.dynamicAllocation.enabled!")
+ }
numExecutors = value
args = tail
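
Note: the hunks above are where the behavior actually changes. A minimal sketch of that behavior follows; it is illustrative only, not part of the patch, and since ClientArguments is private[spark] it would have to live inside Spark's own code or tests.

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.yarn.ClientArguments

    // With dynamic allocation on, spark.dynamicAllocation.maxExecutors must be set;
    // otherwise loadEnvironmentArgs() throws an IllegalArgumentException.
    val conf = new SparkConf()
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.dynamicAllocation.maxExecutors", "50")

    // The initial executor count is now seeded from the configured maximum.
    val clientArgs = new ClientArguments(Array.empty[String], conf)
    assert(clientArgs.numExecutors == 50)

    // Passing --num-executors (or the deprecated --num-workers) together with
    // dynamic allocation is rejected with an IllegalArgumentException instead of
    // being silently overridden.
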
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index e1e0144f46..7d453ecb79 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -93,6 +93,8 @@ object YarnSparkHadoopUtil {
val ANY_HOST = "*"
+ val DEFAULT_NUMBER_EXECUTORS = 2
+
// All RM requests are issued with same priority : we do not (yet) have any distinction between
// request types (like map/reduce in hadoop for example)
val RM_REQUEST_PRIORITY = 1
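
For context, a hedged sketch of how the relocated constant is consumed; the wildcard import mirrors the ones added to ApplicationMasterArguments and ClientArguments in this patch:

    import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._

    // Callers no longer reach into ApplicationMasterArguments for the default;
    // the shared constant (2) now lives in YarnSparkHadoopUtil.
    var numExecutors = DEFAULT_NUMBER_EXECUTORS
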
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
index a96a54f668..b1de81e6a8 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -18,7 +18,7 @@
package org.apache.spark.scheduler.cluster
import org.apache.spark.SparkContext
-import org.apache.spark.deploy.yarn.ApplicationMasterArguments
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.util.IntParam
@@ -29,7 +29,7 @@ private[spark] class YarnClusterSchedulerBackend(
override def start() {
super.start()
- totalExpectedExecutors = ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS
+ totalExpectedExecutors = DEFAULT_NUMBER_EXECUTORS
if (System.getenv("SPARK_EXECUTOR_INSTANCES") != null) {
totalExpectedExecutors = IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES"))
.getOrElse(totalExpectedExecutors)
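
Putting the last hunk together, a behaviorally equivalent restatement of the resolution order in YarnClusterSchedulerBackend.start() (a sketch, not the literal patch):

    import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
    import org.apache.spark.util.IntParam

    // Default to DEFAULT_NUMBER_EXECUTORS (2); if SPARK_EXECUTOR_INSTANCES is set
    // and parses as an integer, it overrides the default.
    val totalExpectedExecutors =
      Option(System.getenv("SPARK_EXECUTOR_INSTANCES"))
        .flatMap(IntParam.unapply)
        .getOrElse(DEFAULT_NUMBER_EXECUTORS)
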