Diffstat (limited to 'yarn')
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala  |  9
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala | 17
2 files changed, 17 insertions(+), 9 deletions(-)
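This patch renames the readiness knob from spark.scheduler.minRegisteredExecutorsRatio to spark.scheduler.minRegisteredResourcesRatio and replaces the mutable "ready" flag with an overridable sufficientResourcesRegistered() hook; each YARN backend now keeps its expected executor count in a plain totalExpectedExecutors var instead of writing to a counter owned by the base class. A minimal sketch of the base-class contract the two overrides below plug into -- field and method names other than sufficientResourcesRegistered, totalRegisteredExecutors, and minRegisteredRatio are assumptions, not the actual CoarseGrainedSchedulerBackend source:

    import java.util.concurrent.atomic.AtomicInteger

    // Hedged sketch: how isReady() presumably consults the new hook.
    abstract class SchedulerBackendSketch {
      val totalRegisteredExecutors = new AtomicInteger(0)
      var minRegisteredRatio = 0.0            // YARN backends bump this to 0.8 below
      val maxRegisteredWaitingTime = 30000L   // ms; illustrative timeout, assumed name
      private val createTime = System.currentTimeMillis()

      // Each cluster backend overrides this with its own notion of "enough resources".
      def sufficientResourcesRegistered(): Boolean = true

      // Ready once enough resources registered, or once the wait timeout expires.
      def isReady(): Boolean =
        sufficientResourcesRegistered() ||
          System.currentTimeMillis() - createTime >= maxRegisteredWaitingTime
    }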
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index f8fb96b312..833e249f9f 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -30,15 +30,15 @@ private[spark] class YarnClientSchedulerBackend(
   extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
   with Logging {
 
-  if (conf.getOption("spark.scheduler.minRegisteredExecutorsRatio").isEmpty) {
+  if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
     minRegisteredRatio = 0.8
-    ready = false
   }
 
   var client: Client = null
   var appId: ApplicationId = null
   var checkerThread: Thread = null
   var stopping: Boolean = false
+  var totalExpectedExecutors = 0
 
   private[spark] def addArg(optionName: String, envVar: String, sysProp: String,
@@ -84,7 +84,7 @@ private[spark] class YarnClientSchedulerBackend(
logDebug("ClientArguments called with: " + argsArrayBuf)
val args = new ClientArguments(argsArrayBuf.toArray, conf)
- totalExpectedExecutors.set(args.numExecutors)
+ totalExpectedExecutors = args.numExecutors
client = new Client(args, conf)
appId = client.runApp()
waitForApp()
@@ -150,4 +150,7 @@ private[spark] class YarnClientSchedulerBackend(
logInfo("Stopped")
}
+ override def sufficientResourcesRegistered(): Boolean = {
+ totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
+ }
}
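In client mode the expected count comes from ClientArguments (--num-executors) at submit time, so the override simply compares registrations against that target scaled by the ratio. Hedged usage example -- the config key is the one this patch introduces; the master URL and the 0.5 value are illustrative:

    import org.apache.spark.SparkConf

    // Ask the scheduler to hold off until half the requested executors register.
    val conf = new SparkConf()
      .setMaster("yarn-client")
      .set("spark.scheduler.minRegisteredResourcesRatio", "0.5")
    // With --num-executors 10, sufficientResourcesRegistered() becomes true once
    // totalRegisteredExecutors.get() >= 10 * 0.5 = 5 (the 0.8 default would need 8).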
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
index 0ad1794d19..55665220a6 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -27,19 +27,24 @@ private[spark] class YarnClusterSchedulerBackend(
     sc: SparkContext)
   extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
 
-  if (conf.getOption("spark.scheduler.minRegisteredExecutorsRatio").isEmpty) {
+  var totalExpectedExecutors = 0
+
+  if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
     minRegisteredRatio = 0.8
-    ready = false
   }
 
   override def start() {
     super.start()
-    var numExecutors = ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS
+    totalExpectedExecutors = ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS
     if (System.getenv("SPARK_EXECUTOR_INSTANCES") != null) {
-      numExecutors = IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES")).getOrElse(numExecutors)
+      totalExpectedExecutors = IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES"))
+        .getOrElse(totalExpectedExecutors)
     }
     // System property can override environment variable.
-    numExecutors = sc.getConf.getInt("spark.executor.instances", numExecutors)
-    totalExpectedExecutors.set(numExecutors)
+    totalExpectedExecutors = sc.getConf.getInt("spark.executor.instances", totalExpectedExecutors)
+  }
+
+  override def sufficientResourcesRegistered(): Boolean = {
+    totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
+  }
 }
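In cluster mode, start() now resolves the expected count with the precedence spark.executor.instances over SPARK_EXECUTOR_INSTANCES over ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS. A standalone sketch of that resolution order -- the helper name and the default of 2 are assumptions, and IntParam is stood in for by a plain toInt parse:

    import scala.util.Try

    // Hypothetical helper mirroring the precedence encoded in start() above.
    def resolveExpectedExecutors(
        sysProp: Option[String],   // spark.executor.instances
        envVar: Option[String],    // SPARK_EXECUTOR_INSTANCES
        default: Int = 2): Int = { // assumed DEFAULT_NUMBER_EXECUTORS
      val fromEnv = envVar.flatMap(s => Try(s.toInt).toOption).getOrElse(default)
      sysProp.flatMap(s => Try(s.toInt).toOption).getOrElse(fromEnv)
    }

    assert(resolveExpectedExecutors(Some("8"), Some("4")) == 8) // system property wins
    assert(resolveExpectedExecutors(None, Some("4")) == 4)      // env var next
    assert(resolveExpectedExecutors(None, None) == 2)           // default last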