aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorSandy Ryza <sandy@cloudera.com>2015-02-10 11:07:25 -0800
committerAndrew Or <andrew@databricks.com>2015-02-10 11:12:06 -0800
commit69bc3bb6cffe82aee5ecd0b09410a847ba486b15 (patch)
tree4b57a84448d78e80bf52eafeb251e5b70aad8d43 /core
parentc7ad80ae4256c88e380e7488d48cf6eb14a92d76 (diff)
downloadspark-69bc3bb6cffe82aee5ecd0b09410a847ba486b15.tar.gz
spark-69bc3bb6cffe82aee5ecd0b09410a847ba486b15.tar.bz2
spark-69bc3bb6cffe82aee5ecd0b09410a847ba486b15.zip
SPARK-4136. Under dynamic allocation, cancel outstanding executor requests when no longer needed
This takes advantage of the changes made in SPARK-4337 to cancel pending requests to YARN when they are no longer needed. Each time the timer in `ExecutorAllocationManager` strikes, we compute `maxNumNeededExecutors`, the maximum number of executors we could fill with the current load. This is calculated as the total number of running and pending tasks divided by the number of cores per executor. If `maxNumNeededExecutors` is below the total number of running and pending executors, we call `requestTotalExecutors(maxNumNeededExecutors)` to let the cluster manager know that it should cancel any pending requests above this amount. If not, `maxNumNeededExecutors` is just used as a bound in alongside the configured `maxExecutors` to limit the number of new requests. The patch modifies the API exposed by `ExecutorAllocationClient` for requesting additional executors by moving from `requestExecutors` to `requestTotalExecutors`. This makes the communication between the `ExecutorAllocationManager` and the `YarnAllocator` easier to reason about and removes some state that needed to be kept in the `CoarseGrainedSchedulerBackend`. I think an argument can be made that this makes for a less attractive user-facing API in `SparkContext`, but I'm having trouble envisioning situations where a user would want to use either of these APIs. This will likely break some tests, but I wanted to get feedback on the approach before adding tests and polishing. Author: Sandy Ryza <sandy@cloudera.com> Closes #4168 from sryza/sandy-spark-4136 and squashes the following commits: 37ce77d [Sandy Ryza] Warn on negative number cd3b2ff [Sandy Ryza] SPARK-4136
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala149
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala21
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala20
-rw-r--r--core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala36
5 files changed, 184 insertions, 50 deletions
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
index a46a81eabd..079055e00c 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
@@ -19,10 +19,18 @@ package org.apache.spark
/**
* A client that communicates with the cluster manager to request or kill executors.
+ * This is currently supported only in YARN mode.
*/
private[spark] trait ExecutorAllocationClient {
/**
+ * Express a preference to the cluster manager for a given total number of executors.
+ * This can result in canceling pending requests or filing additional requests.
+ * Return whether the request is acknowledged by the cluster manager.
+ */
+ private[spark] def requestTotalExecutors(numExecutors: Int): Boolean
+
+ /**
* Request an additional number of executors from the cluster manager.
* Return whether the request is acknowledged by the cluster manager.
*/
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 02d54bf3b5..998695b6ac 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -201,18 +201,34 @@ private[spark] class ExecutorAllocationManager(
}
/**
- * If the add time has expired, request new executors and refresh the add time.
- * If the remove time for an existing executor has expired, kill the executor.
+ * The number of executors we would have if the cluster manager were to fulfill all our existing
+ * requests.
+ */
+ private def targetNumExecutors(): Int =
+ numExecutorsPending + executorIds.size - executorsPendingToRemove.size
+
+ /**
+ * The maximum number of executors we would need under the current load to satisfy all running
+ * and pending tasks, rounded up.
+ */
+ private def maxNumExecutorsNeeded(): Int = {
+ val numRunningOrPendingTasks = listener.totalPendingTasks + listener.totalRunningTasks
+ (numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
+ }
+
+ /**
+ * This is called at a fixed interval to regulate the number of pending executor requests
+ * and number of executors running.
+ *
+ * First, adjust our requested executors based on the add time and our current needs.
+ * Then, if the remove time for an existing executor has expired, kill the executor.
+ *
* This is factored out into its own method for testing.
*/
private def schedule(): Unit = synchronized {
val now = clock.getTimeMillis
- if (addTime != NOT_SET && now >= addTime) {
- addExecutors()
- logDebug(s"Starting timer to add more executors (to " +
- s"expire in $sustainedSchedulerBacklogTimeout seconds)")
- addTime += sustainedSchedulerBacklogTimeout * 1000
- }
+
+ addOrCancelExecutorRequests(now)
removeTimes.retain { case (executorId, expireTime) =>
val expired = now >= expireTime
@@ -224,59 +240,89 @@ private[spark] class ExecutorAllocationManager(
}
/**
+ * Check to see whether our existing allocation and the requests we've made previously exceed our
+ * current needs. If so, let the cluster manager know so that it can cancel pending requests that
+ * are unneeded.
+ *
+ * If not, and the add time has expired, see if we can request new executors and refresh the add
+ * time.
+ *
+ * @return the delta in the target number of executors.
+ */
+ private def addOrCancelExecutorRequests(now: Long): Int = synchronized {
+ val currentTarget = targetNumExecutors
+ val maxNeeded = maxNumExecutorsNeeded
+
+ if (maxNeeded < currentTarget) {
+ // The target number exceeds the number we actually need, so stop adding new
+ // executors and inform the cluster manager to cancel the extra pending requests.
+ val newTotalExecutors = math.max(maxNeeded, minNumExecutors)
+ client.requestTotalExecutors(newTotalExecutors)
+ numExecutorsToAdd = 1
+ updateNumExecutorsPending(newTotalExecutors)
+ } else if (addTime != NOT_SET && now >= addTime) {
+ val delta = addExecutors(maxNeeded)
+ logDebug(s"Starting timer to add more executors (to " +
+ s"expire in $sustainedSchedulerBacklogTimeout seconds)")
+ addTime += sustainedSchedulerBacklogTimeout * 1000
+ delta
+ } else {
+ 0
+ }
+ }
+
+ /**
* Request a number of executors from the cluster manager.
* If the cap on the number of executors is reached, give up and reset the
* number of executors to add next round instead of continuing to double it.
- * Return the number actually requested.
+ *
+ * @param maxNumExecutorsNeeded the maximum number of executors all currently running or pending
+ * tasks could fill
+ * @return the number of additional executors actually requested.
*/
- private def addExecutors(): Int = synchronized {
- // Do not request more executors if we have already reached the upper bound
- val numExistingExecutors = executorIds.size + numExecutorsPending
- if (numExistingExecutors >= maxNumExecutors) {
+ private def addExecutors(maxNumExecutorsNeeded: Int): Int = {
+ // Do not request more executors if it would put our target over the upper bound
+ val currentTarget = targetNumExecutors
+ if (currentTarget >= maxNumExecutors) {
logDebug(s"Not adding executors because there are already ${executorIds.size} " +
s"registered and $numExecutorsPending pending executor(s) (limit $maxNumExecutors)")
numExecutorsToAdd = 1
return 0
}
- // The number of executors needed to satisfy all pending tasks is the number of tasks pending
- // divided by the number of tasks each executor can fit, rounded up.
- val maxNumExecutorsPending =
- (listener.totalPendingTasks() + tasksPerExecutor - 1) / tasksPerExecutor
- if (numExecutorsPending >= maxNumExecutorsPending) {
- logDebug(s"Not adding executors because there are already $numExecutorsPending " +
- s"pending and pending tasks could only fill $maxNumExecutorsPending")
- numExecutorsToAdd = 1
- return 0
- }
-
- // It's never useful to request more executors than could satisfy all the pending tasks, so
- // cap request at that amount.
- // Also cap request with respect to the configured upper bound.
- val maxNumExecutorsToAdd = math.min(
- maxNumExecutorsPending - numExecutorsPending,
- maxNumExecutors - numExistingExecutors)
- assert(maxNumExecutorsToAdd > 0)
-
- val actualNumExecutorsToAdd = math.min(numExecutorsToAdd, maxNumExecutorsToAdd)
-
- val newTotalExecutors = numExistingExecutors + actualNumExecutorsToAdd
- val addRequestAcknowledged = testing || client.requestExecutors(actualNumExecutorsToAdd)
+ val actualMaxNumExecutors = math.min(maxNumExecutors, maxNumExecutorsNeeded)
+ val newTotalExecutors = math.min(currentTarget + numExecutorsToAdd, actualMaxNumExecutors)
+ val addRequestAcknowledged = testing || client.requestTotalExecutors(newTotalExecutors)
if (addRequestAcknowledged) {
- logInfo(s"Requesting $actualNumExecutorsToAdd new executor(s) because " +
- s"tasks are backlogged (new desired total will be $newTotalExecutors)")
- numExecutorsToAdd =
- if (actualNumExecutorsToAdd == numExecutorsToAdd) numExecutorsToAdd * 2 else 1
- numExecutorsPending += actualNumExecutorsToAdd
- actualNumExecutorsToAdd
+ val delta = updateNumExecutorsPending(newTotalExecutors)
+ logInfo(s"Requesting $delta new executor(s) because tasks are backlogged" +
+ s" (new desired total will be $newTotalExecutors)")
+ numExecutorsToAdd = if (delta == numExecutorsToAdd) {
+ numExecutorsToAdd * 2
+ } else {
+ 1
+ }
+ delta
} else {
- logWarning(s"Unable to reach the cluster manager " +
- s"to request $actualNumExecutorsToAdd executors!")
+ logWarning(
+ s"Unable to reach the cluster manager to request $newTotalExecutors total executors!")
0
}
}
/**
+ * Given the new target number of executors, update the number of pending executor requests,
+ * and return the delta from the old number of pending requests.
+ */
+ private def updateNumExecutorsPending(newTotalExecutors: Int): Int = {
+ val newNumExecutorsPending =
+ newTotalExecutors - executorIds.size + executorsPendingToRemove.size
+ val delta = newNumExecutorsPending - numExecutorsPending
+ numExecutorsPending = newNumExecutorsPending
+ delta
+ }
+
+ /**
* Request the cluster manager to remove the given executor.
* Return whether the request is received.
*/
@@ -415,6 +461,8 @@ private[spark] class ExecutorAllocationManager(
private val stageIdToNumTasks = new mutable.HashMap[Int, Int]
private val stageIdToTaskIndices = new mutable.HashMap[Int, mutable.HashSet[Int]]
private val executorIdToTaskIds = new mutable.HashMap[String, mutable.HashSet[Long]]
+ // Number of tasks currently running on the cluster. Should be 0 when no stages are active.
+ private var numRunningTasks: Int = _
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
val stageId = stageSubmitted.stageInfo.stageId
@@ -435,6 +483,10 @@ private[spark] class ExecutorAllocationManager(
// This is needed in case the stage is aborted for any reason
if (stageIdToNumTasks.isEmpty) {
allocationManager.onSchedulerQueueEmpty()
+ if (numRunningTasks != 0) {
+ logWarning("No stages are running, but numRunningTasks != 0")
+ numRunningTasks = 0
+ }
}
}
}
@@ -446,6 +498,7 @@ private[spark] class ExecutorAllocationManager(
val executorId = taskStart.taskInfo.executorId
allocationManager.synchronized {
+ numRunningTasks += 1
// This guards against the race condition in which the `SparkListenerTaskStart`
// event is posted before the `SparkListenerBlockManagerAdded` event, which is
// possible because these events are posted in different threads. (see SPARK-4951)
@@ -475,7 +528,8 @@ private[spark] class ExecutorAllocationManager(
val executorId = taskEnd.taskInfo.executorId
val taskId = taskEnd.taskInfo.taskId
allocationManager.synchronized {
- // If the executor is no longer running scheduled any tasks, mark it as idle
+ numRunningTasks -= 1
+ // If the executor is no longer running any scheduled tasks, mark it as idle
if (executorIdToTaskIds.contains(executorId)) {
executorIdToTaskIds(executorId) -= taskId
if (executorIdToTaskIds(executorId).isEmpty) {
@@ -515,6 +569,11 @@ private[spark] class ExecutorAllocationManager(
}
/**
+ * The number of tasks currently running across all stages.
+ */
+ def totalRunningTasks(): Int = numRunningTasks
+
+ /**
* Return true if an executor is not currently running a task, and false otherwise.
*
* Note: This is not thread-safe without the caller owning the `allocationManager` lock.
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 8d3c3d000a..04ca5d1019 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1104,9 +1104,26 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}
/**
+ * Express a preference to the cluster manager for a given total number of executors.
+ * This can result in canceling pending requests or filing additional requests.
+ * This is currently only supported in YARN mode. Return whether the request is received.
+ */
+ private[spark] override def requestTotalExecutors(numExecutors: Int): Boolean = {
+ assert(master.contains("yarn") || dynamicAllocationTesting,
+ "Requesting executors is currently only supported in YARN mode")
+ schedulerBackend match {
+ case b: CoarseGrainedSchedulerBackend =>
+ b.requestTotalExecutors(numExecutors)
+ case _ =>
+ logWarning("Requesting executors is only supported in coarse-grained mode")
+ false
+ }
+ }
+
+ /**
* :: DeveloperApi ::
* Request an additional number of executors from the cluster manager.
- * This is currently only supported in Yarn mode. Return whether the request is received.
+ * This is currently only supported in YARN mode. Return whether the request is received.
*/
@DeveloperApi
override def requestExecutors(numAdditionalExecutors: Int): Boolean = {
@@ -1124,7 +1141,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
/**
* :: DeveloperApi ::
* Request that the cluster manager kill the specified executors.
- * This is currently only supported in Yarn mode. Return whether the request is received.
+ * This is currently only supported in YARN mode. Return whether the request is received.
*/
@DeveloperApi
override def killExecutors(executorIds: Seq[String]): Boolean = {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index f9ca93432b..99986c32b0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -311,7 +311,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
/**
* Request an additional number of executors from the cluster manager.
- * Return whether the request is acknowledged.
+ * @return whether the request is acknowledged.
*/
final override def requestExecutors(numAdditionalExecutors: Int): Boolean = synchronized {
if (numAdditionalExecutors < 0) {
@@ -328,6 +328,22 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
}
/**
+ * Express a preference to the cluster manager for a given total number of executors. This can
+ * result in canceling pending requests or filing additional requests.
+ * @return whether the request is acknowledged.
+ */
+ final override def requestTotalExecutors(numExecutors: Int): Boolean = synchronized {
+ if (numAdditionalExecutors < 0) {
+ throw new IllegalArgumentException(
+ "Attempted to request a negative number of executor(s) " +
+ s"$numExecutors from the cluster manager. Please specify a positive number!")
+ }
+ numPendingExecutors =
+ math.max(numExecutors - numExistingExecutors + executorsPendingToRemove.size, 0)
+ doRequestTotalExecutors(numExecutors)
+ }
+
+ /**
* Request executors from the cluster manager by specifying the total number desired,
* including existing pending and running executors.
*
@@ -337,7 +353,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
* insufficient resources to satisfy the first request. We make the assumption here that the
* cluster manager will eventually fulfill all requests when resources free up.
*
- * Return whether the request is acknowledged.
+ * @return whether the request is acknowledged.
*/
protected def doRequestTotalExecutors(requestedTotal: Int): Boolean = false
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 9eb87f0160..5d96eabd34 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -175,6 +175,33 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
assert(numExecutorsPending(manager) === 9)
}
+ test("cancel pending executors when no longer needed") {
+ sc = createSparkContext(1, 10)
+ val manager = sc.executorAllocationManager.get
+ sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(2, 5)))
+
+ assert(numExecutorsPending(manager) === 0)
+ assert(numExecutorsToAdd(manager) === 1)
+ assert(addExecutors(manager) === 1)
+ assert(numExecutorsPending(manager) === 1)
+ assert(numExecutorsToAdd(manager) === 2)
+ assert(addExecutors(manager) === 2)
+ assert(numExecutorsPending(manager) === 3)
+
+ val task1Info = createTaskInfo(0, 0, "executor-1")
+ sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, task1Info))
+
+ assert(numExecutorsToAdd(manager) === 4)
+ assert(addExecutors(manager) === 2)
+
+ val task2Info = createTaskInfo(1, 0, "executor-1")
+ sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, task2Info))
+ sc.listenerBus.postToAll(SparkListenerTaskEnd(2, 0, null, null, task1Info, null))
+ sc.listenerBus.postToAll(SparkListenerTaskEnd(2, 0, null, null, task2Info, null))
+
+ assert(adjustRequestedExecutors(manager) === -1)
+ }
+
test("remove executors") {
sc = createSparkContext(5, 10)
val manager = sc.executorAllocationManager.get
@@ -679,6 +706,7 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
private val _numExecutorsToAdd = PrivateMethod[Int]('numExecutorsToAdd)
private val _numExecutorsPending = PrivateMethod[Int]('numExecutorsPending)
+ private val _maxNumExecutorsNeeded = PrivateMethod[Int]('maxNumExecutorsNeeded)
private val _executorsPendingToRemove =
PrivateMethod[collection.Set[String]]('executorsPendingToRemove)
private val _executorIds = PrivateMethod[collection.Set[String]]('executorIds)
@@ -686,6 +714,7 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
private val _removeTimes = PrivateMethod[collection.Map[String, Long]]('removeTimes)
private val _schedule = PrivateMethod[Unit]('schedule)
private val _addExecutors = PrivateMethod[Int]('addExecutors)
+ private val _addOrCancelExecutorRequests = PrivateMethod[Int]('addOrCancelExecutorRequests)
private val _removeExecutor = PrivateMethod[Boolean]('removeExecutor)
private val _onExecutorAdded = PrivateMethod[Unit]('onExecutorAdded)
private val _onExecutorRemoved = PrivateMethod[Unit]('onExecutorRemoved)
@@ -724,7 +753,12 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
}
private def addExecutors(manager: ExecutorAllocationManager): Int = {
- manager invokePrivate _addExecutors()
+ val maxNumExecutorsNeeded = manager invokePrivate _maxNumExecutorsNeeded()
+ manager invokePrivate _addExecutors(maxNumExecutorsNeeded)
+ }
+
+ private def adjustRequestedExecutors(manager: ExecutorAllocationManager): Int = {
+ manager invokePrivate _addOrCancelExecutorRequests(0L)
}
private def removeExecutor(manager: ExecutorAllocationManager, id: String): Boolean = {