about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r-- core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 30
-rw-r--r-- core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala | 6
-rw-r--r-- docs/configuration.md | 13
-rw-r--r-- yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala | 9
-rw-r--r-- yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala | 17
5 files changed, 43 insertions(+), 32 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 9f085eef46..33500d967e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -47,19 +47,19 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
{
// Use an atomic variable to track total number of cores in the cluster for simplicity and speed
var totalCoreCount = new AtomicInteger(0)
- var totalExpectedExecutors = new AtomicInteger(0)
+ var totalRegisteredExecutors = new AtomicInteger(0)
val conf = scheduler.sc.conf
private val timeout = AkkaUtils.askTimeout(conf)
private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
- // Submit tasks only after (registered executors / total expected executors)
+ // Submit tasks only after (registered resources / total expected resources)
// is equal to at least this value, that is double between 0 and 1.
- var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0)
- if (minRegisteredRatio > 1) minRegisteredRatio = 1
- // Whatever minRegisteredExecutorsRatio is arrived, submit tasks after the time(milliseconds).
+ var minRegisteredRatio =
+ math.min(1, conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0))
+ // Submit tasks after maxRegisteredWaitingTime milliseconds
+ // if minRegisteredRatio has not yet been reached
val maxRegisteredWaitingTime =
- conf.getInt("spark.scheduler.maxRegisteredExecutorsWaitingTime", 30000)
+ conf.getInt("spark.scheduler.maxRegisteredResourcesWaitingTime", 30000)
val createTime = System.currentTimeMillis()
- var ready = if (minRegisteredRatio <= 0) true else false
class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
private val executorActor = new HashMap[String, ActorRef]
@@ -94,12 +94,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
executorAddress(executorId) = sender.path.address
addressToExecutorId(sender.path.address) = executorId
totalCoreCount.addAndGet(cores)
- if (executorActor.size >= totalExpectedExecutors.get() * minRegisteredRatio && !ready) {
- ready = true
- logInfo("SchedulerBackend is ready for scheduling beginning, registered executors: " +
- executorActor.size + ", total expected executors: " + totalExpectedExecutors.get() +
- ", minRegisteredExecutorsRatio: " + minRegisteredRatio)
- }
+ totalRegisteredExecutors.addAndGet(1)
makeOffers()
}
@@ -268,14 +263,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
}
}
+ def sufficientResourcesRegistered(): Boolean = true
+
override def isReady(): Boolean = {
- if (ready) {
+ if (sufficientResourcesRegistered) {
+ logInfo("SchedulerBackend is ready for scheduling beginning after " +
+ s"reached minRegisteredResourcesRatio: $minRegisteredRatio")
return true
}
if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTime) {
- ready = true
logInfo("SchedulerBackend is ready for scheduling beginning after waiting " +
- "maxRegisteredExecutorsWaitingTime: " + maxRegisteredWaitingTime)
+ s"maxRegisteredResourcesWaitingTime: $maxRegisteredWaitingTime(ms)")
return true
}
false
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index a28446f6c8..589dba2e40 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -36,6 +36,7 @@ private[spark] class SparkDeploySchedulerBackend(
var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _
val maxCores = conf.getOption("spark.cores.max").map(_.toInt)
+ val totalExpectedCores = maxCores.getOrElse(0)
override def start() {
super.start()
@@ -97,7 +98,6 @@ private[spark] class SparkDeploySchedulerBackend(
override def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int,
memory: Int) {
- totalExpectedExecutors.addAndGet(1)
logInfo("Granted executor ID %s on hostPort %s with %d cores, %s RAM".format(
fullId, hostPort, cores, Utils.megabytesToString(memory)))
}
@@ -110,4 +110,8 @@ private[spark] class SparkDeploySchedulerBackend(
logInfo("Executor %s removed: %s".format(fullId, message))
removeExecutor(fullId.split("/")(1), reason.toString)
}
+
+ override def sufficientResourcesRegistered(): Boolean = {
+ totalCoreCount.get() >= totalExpectedCores * minRegisteredRatio
+ }
}
diff --git a/docs/configuration.md b/docs/configuration.md
index 4d27c5a918..617a72a021 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -825,21 +825,22 @@ Apart from these, the following properties are also available, and may be useful
</td>
</tr>
</tr>
- <td><code>spark.scheduler.minRegisteredExecutorsRatio</code></td>
+ <td><code>spark.scheduler.minRegisteredResourcesRatio</code></td>
<td>0</td>
<td>
- The minimum ratio of registered executors (registered executors / total expected executors)
+ The minimum ratio of registered resources (registered resources / total expected resources)
+ (resources are executors in yarn mode, CPU cores in standalone mode)
to wait for before scheduling begins. Specified as a double between 0 and 1.
- Regardless of whether the minimum ratio of executors has been reached,
+ Regardless of whether the minimum ratio of resources has been reached,
the maximum amount of time it will wait before scheduling begins is controlled by config
- <code>spark.scheduler.maxRegisteredExecutorsWaitingTime</code>
+ <code>spark.scheduler.maxRegisteredResourcesWaitingTime</code>
</td>
</tr>
<tr>
- <td><code>spark.scheduler.maxRegisteredExecutorsWaitingTime</code></td>
+ <td><code>spark.scheduler.maxRegisteredResourcesWaitingTime</code></td>
<td>30000</td>
<td>
- Maximum amount of time to wait for executors to register before scheduling begins
+ Maximum amount of time to wait for resources to register before scheduling begins
(in milliseconds).
</td>
</tr>
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index f8fb96b312..833e249f9f 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -30,15 +30,15 @@ private[spark] class YarnClientSchedulerBackend(
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
with Logging {
- if (conf.getOption("spark.scheduler.minRegisteredExecutorsRatio").isEmpty) {
+ if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
minRegisteredRatio = 0.8
- ready = false
}
var client: Client = null
var appId: ApplicationId = null
var checkerThread: Thread = null
var stopping: Boolean = false
+ var totalExpectedExecutors = 0
private[spark] def addArg(optionName: String, envVar: String, sysProp: String,
arrayBuf: ArrayBuffer[String]) {
@@ -84,7 +84,7 @@ private[spark] class YarnClientSchedulerBackend(
logDebug("ClientArguments called with: " + argsArrayBuf)
val args = new ClientArguments(argsArrayBuf.toArray, conf)
- totalExpectedExecutors.set(args.numExecutors)
+ totalExpectedExecutors = args.numExecutors
client = new Client(args, conf)
appId = client.runApp()
waitForApp()
@@ -150,4 +150,7 @@ private[spark] class YarnClientSchedulerBackend(
logInfo("Stopped")
}
+ override def sufficientResourcesRegistered(): Boolean = {
+ totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
+ }
}
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
index 0ad1794d19..55665220a6 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -27,19 +27,24 @@ private[spark] class YarnClusterSchedulerBackend(
sc: SparkContext)
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
- if (conf.getOption("spark.scheduler.minRegisteredExecutorsRatio").isEmpty) {
+ var totalExpectedExecutors = 0
+
+ if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
minRegisteredRatio = 0.8
- ready = false
}
override def start() {
super.start()
- var numExecutors = ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS
+ totalExpectedExecutors = ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS
if (System.getenv("SPARK_EXECUTOR_INSTANCES") != null) {
- numExecutors = IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES")).getOrElse(numExecutors)
+ totalExpectedExecutors = IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES"))
+ .getOrElse(totalExpectedExecutors)
}
// System property can override environment variable.
- numExecutors = sc.getConf.getInt("spark.executor.instances", numExecutors)
- totalExpectedExecutors.set(numExecutors)
+ totalExpectedExecutors = sc.getConf.getInt("spark.executor.instances", totalExpectedExecutors)
+ }
+
+ override def sufficientResourcesRegistered(): Boolean = {
+ totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
}
}