diff options
author | jerryshao <sshao@hortonworks.com> | 2015-09-28 06:38:54 -0700 |
---|---|---|
committer | Marcelo Vanzin <vanzin@cloudera.com> | 2015-09-28 06:38:54 -0700 |
commit | 353c30bd7dfbd3b76fc8bc9a6dfab9321439a34b (patch) | |
tree | d2c1a16e24b60effbbeaf4ae219b8b72c1621eb6 /yarn | |
parent | d8d50ed388d2e695b69d2b93a620045ef2f0bc18 (diff) | |
download | spark-353c30bd7dfbd3b76fc8bc9a6dfab9321439a34b.tar.gz spark-353c30bd7dfbd3b76fc8bc9a6dfab9321439a34b.tar.bz2 spark-353c30bd7dfbd3b76fc8bc9a6dfab9321439a34b.zip |
[SPARK-10790] [YARN] Fix initial executor number not set issue and consolidate the codes
This bug was introduced in [SPARK-9092](https://issues.apache.org/jira/browse/SPARK-9092): `targetExecutorNumber` should use `minExecutors` if `initialExecutors` is not set. Using 0 instead will cause the problem described in [SPARK-10790](https://issues.apache.org/jira/browse/SPARK-10790).
Also consolidate and simplify some similar code snippets to keep the semantics consistent.
Author: jerryshao <sshao@hortonworks.com>
Closes #8910 from jerryshao/SPARK-10790.
Diffstat (limited to 'yarn')
4 files changed, 27 insertions, 40 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index 54f62e6b72..1165061db2 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -81,25 +81,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) .orNull // If dynamic allocation is enabled, start at the configured initial number of executors. // Default to minExecutors if no initialExecutors is set. - if (isDynamicAllocationEnabled) { - val minExecutorsConf = "spark.dynamicAllocation.minExecutors" - val initialExecutorsConf = "spark.dynamicAllocation.initialExecutors" - val maxExecutorsConf = "spark.dynamicAllocation.maxExecutors" - val minNumExecutors = sparkConf.getInt(minExecutorsConf, 0) - val initialNumExecutors = sparkConf.getInt(initialExecutorsConf, minNumExecutors) - val maxNumExecutors = sparkConf.getInt(maxExecutorsConf, Integer.MAX_VALUE) - - // If defined, initial executors must be between min and max - if (initialNumExecutors < minNumExecutors || initialNumExecutors > maxNumExecutors) { - throw new IllegalArgumentException( - s"$initialExecutorsConf must be between $minExecutorsConf and $maxNumExecutors!") - } - - numExecutors = initialNumExecutors - } else { - val numExecutorsConf = "spark.executor.instances" - numExecutors = sparkConf.getInt(numExecutorsConf, numExecutors) - } + numExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sparkConf) principal = Option(principal) .orElse(sparkConf.getOption("spark.yarn.principal")) .orNull diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index fd88b8b7fe..9e1ef1b3b4 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -89,11 +89,7 @@ private[yarn] class YarnAllocator( @volatile private var numExecutorsFailed = 0 @volatile private var targetNumExecutors = - if (Utils.isDynamicAllocationEnabled(sparkConf)) { - sparkConf.getInt("spark.dynamicAllocation.initialExecutors", 0) - } else { - sparkConf.getInt("spark.executor.instances", YarnSparkHadoopUtil.DEFAULT_NUMBER_EXECUTORS) - } + YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sparkConf) // Executor loss reason requests that are pending - maps from executor ID for inquiry to a // list of requesters that should be responded to once we find out why the given executor diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index 445d3dcd26..f276e7efde 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -314,5 +314,28 @@ object YarnSparkHadoopUtil { def getClassPathSeparator(): String = { classPathSeparatorField.get(null).asInstanceOf[String] } + + /** + * Getting the initial target number of executors depends on whether dynamic allocation is + * enabled. 
+ */ + def getInitialTargetExecutorNumber(conf: SparkConf): Int = { + if (Utils.isDynamicAllocationEnabled(conf)) { + val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0) + val initialNumExecutors = + conf.getInt("spark.dynamicAllocation.initialExecutors", minNumExecutors) + val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors", Int.MaxValue) + require(initialNumExecutors >= minNumExecutors && initialNumExecutors <= maxNumExecutors, + s"initial executor number $initialNumExecutors must between min executor number" + + s"$minNumExecutors and max executor number $maxNumExecutors") + + initialNumExecutors + } else { + val targetNumExecutors = + sys.env.get("SPARK_EXECUTOR_INSTANCES").map(_.toInt).getOrElse(DEFAULT_NUMBER_EXECUTORS) + // System property can override environment variable. + conf.getInt("spark.executor.instances", targetNumExecutors) + } + } } diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala index 1aed5a1675..50b699f11b 100644 --- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala +++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala @@ -17,21 +17,13 @@ package org.apache.spark.scheduler.cluster -import java.net.NetworkInterface - import org.apache.hadoop.yarn.api.ApplicationConstants.Environment - -import scala.collection.JavaConverters._ - -import org.apache.hadoop.yarn.api.records.NodeState -import org.apache.hadoop.yarn.client.api.YarnClient import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.spark.SparkContext import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil -import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._ import org.apache.spark.scheduler.TaskSchedulerImpl -import org.apache.spark.util.{IntParam, Utils} +import org.apache.spark.util.Utils 
private[spark] class YarnClusterSchedulerBackend( scheduler: TaskSchedulerImpl, @@ -40,13 +32,7 @@ private[spark] class YarnClusterSchedulerBackend( override def start() { super.start() - totalExpectedExecutors = DEFAULT_NUMBER_EXECUTORS - if (System.getenv("SPARK_EXECUTOR_INSTANCES") != null) { - totalExpectedExecutors = IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES")) - .getOrElse(totalExpectedExecutors) - } - // System property can override environment variable. - totalExpectedExecutors = sc.getConf.getInt("spark.executor.instances", totalExpectedExecutors) + totalExpectedExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sc.conf) } override def applicationId(): String = |