author     li-zhihui <zhihui.li@intel.com>    2014-08-08 22:52:56 -0700
committer  Patrick Wendell <pwendell@gmail.com>    2014-08-08 22:52:56 -0700
commit     28dbae85aaf6842e22cd7465cb11cb34d58fc56d
tree       37603cc720180a2d90bbf708b80abc2008798384 /core
parent     43af2817007eaa2cce2567bd83f5cde1ee28d1f7
[SPARK-2635] Fix race condition at SchedulerBackend.isReady in standalone mode
In SPARK-1946 (PR #900), the configuration spark.scheduler.minRegisteredExecutorsRatio was introduced. However, in standalone mode there is a race condition where isReady() can return true prematurely because totalExpectedExecutors has not yet been set correctly. Because the number of expected executors is uncertain in standalone mode, this PR uses CPU cores (--total-executor-cores) as the expected resources to judge whether the SchedulerBackend is ready.

Author: li-zhihui <zhihui.li@intel.com>
Author: Li Zhihui <zhihui.li@intel.com>

Closes #1525 from li-zhihui/fixre4s and squashes the following commits:

e9a630b [Li Zhihui] Rename variable totalExecutors and clean up code
abf4860 [Li Zhihui] Push down variable totalExpectedResources to child classes
ca54bd9 [li-zhihui] Format log with string interpolation
88c7dc6 [li-zhihui] Minor code and docs refactoring
41cf47e [li-zhihui] Fix race condition at SchedulerBackend.isReady in standalone mode
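The essence of the fix is a small readiness check: count registered cores instead of registered executors and compare them against the expected total. The following is a minimal, self-contained Scala sketch of that idea, not the actual Spark classes; ReadinessCheck and registerExecutor are illustrative names, and the 8-core / 0.75-ratio / 30-second values are arbitrary.

    import java.util.concurrent.atomic.AtomicInteger

    // Simplified model of the readiness logic this patch introduces.
    // Illustrative only; not the real CoarseGrainedSchedulerBackend.
    class ReadinessCheck(
        totalExpectedCores: Int,
        minRegisteredRatio: Double,
        maxRegisteredWaitingTimeMs: Long) {

      private val totalCoreCount = new AtomicInteger(0)
      private val createTime = System.currentTimeMillis()

      // Called when an executor registers with some number of cores.
      def registerExecutor(cores: Int): Unit = {
        totalCoreCount.addAndGet(cores)
      }

      // Have enough cores registered relative to the expected total?
      def sufficientResourcesRegistered(): Boolean =
        totalCoreCount.get() >= totalExpectedCores * minRegisteredRatio

      // Ready once enough cores are registered, or once the waiting time elapses.
      def isReady(): Boolean =
        sufficientResourcesRegistered() ||
          (System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTimeMs
    }

    object ReadinessCheckDemo extends App {
      // e.g. --total-executor-cores 8, require 75% of them before scheduling starts
      val check = new ReadinessCheck(8, 0.75, 30000)
      check.registerExecutor(4)
      println(check.isReady()) // false: 4 < 8 * 0.75
      check.registerExecutor(4)
      println(check.isReady()) // true: 8 >= 6
    }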
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala30
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala6
2 files changed, 19 insertions, 17 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 9f085eef46..33500d967e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -47,19 +47,19 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
{
// Use an atomic variable to track total number of cores in the cluster for simplicity and speed
var totalCoreCount = new AtomicInteger(0)
- var totalExpectedExecutors = new AtomicInteger(0)
+ var totalRegisteredExecutors = new AtomicInteger(0)
val conf = scheduler.sc.conf
private val timeout = AkkaUtils.askTimeout(conf)
private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
- // Submit tasks only after (registered executors / total expected executors)
+ // Submit tasks only after (registered resources / total expected resources)
// is equal to at least this value, that is double between 0 and 1.
- var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0)
- if (minRegisteredRatio > 1) minRegisteredRatio = 1
- // Whatever minRegisteredExecutorsRatio is arrived, submit tasks after the time(milliseconds).
+ var minRegisteredRatio =
+ math.min(1, conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0))
+ // Submit tasks after maxRegisteredWaitingTime milliseconds
+ // if minRegisteredRatio has not yet been reached
val maxRegisteredWaitingTime =
- conf.getInt("spark.scheduler.maxRegisteredExecutorsWaitingTime", 30000)
+ conf.getInt("spark.scheduler.maxRegisteredResourcesWaitingTime", 30000)
val createTime = System.currentTimeMillis()
- var ready = if (minRegisteredRatio <= 0) true else false
class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
private val executorActor = new HashMap[String, ActorRef]
@@ -94,12 +94,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
executorAddress(executorId) = sender.path.address
addressToExecutorId(sender.path.address) = executorId
totalCoreCount.addAndGet(cores)
- if (executorActor.size >= totalExpectedExecutors.get() * minRegisteredRatio && !ready) {
- ready = true
- logInfo("SchedulerBackend is ready for scheduling beginning, registered executors: " +
- executorActor.size + ", total expected executors: " + totalExpectedExecutors.get() +
- ", minRegisteredExecutorsRatio: " + minRegisteredRatio)
- }
+ totalRegisteredExecutors.addAndGet(1)
makeOffers()
}
@@ -268,14 +263,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
}
}
+ def sufficientResourcesRegistered(): Boolean = true
+
override def isReady(): Boolean = {
- if (ready) {
+ if (sufficientResourcesRegistered) {
+ logInfo("SchedulerBackend is ready for scheduling beginning after " +
+ s"reached minRegisteredResourcesRatio: $minRegisteredRatio")
return true
}
if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTime) {
- ready = true
logInfo("SchedulerBackend is ready for scheduling beginning after waiting " +
- "maxRegisteredExecutorsWaitingTime: " + maxRegisteredWaitingTime)
+ s"maxRegisteredResourcesWaitingTime: $maxRegisteredWaitingTime(ms)")
return true
}
false
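With the flag-based ready state gone, isReady() is now driven by sufficientResourcesRegistered(), which defaults to true so that backends without a notion of expected resources keep their old behavior. A hedged, caller-side sketch of how such a backend might be consumed is shown below; waitBackendReady is a hypothetical helper for illustration, not the Spark scheduler itself.

    // Hypothetical caller-side sketch (not Spark source): a scheduler can block
    // on isReady() before making task offers. If isReady() returns true too
    // early (the race fixed by this patch), tasks are offered before enough
    // executors have registered.
    trait Backend { def isReady(): Boolean }

    def waitBackendReady(backend: Backend): Unit = {
      while (!backend.isReady()) {
        Thread.sleep(100) // poll until enough resources register or the timeout hits
      }
    }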
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index a28446f6c8..589dba2e40 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -36,6 +36,7 @@ private[spark] class SparkDeploySchedulerBackend(
var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _
val maxCores = conf.getOption("spark.cores.max").map(_.toInt)
+ val totalExpectedCores = maxCores.getOrElse(0)
override def start() {
super.start()
@@ -97,7 +98,6 @@ private[spark] class SparkDeploySchedulerBackend(
override def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int,
memory: Int) {
- totalExpectedExecutors.addAndGet(1)
logInfo("Granted executor ID %s on hostPort %s with %d cores, %s RAM".format(
fullId, hostPort, cores, Utils.megabytesToString(memory)))
}
@@ -110,4 +110,8 @@ private[spark] class SparkDeploySchedulerBackend(
logInfo("Executor %s removed: %s".format(fullId, message))
removeExecutor(fullId.split("/")(1), reason.toString)
}
+
+ override def sufficientResourcesRegistered(): Boolean = {
+ totalCoreCount.get() >= totalExpectedCores * minRegisteredRatio
+ }
}
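Finally, a hedged usage sketch of the renamed configuration keys in standalone mode; the master URL, core count, ratio, and wait time below are arbitrary illustrative values.

    import org.apache.spark.{SparkConf, SparkContext}

    object ReadinessConfigExample extends App {
      val conf = new SparkConf()
        .setAppName("readiness-demo")
        .setMaster("spark://master:7077") // hypothetical standalone master URL
        .set("spark.cores.max", "16")     // expected total cores (--total-executor-cores)
        .set("spark.scheduler.minRegisteredResourcesRatio", "0.8")
        .set("spark.scheduler.maxRegisteredResourcesWaitingTime", "60000")
      val sc = new SparkContext(conf)
      // Tasks are submitted only after 80% of the 16 cores register,
      // or after 60 seconds, whichever comes first.
      sc.stop()
    }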