From 26f092d4e32cc1f7e279646075eaf1e495395923 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 30 Oct 2014 15:31:23 -0700 Subject: [SPARK-4138][SPARK-4139] Improve dynamic allocation settings This should be merged after #2746 (SPARK-3795). **SPARK-4138**. If the user sets both the number of executors and `spark.dynamicAllocation.enabled`, we should throw an exception. **SPARK-4139**. If the user sets `spark.dynamicAllocation.enabled`, we should use the max number of executors as the starting number of executors because the first job is likely to run immediately after application startup. If the latter is not set, throw an exception. Author: Andrew Or Closes #3002 from andrewor14/yarn-set-executors and squashes the following commits: c528fce [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-set-executors 55d4699 [Andrew Or] Bug fix: `isDynamicAllocationEnabled` was always false 2b0ccec [Andrew Or] Start the number of executors at the max 022bfde [Andrew Or] Guard against incompatible settings of number of executors --- .../deploy/yarn/ApplicationMasterArguments.scala | 3 ++- .../apache/spark/deploy/yarn/ClientArguments.scala | 30 +++++++++++++++++----- .../spark/deploy/yarn/YarnSparkHadoopUtil.scala | 2 ++ .../cluster/YarnClusterSchedulerBackend.scala | 4 +-- 4 files changed, 29 insertions(+), 10 deletions(-) (limited to 'yarn') diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala index 5c54e34003..104db4f65f 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala @@ -18,6 +18,7 @@ package org.apache.spark.deploy.yarn import org.apache.spark.util.{MemoryParam, IntParam} +import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._ import collection.mutable.ArrayBuffer class ApplicationMasterArguments(val args: Array[String]) { @@ -26,7 +27,7 @@ class ApplicationMasterArguments(val args: Array[String]) { var userArgs: Seq[String] = Seq[String]() var executorMemory = 1024 var executorCores = 1 - var numExecutors = ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS + var numExecutors = DEFAULT_NUMBER_EXECUTORS parseArgs(args.toList) diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index a12f82d2fb..4d859450ef 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -20,8 +20,8 @@ package org.apache.spark.deploy.yarn import scala.collection.mutable.ArrayBuffer import org.apache.spark.SparkConf -import org.apache.spark.util.{Utils, IntParam, MemoryParam} import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._ +import org.apache.spark.util.{Utils, IntParam, MemoryParam} // TODO: Add code and support for ensuring that yarn resource 'tasks' are location aware ! private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) { @@ -33,23 +33,25 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) var userArgs: Seq[String] = Seq[String]() var executorMemory = 1024 // MB var executorCores = 1 - var numExecutors = 2 + var numExecutors = DEFAULT_NUMBER_EXECUTORS var amQueue = sparkConf.get("spark.yarn.queue", "default") var amMemory: Int = 512 // MB var appName: String = "Spark" var priority = 0 - parseArgs(args.toList) - loadEnvironmentArgs() - // Additional memory to allocate to containers // For now, use driver's memory overhead as our AM container's memory overhead - val amMemoryOverhead = sparkConf.getInt("spark.yarn.driver.memoryOverhead", + val amMemoryOverhead = sparkConf.getInt("spark.yarn.driver.memoryOverhead", math.max((MEMORY_OVERHEAD_FACTOR * amMemory).toInt, MEMORY_OVERHEAD_MIN)) - val executorMemoryOverhead = sparkConf.getInt("spark.yarn.executor.memoryOverhead", + val executorMemoryOverhead = sparkConf.getInt("spark.yarn.executor.memoryOverhead", math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN)) + private val isDynamicAllocationEnabled = + sparkConf.getBoolean("spark.dynamicAllocation.enabled", false) + + parseArgs(args.toList) + loadEnvironmentArgs() validateArgs() /** Load any default arguments provided through environment variables and Spark properties. */ @@ -64,6 +66,15 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) .orElse(sys.env.get("SPARK_YARN_DIST_ARCHIVES")) .orElse(sparkConf.getOption("spark.yarn.dist.archives").map(p => Utils.resolveURIs(p))) .orNull + // If dynamic allocation is enabled, start at the max number of executors + if (isDynamicAllocationEnabled) { + val maxExecutorsConf = "spark.dynamicAllocation.maxExecutors" + if (!sparkConf.contains(maxExecutorsConf)) { + throw new IllegalArgumentException( + s"$maxExecutorsConf must be set if dynamic allocation is enabled!") + } + numExecutors = sparkConf.get(maxExecutorsConf).toInt + } } /** @@ -113,6 +124,11 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) if (args(0) == "--num-workers") { println("--num-workers is deprecated. Use --num-executors instead.") } + // Dynamic allocation is not compatible with this option + if (isDynamicAllocationEnabled) { + throw new IllegalArgumentException("Explicitly setting the number " + + "of executors is not compatible with spark.dynamicAllocation.enabled!") + } numExecutors = value args = tail diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index e1e0144f46..7d453ecb79 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -93,6 +93,8 @@ object YarnSparkHadoopUtil { val ANY_HOST = "*" + val DEFAULT_NUMBER_EXECUTORS = 2 + // All RM requests are issued with same priority : we do not (yet) have any distinction between // request types (like map/reduce in hadoop for example) val RM_REQUEST_PRIORITY = 1 diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala index a96a54f668..b1de81e6a8 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala @@ -18,7 +18,7 @@ package org.apache.spark.scheduler.cluster import org.apache.spark.SparkContext -import org.apache.spark.deploy.yarn.ApplicationMasterArguments +import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._ import org.apache.spark.scheduler.TaskSchedulerImpl import org.apache.spark.util.IntParam @@ -29,7 +29,7 @@ private[spark] class YarnClusterSchedulerBackend( override def start() { super.start() - totalExpectedExecutors = ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS + totalExpectedExecutors = DEFAULT_NUMBER_EXECUTORS if (System.getenv("SPARK_EXECUTOR_INSTANCES") != null) { totalExpectedExecutors = IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES")) .getOrElse(totalExpectedExecutors) -- cgit v1.2.3