author    Andrew Or <andrew@databricks.com>   2014-10-30 15:31:23 -0700
committer Andrew Or <andrew@databricks.com>   2014-10-30 15:31:23 -0700
commit    26f092d4e32cc1f7e279646075eaf1e495395923 (patch)
tree      302e0b43e8ef09135a13159ba0cac4dedae94328 /yarn
parent    24c5129257ce6e3b734f168e860b714c2730b55f (diff)
[SPARK-4138][SPARK-4139] Improve dynamic allocation settings
This should be merged after #2746 (SPARK-3795).

**SPARK-4138**. If the user sets both the number of executors and `spark.dynamicAllocation.enabled`, we should throw an exception.

**SPARK-4139**. If the user sets `spark.dynamicAllocation.enabled`, we should use the max number of executors as the starting number of executors because the first job is likely to run immediately after application startup. If the latter is not set, throw an exception.

Author: Andrew Or <andrew@databricks.com>

Closes #3002 from andrewor14/yarn-set-executors and squashes the following commits:

c528fce [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-set-executors
55d4699 [Andrew Or] Bug fix: `isDynamicAllocationEnabled` was always false
2b0ccec [Andrew Or] Start the number of executors at the max
022bfde [Andrew Or] Guard against incompatible settings of number of executors
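For context, a minimal user-side sketch of the configuration this patch expects. The property names come from the patch itself; the app name and executor count below are purely illustrative:

    import org.apache.spark.{SparkConf, SparkContext}

    // Illustrative only: with this patch, enabling dynamic allocation on YARN
    // requires spark.dynamicAllocation.maxExecutors to be set, and the application
    // starts with that many executors. Explicitly setting the number of executors
    // (e.g. --num-executors) at the same time now raises an IllegalArgumentException.
    val conf = new SparkConf()
      .setAppName("dynamic-allocation-example")           // hypothetical app name
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.dynamicAllocation.maxExecutors", "10")  // required when enabled
    val sc = new SparkContext(conf)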
Diffstat (limited to 'yarn')
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala          3
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala                    30
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala                 2
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala   4
4 files changed, 29 insertions, 10 deletions
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
index 5c54e34003..104db4f65f 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
@@ -18,6 +18,7 @@
package org.apache.spark.deploy.yarn
import org.apache.spark.util.{MemoryParam, IntParam}
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
import collection.mutable.ArrayBuffer
class ApplicationMasterArguments(val args: Array[String]) {
@@ -26,7 +27,7 @@ class ApplicationMasterArguments(val args: Array[String]) {
var userArgs: Seq[String] = Seq[String]()
var executorMemory = 1024
var executorCores = 1
- var numExecutors = ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS
+ var numExecutors = DEFAULT_NUMBER_EXECUTORS
parseArgs(args.toList)
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index a12f82d2fb..4d859450ef 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -20,8 +20,8 @@ package org.apache.spark.deploy.yarn
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.SparkConf
-import org.apache.spark.util.{Utils, IntParam, MemoryParam}
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
+import org.apache.spark.util.{Utils, IntParam, MemoryParam}
// TODO: Add code and support for ensuring that yarn resource 'tasks' are location aware !
private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) {
@@ -33,23 +33,25 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
var userArgs: Seq[String] = Seq[String]()
var executorMemory = 1024 // MB
var executorCores = 1
- var numExecutors = 2
+ var numExecutors = DEFAULT_NUMBER_EXECUTORS
var amQueue = sparkConf.get("spark.yarn.queue", "default")
var amMemory: Int = 512 // MB
var appName: String = "Spark"
var priority = 0
- parseArgs(args.toList)
- loadEnvironmentArgs()
-
// Additional memory to allocate to containers
// For now, use driver's memory overhead as our AM container's memory overhead
- val amMemoryOverhead = sparkConf.getInt("spark.yarn.driver.memoryOverhead",
+ val amMemoryOverhead = sparkConf.getInt("spark.yarn.driver.memoryOverhead",
math.max((MEMORY_OVERHEAD_FACTOR * amMemory).toInt, MEMORY_OVERHEAD_MIN))
- val executorMemoryOverhead = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
+ val executorMemoryOverhead = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN))
+ private val isDynamicAllocationEnabled =
+ sparkConf.getBoolean("spark.dynamicAllocation.enabled", false)
+
+ parseArgs(args.toList)
+ loadEnvironmentArgs()
validateArgs()
/** Load any default arguments provided through environment variables and Spark properties. */
@@ -64,6 +66,15 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
.orElse(sys.env.get("SPARK_YARN_DIST_ARCHIVES"))
.orElse(sparkConf.getOption("spark.yarn.dist.archives").map(p => Utils.resolveURIs(p)))
.orNull
+ // If dynamic allocation is enabled, start at the max number of executors
+ if (isDynamicAllocationEnabled) {
+ val maxExecutorsConf = "spark.dynamicAllocation.maxExecutors"
+ if (!sparkConf.contains(maxExecutorsConf)) {
+ throw new IllegalArgumentException(
+ s"$maxExecutorsConf must be set if dynamic allocation is enabled!")
+ }
+ numExecutors = sparkConf.get(maxExecutorsConf).toInt
+ }
}
/**
@@ -113,6 +124,11 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
if (args(0) == "--num-workers") {
println("--num-workers is deprecated. Use --num-executors instead.")
}
+ // Dynamic allocation is not compatible with this option
+ if (isDynamicAllocationEnabled) {
+ throw new IllegalArgumentException("Explicitly setting the number " +
+ "of executors is not compatible with spark.dynamicAllocation.enabled!")
+ }
numExecutors = value
args = tail
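An aside on the reordering in the hunk above (moving `parseArgs`/`loadEnvironmentArgs` below the new field): Scala evaluates constructor statements top to bottom, so a flag read during `parseArgs` must be defined earlier in the class body, otherwise it still holds its default value. That is the "`isDynamicAllocationEnabled` was always false" bug noted in the commit list. A minimal sketch of the pitfall, using hypothetical class and field names:

    // Hypothetical classes illustrating Scala constructor initialization order.
    object InitOrderDemo extends App {
      class ArgsBefore {
        check()                     // runs first; `enabled` has not been initialized yet
        private val enabled = true  // initializer runs after check()
        def check(): Unit = println(s"enabled = $enabled")
      }

      class ArgsAfter {
        private val enabled = true  // defined before anything reads it
        check()                     // now observes the intended value
        def check(): Unit = println(s"enabled = $enabled")
      }

      new ArgsBefore()  // prints "enabled = false" (Boolean default value)
      new ArgsAfter()   // prints "enabled = true"
    }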
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index e1e0144f46..7d453ecb79 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -93,6 +93,8 @@ object YarnSparkHadoopUtil {
val ANY_HOST = "*"
+ val DEFAULT_NUMBER_EXECUTORS = 2
+
// All RM requests are issued with same priority : we do not (yet) have any distinction between
// request types (like map/reduce in hadoop for example)
val RM_REQUEST_PRIORITY = 1
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
index a96a54f668..b1de81e6a8 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -18,7 +18,7 @@
package org.apache.spark.scheduler.cluster
import org.apache.spark.SparkContext
-import org.apache.spark.deploy.yarn.ApplicationMasterArguments
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.util.IntParam
@@ -29,7 +29,7 @@ private[spark] class YarnClusterSchedulerBackend(
override def start() {
super.start()
- totalExpectedExecutors = ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS
+ totalExpectedExecutors = DEFAULT_NUMBER_EXECUTORS
if (System.getenv("SPARK_EXECUTOR_INSTANCES") != null) {
totalExpectedExecutors = IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES"))
.getOrElse(totalExpectedExecutors)