diff options
5 files changed, 184 insertions, 50 deletions
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala index a46a81eabd..079055e00c 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala @@ -19,10 +19,18 @@ package org.apache.spark /** * A client that communicates with the cluster manager to request or kill executors. + * This is currently supported only in YARN mode. */ private[spark] trait ExecutorAllocationClient { /** + * Express a preference to the cluster manager for a given total number of executors. + * This can result in canceling pending requests or filing additional requests. + * Return whether the request is acknowledged by the cluster manager. + */ + private[spark] def requestTotalExecutors(numExecutors: Int): Boolean + + /** * Request an additional number of executors from the cluster manager. * Return whether the request is acknowledged by the cluster manager. */ diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 02d54bf3b5..998695b6ac 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -201,18 +201,34 @@ private[spark] class ExecutorAllocationManager( } /** - * If the add time has expired, request new executors and refresh the add time. - * If the remove time for an existing executor has expired, kill the executor. + * The number of executors we would have if the cluster manager were to fulfill all our existing + * requests. + */ + private def targetNumExecutors(): Int = + numExecutorsPending + executorIds.size - executorsPendingToRemove.size + + /** + * The maximum number of executors we would need under the current load to satisfy all running + * and pending tasks, rounded up. + */ + private def maxNumExecutorsNeeded(): Int = { + val numRunningOrPendingTasks = listener.totalPendingTasks + listener.totalRunningTasks + (numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor + } + + /** + * This is called at a fixed interval to regulate the number of pending executor requests + * and number of executors running. + * + * First, adjust our requested executors based on the add time and our current needs. + * Then, if the remove time for an existing executor has expired, kill the executor. + * * This is factored out into its own method for testing. */ private def schedule(): Unit = synchronized { val now = clock.getTimeMillis - if (addTime != NOT_SET && now >= addTime) { - addExecutors() - logDebug(s"Starting timer to add more executors (to " + - s"expire in $sustainedSchedulerBacklogTimeout seconds)") - addTime += sustainedSchedulerBacklogTimeout * 1000 - } + + addOrCancelExecutorRequests(now) removeTimes.retain { case (executorId, expireTime) => val expired = now >= expireTime @@ -224,59 +240,89 @@ private[spark] class ExecutorAllocationManager( } /** + * Check to see whether our existing allocation and the requests we've made previously exceed our + * current needs. If so, let the cluster manager know so that it can cancel pending requests that + * are unneeded. + * + * If not, and the add time has expired, see if we can request new executors and refresh the add + * time. + * + * @return the delta in the target number of executors. + */ + private def addOrCancelExecutorRequests(now: Long): Int = synchronized { + val currentTarget = targetNumExecutors + val maxNeeded = maxNumExecutorsNeeded + + if (maxNeeded < currentTarget) { + // The target number exceeds the number we actually need, so stop adding new + // executors and inform the cluster manager to cancel the extra pending requests. + val newTotalExecutors = math.max(maxNeeded, minNumExecutors) + client.requestTotalExecutors(newTotalExecutors) + numExecutorsToAdd = 1 + updateNumExecutorsPending(newTotalExecutors) + } else if (addTime != NOT_SET && now >= addTime) { + val delta = addExecutors(maxNeeded) + logDebug(s"Starting timer to add more executors (to " + + s"expire in $sustainedSchedulerBacklogTimeout seconds)") + addTime += sustainedSchedulerBacklogTimeout * 1000 + delta + } else { + 0 + } + } + + /** * Request a number of executors from the cluster manager. * If the cap on the number of executors is reached, give up and reset the * number of executors to add next round instead of continuing to double it. - * Return the number actually requested. + * + * @param maxNumExecutorsNeeded the maximum number of executors all currently running or pending + * tasks could fill + * @return the number of additional executors actually requested. */ - private def addExecutors(): Int = synchronized { - // Do not request more executors if we have already reached the upper bound - val numExistingExecutors = executorIds.size + numExecutorsPending - if (numExistingExecutors >= maxNumExecutors) { + private def addExecutors(maxNumExecutorsNeeded: Int): Int = { + // Do not request more executors if it would put our target over the upper bound + val currentTarget = targetNumExecutors + if (currentTarget >= maxNumExecutors) { logDebug(s"Not adding executors because there are already ${executorIds.size} " + s"registered and $numExecutorsPending pending executor(s) (limit $maxNumExecutors)") numExecutorsToAdd = 1 return 0 } - // The number of executors needed to satisfy all pending tasks is the number of tasks pending - // divided by the number of tasks each executor can fit, rounded up. - val maxNumExecutorsPending = - (listener.totalPendingTasks() + tasksPerExecutor - 1) / tasksPerExecutor - if (numExecutorsPending >= maxNumExecutorsPending) { - logDebug(s"Not adding executors because there are already $numExecutorsPending " + - s"pending and pending tasks could only fill $maxNumExecutorsPending") - numExecutorsToAdd = 1 - return 0 - } - - // It's never useful to request more executors than could satisfy all the pending tasks, so - // cap request at that amount. - // Also cap request with respect to the configured upper bound. - val maxNumExecutorsToAdd = math.min( - maxNumExecutorsPending - numExecutorsPending, - maxNumExecutors - numExistingExecutors) - assert(maxNumExecutorsToAdd > 0) - - val actualNumExecutorsToAdd = math.min(numExecutorsToAdd, maxNumExecutorsToAdd) - - val newTotalExecutors = numExistingExecutors + actualNumExecutorsToAdd - val addRequestAcknowledged = testing || client.requestExecutors(actualNumExecutorsToAdd) + val actualMaxNumExecutors = math.min(maxNumExecutors, maxNumExecutorsNeeded) + val newTotalExecutors = math.min(currentTarget + numExecutorsToAdd, actualMaxNumExecutors) + val addRequestAcknowledged = testing || client.requestTotalExecutors(newTotalExecutors) if (addRequestAcknowledged) { - logInfo(s"Requesting $actualNumExecutorsToAdd new executor(s) because " + - s"tasks are backlogged (new desired total will be $newTotalExecutors)") - numExecutorsToAdd = - if (actualNumExecutorsToAdd == numExecutorsToAdd) numExecutorsToAdd * 2 else 1 - numExecutorsPending += actualNumExecutorsToAdd - actualNumExecutorsToAdd + val delta = updateNumExecutorsPending(newTotalExecutors) + logInfo(s"Requesting $delta new executor(s) because tasks are backlogged" + + s" (new desired total will be $newTotalExecutors)") + numExecutorsToAdd = if (delta == numExecutorsToAdd) { + numExecutorsToAdd * 2 + } else { + 1 + } + delta } else { - logWarning(s"Unable to reach the cluster manager " + - s"to request $actualNumExecutorsToAdd executors!") + logWarning( + s"Unable to reach the cluster manager to request $newTotalExecutors total executors!") 0 } } /** + * Given the new target number of executors, update the number of pending executor requests, + * and return the delta from the old number of pending requests. + */ + private def updateNumExecutorsPending(newTotalExecutors: Int): Int = { + val newNumExecutorsPending = + newTotalExecutors - executorIds.size + executorsPendingToRemove.size + val delta = newNumExecutorsPending - numExecutorsPending + numExecutorsPending = newNumExecutorsPending + delta + } + + /** * Request the cluster manager to remove the given executor. * Return whether the request is received. */ @@ -415,6 +461,8 @@ private[spark] class ExecutorAllocationManager( private val stageIdToNumTasks = new mutable.HashMap[Int, Int] private val stageIdToTaskIndices = new mutable.HashMap[Int, mutable.HashSet[Int]] private val executorIdToTaskIds = new mutable.HashMap[String, mutable.HashSet[Long]] + // Number of tasks currently running on the cluster. Should be 0 when no stages are active. + private var numRunningTasks: Int = _ override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = { val stageId = stageSubmitted.stageInfo.stageId @@ -435,6 +483,10 @@ private[spark] class ExecutorAllocationManager( // This is needed in case the stage is aborted for any reason if (stageIdToNumTasks.isEmpty) { allocationManager.onSchedulerQueueEmpty() + if (numRunningTasks != 0) { + logWarning("No stages are running, but numRunningTasks != 0") + numRunningTasks = 0 + } } } } @@ -446,6 +498,7 @@ private[spark] class ExecutorAllocationManager( val executorId = taskStart.taskInfo.executorId allocationManager.synchronized { + numRunningTasks += 1 // This guards against the race condition in which the `SparkListenerTaskStart` // event is posted before the `SparkListenerBlockManagerAdded` event, which is // possible because these events are posted in different threads. (see SPARK-4951) @@ -475,7 +528,8 @@ private[spark] class ExecutorAllocationManager( val executorId = taskEnd.taskInfo.executorId val taskId = taskEnd.taskInfo.taskId allocationManager.synchronized { - // If the executor is no longer running scheduled any tasks, mark it as idle + numRunningTasks -= 1 + // If the executor is no longer running any scheduled tasks, mark it as idle if (executorIdToTaskIds.contains(executorId)) { executorIdToTaskIds(executorId) -= taskId if (executorIdToTaskIds(executorId).isEmpty) { @@ -515,6 +569,11 @@ private[spark] class ExecutorAllocationManager( } /** + * The number of tasks currently running across all stages. + */ + def totalRunningTasks(): Int = numRunningTasks + + /** * Return true if an executor is not currently running a task, and false otherwise. * * Note: This is not thread-safe without the caller owning the `allocationManager` lock. diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 8d3c3d000a..04ca5d1019 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1104,9 +1104,26 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli } /** + * Express a preference to the cluster manager for a given total number of executors. + * This can result in canceling pending requests or filing additional requests. + * This is currently only supported in YARN mode. Return whether the request is received. + */ + private[spark] override def requestTotalExecutors(numExecutors: Int): Boolean = { + assert(master.contains("yarn") || dynamicAllocationTesting, + "Requesting executors is currently only supported in YARN mode") + schedulerBackend match { + case b: CoarseGrainedSchedulerBackend => + b.requestTotalExecutors(numExecutors) + case _ => + logWarning("Requesting executors is only supported in coarse-grained mode") + false + } + } + + /** * :: DeveloperApi :: * Request an additional number of executors from the cluster manager. - * This is currently only supported in Yarn mode. Return whether the request is received. + * This is currently only supported in YARN mode. Return whether the request is received. */ @DeveloperApi override def requestExecutors(numAdditionalExecutors: Int): Boolean = { @@ -1124,7 +1141,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli /** * :: DeveloperApi :: * Request that the cluster manager kill the specified executors. - * This is currently only supported in Yarn mode. Return whether the request is received. + * This is currently only supported in YARN mode. Return whether the request is received. */ @DeveloperApi override def killExecutors(executorIds: Seq[String]): Boolean = { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index f9ca93432b..99986c32b0 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -311,7 +311,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste /** * Request an additional number of executors from the cluster manager. - * Return whether the request is acknowledged. + * @return whether the request is acknowledged. */ final override def requestExecutors(numAdditionalExecutors: Int): Boolean = synchronized { if (numAdditionalExecutors < 0) { @@ -328,6 +328,22 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste } /** + * Express a preference to the cluster manager for a given total number of executors. This can + * result in canceling pending requests or filing additional requests. + * @return whether the request is acknowledged. + */ + final override def requestTotalExecutors(numExecutors: Int): Boolean = synchronized { + if (numAdditionalExecutors < 0) { + throw new IllegalArgumentException( + "Attempted to request a negative number of executor(s) " + + s"$numExecutors from the cluster manager. Please specify a positive number!") + } + numPendingExecutors = + math.max(numExecutors - numExistingExecutors + executorsPendingToRemove.size, 0) + doRequestTotalExecutors(numExecutors) + } + + /** * Request executors from the cluster manager by specifying the total number desired, * including existing pending and running executors. * @@ -337,7 +353,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste * insufficient resources to satisfy the first request. We make the assumption here that the * cluster manager will eventually fulfill all requests when resources free up. * - * Return whether the request is acknowledged. + * @return whether the request is acknowledged. */ protected def doRequestTotalExecutors(requestedTotal: Int): Boolean = false diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index 9eb87f0160..5d96eabd34 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -175,6 +175,33 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext { assert(numExecutorsPending(manager) === 9) } + test("cancel pending executors when no longer needed") { + sc = createSparkContext(1, 10) + val manager = sc.executorAllocationManager.get + sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(2, 5))) + + assert(numExecutorsPending(manager) === 0) + assert(numExecutorsToAdd(manager) === 1) + assert(addExecutors(manager) === 1) + assert(numExecutorsPending(manager) === 1) + assert(numExecutorsToAdd(manager) === 2) + assert(addExecutors(manager) === 2) + assert(numExecutorsPending(manager) === 3) + + val task1Info = createTaskInfo(0, 0, "executor-1") + sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, task1Info)) + + assert(numExecutorsToAdd(manager) === 4) + assert(addExecutors(manager) === 2) + + val task2Info = createTaskInfo(1, 0, "executor-1") + sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, task2Info)) + sc.listenerBus.postToAll(SparkListenerTaskEnd(2, 0, null, null, task1Info, null)) + sc.listenerBus.postToAll(SparkListenerTaskEnd(2, 0, null, null, task2Info, null)) + + assert(adjustRequestedExecutors(manager) === -1) + } + test("remove executors") { sc = createSparkContext(5, 10) val manager = sc.executorAllocationManager.get @@ -679,6 +706,7 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester { private val _numExecutorsToAdd = PrivateMethod[Int]('numExecutorsToAdd) private val _numExecutorsPending = PrivateMethod[Int]('numExecutorsPending) + private val _maxNumExecutorsNeeded = PrivateMethod[Int]('maxNumExecutorsNeeded) private val _executorsPendingToRemove = PrivateMethod[collection.Set[String]]('executorsPendingToRemove) private val _executorIds = PrivateMethod[collection.Set[String]]('executorIds) @@ -686,6 +714,7 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester { private val _removeTimes = PrivateMethod[collection.Map[String, Long]]('removeTimes) private val _schedule = PrivateMethod[Unit]('schedule) private val _addExecutors = PrivateMethod[Int]('addExecutors) + private val _addOrCancelExecutorRequests = PrivateMethod[Int]('addOrCancelExecutorRequests) private val _removeExecutor = PrivateMethod[Boolean]('removeExecutor) private val _onExecutorAdded = PrivateMethod[Unit]('onExecutorAdded) private val _onExecutorRemoved = PrivateMethod[Unit]('onExecutorRemoved) @@ -724,7 +753,12 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester { } private def addExecutors(manager: ExecutorAllocationManager): Int = { - manager invokePrivate _addExecutors() + val maxNumExecutorsNeeded = manager invokePrivate _maxNumExecutorsNeeded() + manager invokePrivate _addExecutors(maxNumExecutorsNeeded) + } + + private def adjustRequestedExecutors(manager: ExecutorAllocationManager): Int = { + manager invokePrivate _addOrCancelExecutorRequests(0L) } private def removeExecutor(manager: ExecutorAllocationManager, id: String): Boolean = { |