diff options
Diffstat (limited to 'yarn/common')
4 files changed, 29 insertions, 10 deletions
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala index 5c54e34003..104db4f65f 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala @@ -18,6 +18,7 @@ package org.apache.spark.deploy.yarn import org.apache.spark.util.{MemoryParam, IntParam} +import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._ import collection.mutable.ArrayBuffer class ApplicationMasterArguments(val args: Array[String]) { @@ -26,7 +27,7 @@ class ApplicationMasterArguments(val args: Array[String]) { var userArgs: Seq[String] = Seq[String]() var executorMemory = 1024 var executorCores = 1 - var numExecutors = ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS + var numExecutors = DEFAULT_NUMBER_EXECUTORS parseArgs(args.toList) diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index a12f82d2fb..4d859450ef 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -20,8 +20,8 @@ package org.apache.spark.deploy.yarn import scala.collection.mutable.ArrayBuffer import org.apache.spark.SparkConf -import org.apache.spark.util.{Utils, IntParam, MemoryParam} import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._ +import org.apache.spark.util.{Utils, IntParam, MemoryParam} // TODO: Add code and support for ensuring that yarn resource 'tasks' are location aware ! private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) { @@ -33,23 +33,25 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) var userArgs: Seq[String] = Seq[String]() var executorMemory = 1024 // MB var executorCores = 1 - var numExecutors = 2 + var numExecutors = DEFAULT_NUMBER_EXECUTORS var amQueue = sparkConf.get("spark.yarn.queue", "default") var amMemory: Int = 512 // MB var appName: String = "Spark" var priority = 0 - parseArgs(args.toList) - loadEnvironmentArgs() - // Additional memory to allocate to containers // For now, use driver's memory overhead as our AM container's memory overhead - val amMemoryOverhead = sparkConf.getInt("spark.yarn.driver.memoryOverhead", + val amMemoryOverhead = sparkConf.getInt("spark.yarn.driver.memoryOverhead", math.max((MEMORY_OVERHEAD_FACTOR * amMemory).toInt, MEMORY_OVERHEAD_MIN)) - val executorMemoryOverhead = sparkConf.getInt("spark.yarn.executor.memoryOverhead", + val executorMemoryOverhead = sparkConf.getInt("spark.yarn.executor.memoryOverhead", math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN)) + private val isDynamicAllocationEnabled = + sparkConf.getBoolean("spark.dynamicAllocation.enabled", false) + + parseArgs(args.toList) + loadEnvironmentArgs() validateArgs() /** Load any default arguments provided through environment variables and Spark properties. */ @@ -64,6 +66,15 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) .orElse(sys.env.get("SPARK_YARN_DIST_ARCHIVES")) .orElse(sparkConf.getOption("spark.yarn.dist.archives").map(p => Utils.resolveURIs(p))) .orNull + // If dynamic allocation is enabled, start at the max number of executors + if (isDynamicAllocationEnabled) { + val maxExecutorsConf = "spark.dynamicAllocation.maxExecutors" + if (!sparkConf.contains(maxExecutorsConf)) { + throw new IllegalArgumentException( + s"$maxExecutorsConf must be set if dynamic allocation is enabled!") + } + numExecutors = sparkConf.get(maxExecutorsConf).toInt + } } /** @@ -113,6 +124,11 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) if (args(0) == "--num-workers") { println("--num-workers is deprecated. Use --num-executors instead.") } + // Dynamic allocation is not compatible with this option + if (isDynamicAllocationEnabled) { + throw new IllegalArgumentException("Explicitly setting the number " + + "of executors is not compatible with spark.dynamicAllocation.enabled!") + } numExecutors = value args = tail diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index e1e0144f46..7d453ecb79 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -93,6 +93,8 @@ object YarnSparkHadoopUtil { val ANY_HOST = "*" + val DEFAULT_NUMBER_EXECUTORS = 2 + // All RM requests are issued with same priority : we do not (yet) have any distinction between // request types (like map/reduce in hadoop for example) val RM_REQUEST_PRIORITY = 1 diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala index a96a54f668..b1de81e6a8 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala @@ -18,7 +18,7 @@ package org.apache.spark.scheduler.cluster import org.apache.spark.SparkContext -import org.apache.spark.deploy.yarn.ApplicationMasterArguments +import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._ import org.apache.spark.scheduler.TaskSchedulerImpl import org.apache.spark.util.IntParam @@ -29,7 +29,7 @@ private[spark] class YarnClusterSchedulerBackend( override def start() { super.start() - totalExpectedExecutors = ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS + totalExpectedExecutors = DEFAULT_NUMBER_EXECUTORS if (System.getenv("SPARK_EXECUTOR_INSTANCES") != null) { totalExpectedExecutors = IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES")) .getOrElse(totalExpectedExecutors) |