diff options
-rw-r--r-- | core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala | 55 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala | 48 |
2 files changed, 94 insertions, 9 deletions
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index ef93009a07..88adb89299 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -28,7 +28,9 @@ import org.apache.spark.scheduler._ * the scheduler queue is not drained in N seconds, then new executors are added. If the queue * persists for another M seconds, then more executors are added and so on. The number added * in each round increases exponentially from the previous round until an upper bound on the - * number of executors has been reached. + * number of executors has been reached. The upper bound is based both on a configured property + * and on the number of tasks pending: the policy will never increase the number of executor + * requests past the number needed to handle all pending tasks. * * The rationale for the exponential increase is twofold: (1) Executors should be added slowly * in the beginning in case the number of extra executors needed turns out to be small. Otherwise, @@ -82,6 +84,12 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging // During testing, the methods to actually kill and add executors are mocked out private val testing = conf.getBoolean("spark.dynamicAllocation.testing", false) + // TODO: The default value of 1 for spark.executor.cores works right now because dynamic + // allocation is only supported for YARN and the default number of cores per executor in YARN is + // 1, but it might need to be attained differently for different cluster managers + private val tasksPerExecutor = + conf.getInt("spark.executor.cores", 1) / conf.getInt("spark.task.cpus", 1) + validateSettings() // Number of executors to add in the next round @@ -110,6 +118,9 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging // Clock used to schedule when executors should be added and removed private var clock: Clock = new RealClock + // Listener for Spark events that impact the allocation policy + private val listener = new ExecutorAllocationListener(this) + /** * Verify that the settings specified through the config are valid. * If not, throw an appropriate exception. @@ -141,6 +152,9 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging throw new SparkException("Dynamic allocation of executors requires the external " + "shuffle service. You may enable this through spark.shuffle.service.enabled.") } + if (tasksPerExecutor == 0) { + throw new SparkException("spark.executor.cores must not be less than spark.task.cpus.cores") + } } /** @@ -154,7 +168,6 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging * Register for scheduler callbacks to decide when to add and remove executors. */ def start(): Unit = { - val listener = new ExecutorAllocationListener(this) sc.addSparkListener(listener) startPolling() } @@ -218,13 +231,27 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging return 0 } - // Request executors with respect to the upper bound - val actualNumExecutorsToAdd = - if (numExistingExecutors + numExecutorsToAdd <= maxNumExecutors) { - numExecutorsToAdd - } else { - maxNumExecutors - numExistingExecutors - } + // The number of executors needed to satisfy all pending tasks is the number of tasks pending + // divided by the number of tasks each executor can fit, rounded up. + val maxNumExecutorsPending = + (listener.totalPendingTasks() + tasksPerExecutor - 1) / tasksPerExecutor + if (numExecutorsPending >= maxNumExecutorsPending) { + logDebug(s"Not adding executors because there are already $numExecutorsPending " + + s"pending and pending tasks could only fill $maxNumExecutorsPending") + numExecutorsToAdd = 1 + return 0 + } + + // It's never useful to request more executors than could satisfy all the pending tasks, so + // cap request at that amount. + // Also cap request with respect to the configured upper bound. + val maxNumExecutorsToAdd = math.min( + maxNumExecutorsPending - numExecutorsPending, + maxNumExecutors - numExistingExecutors) + assert(maxNumExecutorsToAdd > 0) + + val actualNumExecutorsToAdd = math.min(numExecutorsToAdd, maxNumExecutorsToAdd) + val newTotalExecutors = numExistingExecutors + actualNumExecutorsToAdd val addRequestAcknowledged = testing || sc.requestExecutors(actualNumExecutorsToAdd) if (addRequestAcknowledged) { @@ -445,6 +472,16 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging blockManagerRemoved: SparkListenerBlockManagerRemoved): Unit = { allocationManager.onExecutorRemoved(blockManagerRemoved.blockManagerId.executorId) } + + /** + * An estimate of the total number of pending tasks remaining for currently running stages. Does + * not account for tasks which may have failed and been resubmitted. + */ + def totalPendingTasks(): Int = { + stageIdToNumTasks.map { case (stageId, numTasks) => + numTasks - stageIdToTaskIndices.get(stageId).map(_.size).getOrElse(0) + }.sum + } } } diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index 66cf60d25f..4b27477790 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -76,6 +76,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext { test("add executors") { sc = createSparkContext(1, 10) val manager = sc.executorAllocationManager.get + sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000))) // Keep adding until the limit is reached assert(numExecutorsPending(manager) === 0) @@ -117,6 +118,51 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext { assert(numExecutorsToAdd(manager) === 1) } + test("add executors capped by num pending tasks") { + sc = createSparkContext(1, 10) + val manager = sc.executorAllocationManager.get + sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 5))) + + // Verify that we're capped at number of tasks in the stage + assert(numExecutorsPending(manager) === 0) + assert(numExecutorsToAdd(manager) === 1) + assert(addExecutors(manager) === 1) + assert(numExecutorsPending(manager) === 1) + assert(numExecutorsToAdd(manager) === 2) + assert(addExecutors(manager) === 2) + assert(numExecutorsPending(manager) === 3) + assert(numExecutorsToAdd(manager) === 4) + assert(addExecutors(manager) === 2) + assert(numExecutorsPending(manager) === 5) + assert(numExecutorsToAdd(manager) === 1) + + // Verify that running a task reduces the cap + sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 3))) + sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-1"))) + assert(addExecutors(manager) === 1) + assert(numExecutorsPending(manager) === 6) + assert(numExecutorsToAdd(manager) === 2) + assert(addExecutors(manager) === 1) + assert(numExecutorsPending(manager) === 7) + assert(numExecutorsToAdd(manager) === 1) + + // Verify that re-running a task doesn't reduce the cap further + sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(2, 3))) + sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, createTaskInfo(0, 0, "executor-1"))) + sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, createTaskInfo(1, 0, "executor-1"))) + assert(addExecutors(manager) === 1) + assert(numExecutorsPending(manager) === 8) + assert(numExecutorsToAdd(manager) === 2) + assert(addExecutors(manager) === 1) + assert(numExecutorsPending(manager) === 9) + assert(numExecutorsToAdd(manager) === 1) + + // Verify that running a task once we're at our limit doesn't blow things up + sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, createTaskInfo(0, 1, "executor-1"))) + assert(addExecutors(manager) === 0) + assert(numExecutorsPending(manager) === 9) + } + test("remove executors") { sc = createSparkContext(5, 10) val manager = sc.executorAllocationManager.get @@ -170,6 +216,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext { test ("interleaving add and remove") { sc = createSparkContext(5, 10) val manager = sc.executorAllocationManager.get + sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000))) // Add a few executors assert(addExecutors(manager) === 1) @@ -343,6 +390,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext { val clock = new TestClock(2020L) val manager = sc.executorAllocationManager.get manager.setClock(clock) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000))) // Scheduler queue backlogged onSchedulerBacklogged(manager) |