-rw-r--r--  core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala                        |   8
-rw-r--r--  core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala                       | 149
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala                                    |  21
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala |  20
-rw-r--r--  core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala                  |  36
5 files changed, 184 insertions, 50 deletions
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
index a46a81eabd..079055e00c 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
@@ -19,10 +19,18 @@ package org.apache.spark
/**
* A client that communicates with the cluster manager to request or kill executors.
+ * This is currently supported only in YARN mode.
*/
private[spark] trait ExecutorAllocationClient {
/**
+ * Express a preference to the cluster manager for a given total number of executors.
+ * This can result in canceling pending requests or filing additional requests.
+ * Return whether the request is acknowledged by the cluster manager.
+ */
+ private[spark] def requestTotalExecutors(numExecutors: Int): Boolean
+
+ /**
* Request an additional number of executors from the cluster manager.
* Return whether the request is acknowledged by the cluster manager.
*/
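The new requestTotalExecutors is declarative where the existing requestExecutors is incremental: callers state the total they want rather than a delta, which is what allows pending requests to be cancelled. A toy sketch of the difference in semantics (hypothetical class, not part of this patch):

  // Toy stand-in for a cluster-manager client; illustrative only.
  class ToyAllocationClient {
    private var requestedTotal = 0

    // Incremental: each call can only grow the outstanding request.
    def requestExecutors(numAdditionalExecutors: Int): Boolean = {
      requestedTotal += numAdditionalExecutors
      true
    }

    // Declarative: each call replaces the previous target, so it can also shrink it,
    // effectively cancelling requests that are no longer needed.
    def requestTotalExecutors(numExecutors: Int): Boolean = {
      requestedTotal = numExecutors
      true
    }

    def currentTarget: Int = requestedTotal
  }
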
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 02d54bf3b5..998695b6ac 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -201,18 +201,34 @@ private[spark] class ExecutorAllocationManager(
}
/**
- * If the add time has expired, request new executors and refresh the add time.
- * If the remove time for an existing executor has expired, kill the executor.
+ * The number of executors we would have if the cluster manager were to fulfill all our existing
+ * requests.
+ */
+ private def targetNumExecutors(): Int =
+ numExecutorsPending + executorIds.size - executorsPendingToRemove.size
+
+ /**
+ * The maximum number of executors we would need under the current load to satisfy all running
+ * and pending tasks, rounded up.
+ */
+ private def maxNumExecutorsNeeded(): Int = {
+ val numRunningOrPendingTasks = listener.totalPendingTasks + listener.totalRunningTasks
+ (numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
+ }
+
+ /**
+ * This is called at a fixed interval to regulate the number of pending executor requests
+ * and the number of executors running.
+ *
+ * First, adjust our requested executors based on the add time and our current needs.
+ * Then, if the remove time for an existing executor has expired, kill the executor.
+ *
* This is factored out into its own method for testing.
*/
private def schedule(): Unit = synchronized {
val now = clock.getTimeMillis
- if (addTime != NOT_SET && now >= addTime) {
- addExecutors()
- logDebug(s"Starting timer to add more executors (to " +
- s"expire in $sustainedSchedulerBacklogTimeout seconds)")
- addTime += sustainedSchedulerBacklogTimeout * 1000
- }
+
+ addOrCancelExecutorRequests(now)
removeTimes.retain { case (executorId, expireTime) =>
val expired = now >= expireTime
@@ -224,59 +240,89 @@ private[spark] class ExecutorAllocationManager(
}
/**
+ * Check to see whether our existing allocation and the requests we've made previously exceed our
+ * current needs. If so, let the cluster manager know so that it can cancel pending requests that
+ * are unneeded.
+ *
+ * If not, and the add time has expired, see if we can request new executors and refresh the add
+ * time.
+ *
+ * @return the delta in the target number of executors.
+ */
+ private def addOrCancelExecutorRequests(now: Long): Int = synchronized {
+ val currentTarget = targetNumExecutors
+ val maxNeeded = maxNumExecutorsNeeded
+
+ if (maxNeeded < currentTarget) {
+ // The target number exceeds the number we actually need, so stop adding new
+ // executors and inform the cluster manager to cancel the extra pending requests.
+ val newTotalExecutors = math.max(maxNeeded, minNumExecutors)
+ client.requestTotalExecutors(newTotalExecutors)
+ numExecutorsToAdd = 1
+ updateNumExecutorsPending(newTotalExecutors)
+ } else if (addTime != NOT_SET && now >= addTime) {
+ val delta = addExecutors(maxNeeded)
+ logDebug(s"Starting timer to add more executors (to " +
+ s"expire in $sustainedSchedulerBacklogTimeout seconds)")
+ addTime += sustainedSchedulerBacklogTimeout * 1000
+ delta
+ } else {
+ 0
+ }
+ }
+
+ /**
* Request a number of executors from the cluster manager.
* If the cap on the number of executors is reached, give up and reset the
* number of executors to add next round instead of continuing to double it.
- * Return the number actually requested.
+ *
+ * @param maxNumExecutorsNeeded the maximum number of executors all currently running or pending
+ * tasks could fill
+ * @return the number of additional executors actually requested.
*/
- private def addExecutors(): Int = synchronized {
- // Do not request more executors if we have already reached the upper bound
- val numExistingExecutors = executorIds.size + numExecutorsPending
- if (numExistingExecutors >= maxNumExecutors) {
+ private def addExecutors(maxNumExecutorsNeeded: Int): Int = {
+ // Do not request more executors if it would put our target over the upper bound
+ val currentTarget = targetNumExecutors
+ if (currentTarget >= maxNumExecutors) {
logDebug(s"Not adding executors because there are already ${executorIds.size} " +
s"registered and $numExecutorsPending pending executor(s) (limit $maxNumExecutors)")
numExecutorsToAdd = 1
return 0
}
- // The number of executors needed to satisfy all pending tasks is the number of tasks pending
- // divided by the number of tasks each executor can fit, rounded up.
- val maxNumExecutorsPending =
- (listener.totalPendingTasks() + tasksPerExecutor - 1) / tasksPerExecutor
- if (numExecutorsPending >= maxNumExecutorsPending) {
- logDebug(s"Not adding executors because there are already $numExecutorsPending " +
- s"pending and pending tasks could only fill $maxNumExecutorsPending")
- numExecutorsToAdd = 1
- return 0
- }
-
- // It's never useful to request more executors than could satisfy all the pending tasks, so
- // cap request at that amount.
- // Also cap request with respect to the configured upper bound.
- val maxNumExecutorsToAdd = math.min(
- maxNumExecutorsPending - numExecutorsPending,
- maxNumExecutors - numExistingExecutors)
- assert(maxNumExecutorsToAdd > 0)
-
- val actualNumExecutorsToAdd = math.min(numExecutorsToAdd, maxNumExecutorsToAdd)
-
- val newTotalExecutors = numExistingExecutors + actualNumExecutorsToAdd
- val addRequestAcknowledged = testing || client.requestExecutors(actualNumExecutorsToAdd)
+ val actualMaxNumExecutors = math.min(maxNumExecutors, maxNumExecutorsNeeded)
+ val newTotalExecutors = math.min(currentTarget + numExecutorsToAdd, actualMaxNumExecutors)
+ val addRequestAcknowledged = testing || client.requestTotalExecutors(newTotalExecutors)
if (addRequestAcknowledged) {
- logInfo(s"Requesting $actualNumExecutorsToAdd new executor(s) because " +
- s"tasks are backlogged (new desired total will be $newTotalExecutors)")
- numExecutorsToAdd =
- if (actualNumExecutorsToAdd == numExecutorsToAdd) numExecutorsToAdd * 2 else 1
- numExecutorsPending += actualNumExecutorsToAdd
- actualNumExecutorsToAdd
+ val delta = updateNumExecutorsPending(newTotalExecutors)
+ logInfo(s"Requesting $delta new executor(s) because tasks are backlogged" +
+ s" (new desired total will be $newTotalExecutors)")
+ numExecutorsToAdd = if (delta == numExecutorsToAdd) {
+ numExecutorsToAdd * 2
+ } else {
+ 1
+ }
+ delta
} else {
- logWarning(s"Unable to reach the cluster manager " +
- s"to request $actualNumExecutorsToAdd executors!")
+ logWarning(
+ s"Unable to reach the cluster manager to request $newTotalExecutors total executors!")
0
}
}
/**
+ * Given the new target number of executors, update the number of pending executor requests,
+ * and return the delta from the old number of pending requests.
+ */
+ private def updateNumExecutorsPending(newTotalExecutors: Int): Int = {
+ val newNumExecutorsPending =
+ newTotalExecutors - executorIds.size + executorsPendingToRemove.size
+ val delta = newNumExecutorsPending - numExecutorsPending
+ numExecutorsPending = newNumExecutorsPending
+ delta
+ }
+
+ /**
* Request the cluster manager to remove the given executor.
* Return whether the request is received.
*/
@@ -415,6 +461,8 @@ private[spark] class ExecutorAllocationManager(
private val stageIdToNumTasks = new mutable.HashMap[Int, Int]
private val stageIdToTaskIndices = new mutable.HashMap[Int, mutable.HashSet[Int]]
private val executorIdToTaskIds = new mutable.HashMap[String, mutable.HashSet[Long]]
+ // Number of tasks currently running on the cluster. Should be 0 when no stages are active.
+ private var numRunningTasks: Int = _
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
val stageId = stageSubmitted.stageInfo.stageId
@@ -435,6 +483,10 @@ private[spark] class ExecutorAllocationManager(
// This is needed in case the stage is aborted for any reason
if (stageIdToNumTasks.isEmpty) {
allocationManager.onSchedulerQueueEmpty()
+ if (numRunningTasks != 0) {
+ logWarning("No stages are running, but numRunningTasks != 0")
+ numRunningTasks = 0
+ }
}
}
}
@@ -446,6 +498,7 @@ private[spark] class ExecutorAllocationManager(
val executorId = taskStart.taskInfo.executorId
allocationManager.synchronized {
+ numRunningTasks += 1
// This guards against the race condition in which the `SparkListenerTaskStart`
// event is posted before the `SparkListenerBlockManagerAdded` event, which is
// possible because these events are posted in different threads. (see SPARK-4951)
@@ -475,7 +528,8 @@ private[spark] class ExecutorAllocationManager(
val executorId = taskEnd.taskInfo.executorId
val taskId = taskEnd.taskInfo.taskId
allocationManager.synchronized {
- // If the executor is no longer running scheduled any tasks, mark it as idle
+ numRunningTasks -= 1
+ // If the executor is no longer running any scheduled tasks, mark it as idle
if (executorIdToTaskIds.contains(executorId)) {
executorIdToTaskIds(executorId) -= taskId
if (executorIdToTaskIds(executorId).isEmpty) {
@@ -515,6 +569,11 @@ private[spark] class ExecutorAllocationManager(
}
/**
+ * The number of tasks currently running across all stages.
+ */
+ def totalRunningTasks(): Int = numRunningTasks
+
+ /**
* Return true if an executor is not currently running a task, and false otherwise.
*
* Note: This is not thread-safe without the caller owning the `allocationManager` lock.
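A worked sketch of the bookkeeping introduced above, with made-up numbers; only the formulas mirror the patch, the object itself is hypothetical:

  object AllocationArithmeticSketch {
    def main(args: Array[String]): Unit = {
      val tasksPerExecutor    = 4
      val minNumExecutors     = 1
      var numExecutorsPending = 5      // requested but not yet granted
      val registeredExecutors = 2      // executorIds.size
      val pendingToRemove     = 0      // executorsPendingToRemove.size

      // targetNumExecutors: what we would have if every outstanding request were fulfilled.
      val target = numExecutorsPending + registeredExecutors - pendingToRemove          // 7

      // maxNumExecutorsNeeded: running + pending tasks over slots per executor, rounded up.
      val runningOrPendingTasks = 9
      val maxNeeded = (runningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor // ceil(9/4) = 3

      // maxNeeded < target, so the manager requests the smaller total (floored at the minimum)
      // and updateNumExecutorsPending reports the change as a negative delta.
      val newTotal   = math.max(maxNeeded, minNumExecutors)                             // 3
      val newPending = newTotal - registeredExecutors + pendingToRemove                 // 1
      val delta      = newPending - numExecutorsPending                                 // -4
      numExecutorsPending = newPending
      println(s"target=$target maxNeeded=$maxNeeded newTotal=$newTotal delta=$delta")
    }
  }

On the add side, addExecutors ramps its batch size up exponentially and resets it whenever the request is capped. The core of that policy in isolation (hypothetical helper, simplified from the code above):

  def rampUp(currentTarget: Int, toAdd: Int, maxNeeded: Int, maxExecutors: Int): (Int, Int) = {
    val cap       = math.min(maxExecutors, maxNeeded)
    val newTotal  = math.min(currentTarget + toAdd, cap)
    val delta     = newTotal - currentTarget
    val nextToAdd = if (delta == toAdd) toAdd * 2 else 1   // double only when the full batch was granted
    (newTotal, nextToAdd)
  }
  // rampUp(0, 1, 6, 10) == (1, 2); rampUp(1, 2, 6, 10) == (3, 4); rampUp(3, 4, 6, 10) == (6, 1)
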
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 8d3c3d000a..04ca5d1019 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1104,9 +1104,26 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}
/**
+ * Express a preference to the cluster manager for a given total number of executors.
+ * This can result in canceling pending requests or filing additional requests.
+ * This is currently only supported in YARN mode. Return whether the request is received.
+ */
+ private[spark] override def requestTotalExecutors(numExecutors: Int): Boolean = {
+ assert(master.contains("yarn") || dynamicAllocationTesting,
+ "Requesting executors is currently only supported in YARN mode")
+ schedulerBackend match {
+ case b: CoarseGrainedSchedulerBackend =>
+ b.requestTotalExecutors(numExecutors)
+ case _ =>
+ logWarning("Requesting executors is only supported in coarse-grained mode")
+ false
+ }
+ }
+
+ /**
* :: DeveloperApi ::
* Request an additional number of executors from the cluster manager.
- * This is currently only supported in Yarn mode. Return whether the request is received.
+ * This is currently only supported in YARN mode. Return whether the request is received.
*/
@DeveloperApi
override def requestExecutors(numAdditionalExecutors: Int): Boolean = {
@@ -1124,7 +1141,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
/**
* :: DeveloperApi ::
* Request that the cluster manager kill the specified executors.
- * This is currently only supported in Yarn mode. Return whether the request is received.
+ * This is currently only supported in YARN mode. Return whether the request is received.
*/
@DeveloperApi
override def killExecutors(executorIds: Seq[String]): Boolean = {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index f9ca93432b..99986c32b0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -311,7 +311,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
/**
* Request an additional number of executors from the cluster manager.
- * Return whether the request is acknowledged.
+ * @return whether the request is acknowledged.
*/
final override def requestExecutors(numAdditionalExecutors: Int): Boolean = synchronized {
if (numAdditionalExecutors < 0) {
@@ -328,6 +328,22 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
}
/**
+ * Express a preference to the cluster manager for a given total number of executors. This can
+ * result in canceling pending requests or filing additional requests.
+ * @return whether the request is acknowledged.
+ */
+ final override def requestTotalExecutors(numExecutors: Int): Boolean = synchronized {
+ if (numExecutors < 0) {
+ throw new IllegalArgumentException(
+ "Attempted to request a negative number of executor(s) " +
+ s"$numExecutors from the cluster manager. Please specify a positive number!")
+ }
+ numPendingExecutors =
+ math.max(numExecutors - numExistingExecutors + executorsPendingToRemove.size, 0)
+ doRequestTotalExecutors(numExecutors)
+ }
+
+ /**
* Request executors from the cluster manager by specifying the total number desired,
* including existing pending and running executors.
*
@@ -337,7 +353,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
* insufficient resources to satisfy the first request. We make the assumption here that the
* cluster manager will eventually fulfill all requests when resources free up.
*
- * Return whether the request is acknowledged.
+ * @return whether the request is acknowledged.
*/
protected def doRequestTotalExecutors(requestedTotal: Int): Boolean = false
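With made-up numbers, the numPendingExecutors update in requestTotalExecutors above works out as follows (illustrative REPL-style snippet, not backend code):

  val numExecutors             = 8   // absolute total requested via requestTotalExecutors
  val numExistingExecutors     = 5   // executors already registered with the backend
  val executorsPendingToRemove = 1   // registered executors already slated for removal
  val numPendingExecutors =
    math.max(numExecutors - numExistingExecutors + executorsPendingToRemove, 0)   // max(8 - 5 + 1, 0) = 4
  // Four more executors must still arrive before the backend reaches the requested total of 8,
  // since one of the five registered executors is on its way out.
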
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 9eb87f0160..5d96eabd34 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -175,6 +175,33 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
assert(numExecutorsPending(manager) === 9)
}
+ test("cancel pending executors when no longer needed") {
+ sc = createSparkContext(1, 10)
+ val manager = sc.executorAllocationManager.get
+ sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(2, 5)))
+
+ assert(numExecutorsPending(manager) === 0)
+ assert(numExecutorsToAdd(manager) === 1)
+ assert(addExecutors(manager) === 1)
+ assert(numExecutorsPending(manager) === 1)
+ assert(numExecutorsToAdd(manager) === 2)
+ assert(addExecutors(manager) === 2)
+ assert(numExecutorsPending(manager) === 3)
+
+ val task1Info = createTaskInfo(0, 0, "executor-1")
+ sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, task1Info))
+
+ assert(numExecutorsToAdd(manager) === 4)
+ assert(addExecutors(manager) === 2)
+
+ val task2Info = createTaskInfo(1, 0, "executor-1")
+ sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, task2Info))
+ sc.listenerBus.postToAll(SparkListenerTaskEnd(2, 0, null, null, task1Info, null))
+ sc.listenerBus.postToAll(SparkListenerTaskEnd(2, 0, null, null, task2Info, null))
+
+ assert(adjustRequestedExecutors(manager) === -1)
+ }
+
test("remove executors") {
sc = createSparkContext(5, 10)
val manager = sc.executorAllocationManager.get
@@ -679,6 +706,7 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
private val _numExecutorsToAdd = PrivateMethod[Int]('numExecutorsToAdd)
private val _numExecutorsPending = PrivateMethod[Int]('numExecutorsPending)
+ private val _maxNumExecutorsNeeded = PrivateMethod[Int]('maxNumExecutorsNeeded)
private val _executorsPendingToRemove =
PrivateMethod[collection.Set[String]]('executorsPendingToRemove)
private val _executorIds = PrivateMethod[collection.Set[String]]('executorIds)
@@ -686,6 +714,7 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
private val _removeTimes = PrivateMethod[collection.Map[String, Long]]('removeTimes)
private val _schedule = PrivateMethod[Unit]('schedule)
private val _addExecutors = PrivateMethod[Int]('addExecutors)
+ private val _addOrCancelExecutorRequests = PrivateMethod[Int]('addOrCancelExecutorRequests)
private val _removeExecutor = PrivateMethod[Boolean]('removeExecutor)
private val _onExecutorAdded = PrivateMethod[Unit]('onExecutorAdded)
private val _onExecutorRemoved = PrivateMethod[Unit]('onExecutorRemoved)
@@ -724,7 +753,12 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
}
private def addExecutors(manager: ExecutorAllocationManager): Int = {
- manager invokePrivate _addExecutors()
+ val maxNumExecutorsNeeded = manager invokePrivate _maxNumExecutorsNeeded()
+ manager invokePrivate _addExecutors(maxNumExecutorsNeeded)
+ }
+
+ private def adjustRequestedExecutors(manager: ExecutorAllocationManager): Int = {
+ manager invokePrivate _addOrCancelExecutorRequests(0L)
}
private def removeExecutor(manager: ExecutorAllocationManager, id: String): Boolean = {
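The suite reaches these private methods through ScalaTest's PrivateMethodTester, as in the helpers above. A minimal standalone example of the same pattern (hypothetical class, not Spark code):

  import org.scalatest.PrivateMethodTester

  class Counter { private def bump(by: Int): Int = by + 1 }

  object PrivateMethodDemo extends PrivateMethodTester {
    private val _bump = PrivateMethod[Int]('bump)

    def demo(): Int = {
      val counter = new Counter
      counter invokePrivate _bump(41)   // reflectively invokes the private method; returns 42
    }
  }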