author    jerryshao <sshao@hortonworks.com>    2015-09-28 06:38:54 -0700
committer Marcelo Vanzin <vanzin@cloudera.com> 2015-09-28 06:38:54 -0700
commit    353c30bd7dfbd3b76fc8bc9a6dfab9321439a34b (patch)
tree      d2c1a16e24b60effbbeaf4ae219b8b72c1621eb6 /yarn
parent    d8d50ed388d2e695b69d2b93a620045ef2f0bc18 (diff)
[SPARK-10790] [YARN] Fix initial executor number not set issue and consolidate the codes
This bug was introduced in [SPARK-9092](https://issues.apache.org/jira/browse/SPARK-9092): `targetExecutorNumber` should fall back to `minExecutors` when `initialExecutors` is not set. Using 0 instead triggers the problem described in [SPARK-10790](https://issues.apache.org/jira/browse/SPARK-10790). Also consolidate and simplify several similar code snippets to keep their semantics consistent.

Author: jerryshao <sshao@hortonworks.com>

Closes #8910 from jerryshao/SPARK-10790.
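In short, the fix makes the initial target default to the configured minimum rather than 0 when dynamic allocation is enabled. A minimal standalone sketch of that fallback semantics follows; the `InitialExecutorsExample` object and the configuration values are illustrative only, not part of the patch:

    import org.apache.spark.SparkConf

    object InitialExecutorsExample {
      def main(args: Array[String]): Unit = {
        // Hypothetical configuration: dynamic allocation on, only a minimum set.
        val conf = new SparkConf(false)
          .set("spark.dynamicAllocation.enabled", "true")
          .set("spark.dynamicAllocation.minExecutors", "2")
        // Corrected fallback: when initialExecutors is unset, start from
        // minExecutors (here 2) rather than from 0.
        val minExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0)
        val initialExecutors =
          conf.getInt("spark.dynamicAllocation.initialExecutors", minExecutors)
        println(s"initial target: $initialExecutors") // prints: initial target: 2
      }
    }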
Diffstat (limited to 'yarn')
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala                   | 20
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala                     |  6
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala               | 23
-rw-r--r--  yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala | 18
4 files changed, 27 insertions(+), 40 deletions(-)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 54f62e6b72..1165061db2 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -81,25 +81,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
.orNull
// If dynamic allocation is enabled, start at the configured initial number of executors.
// Default to minExecutors if no initialExecutors is set.
- if (isDynamicAllocationEnabled) {
- val minExecutorsConf = "spark.dynamicAllocation.minExecutors"
- val initialExecutorsConf = "spark.dynamicAllocation.initialExecutors"
- val maxExecutorsConf = "spark.dynamicAllocation.maxExecutors"
- val minNumExecutors = sparkConf.getInt(minExecutorsConf, 0)
- val initialNumExecutors = sparkConf.getInt(initialExecutorsConf, minNumExecutors)
- val maxNumExecutors = sparkConf.getInt(maxExecutorsConf, Integer.MAX_VALUE)
-
- // If defined, initial executors must be between min and max
- if (initialNumExecutors < minNumExecutors || initialNumExecutors > maxNumExecutors) {
- throw new IllegalArgumentException(
- s"$initialExecutorsConf must be between $minExecutorsConf and $maxNumExecutors!")
- }
-
- numExecutors = initialNumExecutors
- } else {
- val numExecutorsConf = "spark.executor.instances"
- numExecutors = sparkConf.getInt(numExecutorsConf, numExecutors)
- }
+ numExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sparkConf)
principal = Option(principal)
.orElse(sparkConf.getOption("spark.yarn.principal"))
.orNull
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index fd88b8b7fe..9e1ef1b3b4 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -89,11 +89,7 @@ private[yarn] class YarnAllocator(
@volatile private var numExecutorsFailed = 0
@volatile private var targetNumExecutors =
- if (Utils.isDynamicAllocationEnabled(sparkConf)) {
- sparkConf.getInt("spark.dynamicAllocation.initialExecutors", 0)
- } else {
- sparkConf.getInt("spark.executor.instances", YarnSparkHadoopUtil.DEFAULT_NUMBER_EXECUTORS)
- }
+ YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sparkConf)
// Executor loss reason requests that are pending - maps from executor ID for inquiry to a
// list of requesters that should be responded to once we find out why the given executor
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 445d3dcd26..f276e7efde 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -314,5 +314,28 @@ object YarnSparkHadoopUtil {
def getClassPathSeparator(): String = {
classPathSeparatorField.get(null).asInstanceOf[String]
}
+
+ /**
+ * Get the initial target number of executors; the result depends on whether dynamic
+ * allocation is enabled.
+ */
+ def getInitialTargetExecutorNumber(conf: SparkConf): Int = {
+ if (Utils.isDynamicAllocationEnabled(conf)) {
+ val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0)
+ val initialNumExecutors =
+ conf.getInt("spark.dynamicAllocation.initialExecutors", minNumExecutors)
+ val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors", Int.MaxValue)
+ require(initialNumExecutors >= minNumExecutors && initialNumExecutors <= maxNumExecutors,
+ s"initial executor number $initialNumExecutors must between min executor number" +
+ s"$minNumExecutors and max executor number $maxNumExecutors")
+
+ initialNumExecutors
+ } else {
+ val targetNumExecutors =
+ sys.env.get("SPARK_EXECUTOR_INSTANCES").map(_.toInt).getOrElse(DEFAULT_NUMBER_EXECUTORS)
+ // System property can override environment variable.
+ conf.getInt("spark.executor.instances", targetNumExecutors)
+ }
+ }
}
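For reference, a sketch of how the consolidated helper resolves the initial target in both modes. The `TargetExecutorsExample` object and its configuration values are hypothetical, and the sketch assumes `getInitialTargetExecutorNumber` is accessible from the calling package:

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil

    object TargetExecutorsExample {
      def main(args: Array[String]): Unit = {
        // Static allocation: spark.executor.instances overrides the
        // SPARK_EXECUTOR_INSTANCES environment variable, which in turn
        // overrides DEFAULT_NUMBER_EXECUTORS.
        val staticConf = new SparkConf(false).set("spark.executor.instances", "5")
        println(YarnSparkHadoopUtil.getInitialTargetExecutorNumber(staticConf)) // 5

        // Dynamic allocation: start at initialExecutors, which defaults to
        // minExecutors and must lie within [minExecutors, maxExecutors].
        val dynamicConf = new SparkConf(false)
          .set("spark.dynamicAllocation.enabled", "true")
          .set("spark.dynamicAllocation.minExecutors", "1")
          .set("spark.dynamicAllocation.maxExecutors", "10")
        println(YarnSparkHadoopUtil.getInitialTargetExecutorNumber(dynamicConf)) // 1
      }
    }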
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
index 1aed5a1675..50b699f11b 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -17,21 +17,13 @@
package org.apache.spark.scheduler.cluster
-import java.net.NetworkInterface
-
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
-
-import scala.collection.JavaConverters._
-
-import org.apache.hadoop.yarn.api.records.NodeState
-import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.spark.SparkContext
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
-import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
import org.apache.spark.scheduler.TaskSchedulerImpl
-import org.apache.spark.util.{IntParam, Utils}
+import org.apache.spark.util.Utils
private[spark] class YarnClusterSchedulerBackend(
scheduler: TaskSchedulerImpl,
@@ -40,13 +32,7 @@ private[spark] class YarnClusterSchedulerBackend(
override def start() {
super.start()
- totalExpectedExecutors = DEFAULT_NUMBER_EXECUTORS
- if (System.getenv("SPARK_EXECUTOR_INSTANCES") != null) {
- totalExpectedExecutors = IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES"))
- .getOrElse(totalExpectedExecutors)
- }
- // System property can override environment variable.
- totalExpectedExecutors = sc.getConf.getInt("spark.executor.instances", totalExpectedExecutors)
+ totalExpectedExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sc.conf)
}
override def applicationId(): String =