aboutsummaryrefslogtreecommitdiff
path: root/yarn
diff options
context:
space:
mode:
authorli-zhihui <zhihui.li@intel.com>2014-08-08 22:52:56 -0700
committerPatrick Wendell <pwendell@gmail.com>2014-08-08 22:52:56 -0700
commit28dbae85aaf6842e22cd7465cb11cb34d58fc56d (patch)
tree37603cc720180a2d90bbf708b80abc2008798384 /yarn
parent43af2817007eaa2cce2567bd83f5cde1ee28d1f7 (diff)
downloadspark-28dbae85aaf6842e22cd7465cb11cb34d58fc56d.tar.gz
spark-28dbae85aaf6842e22cd7465cb11cb34d58fc56d.tar.bz2
spark-28dbae85aaf6842e22cd7465cb11cb34d58fc56d.zip
[SPARK-2635] Fix race condition at SchedulerBackend.isReady in standalone mode
In SPARK-1946(PR #900), configuration <code>spark.scheduler.minRegisteredExecutorsRatio</code> was introduced. However, in standalone mode, there is a race condition where isReady() can return true because totalExpectedExecutors has not been correctly set. Because expected executors is uncertain in standalone mode, the PR try to use CPU cores(<code>--total-executor-cores</code>) as expected resources to judge whether SchedulerBackend is ready. Author: li-zhihui <zhihui.li@intel.com> Author: Li Zhihui <zhihui.li@intel.com> Closes #1525 from li-zhihui/fixre4s and squashes the following commits: e9a630b [Li Zhihui] Rename variable totalExecutors and clean codes abf4860 [Li Zhihui] Push down variable totalExpectedResources to children classes ca54bd9 [li-zhihui] Format log with String interpolation 88c7dc6 [li-zhihui] Few codes and docs refactor 41cf47e [li-zhihui] Fix race condition at SchedulerBackend.isReady in standalone mode
Diffstat (limited to 'yarn')
-rw-r--r--yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala9
-rw-r--r--yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala17
2 files changed, 17 insertions, 9 deletions
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index f8fb96b312..833e249f9f 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -30,15 +30,15 @@ private[spark] class YarnClientSchedulerBackend(
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
with Logging {
- if (conf.getOption("spark.scheduler.minRegisteredExecutorsRatio").isEmpty) {
+ if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
minRegisteredRatio = 0.8
- ready = false
}
var client: Client = null
var appId: ApplicationId = null
var checkerThread: Thread = null
var stopping: Boolean = false
+ var totalExpectedExecutors = 0
private[spark] def addArg(optionName: String, envVar: String, sysProp: String,
arrayBuf: ArrayBuffer[String]) {
@@ -84,7 +84,7 @@ private[spark] class YarnClientSchedulerBackend(
logDebug("ClientArguments called with: " + argsArrayBuf)
val args = new ClientArguments(argsArrayBuf.toArray, conf)
- totalExpectedExecutors.set(args.numExecutors)
+ totalExpectedExecutors = args.numExecutors
client = new Client(args, conf)
appId = client.runApp()
waitForApp()
@@ -150,4 +150,7 @@ private[spark] class YarnClientSchedulerBackend(
logInfo("Stopped")
}
+ override def sufficientResourcesRegistered(): Boolean = {
+ totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
+ }
}
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
index 0ad1794d19..55665220a6 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -27,19 +27,24 @@ private[spark] class YarnClusterSchedulerBackend(
sc: SparkContext)
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
- if (conf.getOption("spark.scheduler.minRegisteredExecutorsRatio").isEmpty) {
+ var totalExpectedExecutors = 0
+
+ if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
minRegisteredRatio = 0.8
- ready = false
}
override def start() {
super.start()
- var numExecutors = ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS
+ totalExpectedExecutors = ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS
if (System.getenv("SPARK_EXECUTOR_INSTANCES") != null) {
- numExecutors = IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES")).getOrElse(numExecutors)
+ totalExpectedExecutors = IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES"))
+ .getOrElse(totalExpectedExecutors)
}
// System property can override environment variable.
- numExecutors = sc.getConf.getInt("spark.executor.instances", numExecutors)
- totalExpectedExecutors.set(numExecutors)
+ totalExpectedExecutors = sc.getConf.getInt("spark.executor.instances", totalExpectedExecutors)
+ }
+
+ override def sufficientResourcesRegistered(): Boolean = {
+ totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
}
}