author    Andrew Or <andrewor14@gmail.com>   2014-10-29 17:48:59 -0700
committer Andrew Or <andrew@databricks.com>  2014-10-29 17:48:59 -0700
commit    8d59b37b02eb36f37bcefafb952519d7dca744ad (patch)
tree      7784af4b3bb826672858417b3d459e58b1eff51c /core
parent    e7fd80413d531e23b6c4def0ee32e52a39da36fa (diff)
[SPARK-3795] Heuristics for dynamically scaling executors
This is part of a bigger effort to provide elastic scaling of executors within a Spark application ([SPARK-3174](https://issues.apache.org/jira/browse/SPARK-3174)). This PR does not provide any functionality by itself; it is a skeleton that is missing a mechanism to be added later in [SPARK-3822](https://issues.apache.org/jira/browse/SPARK-3822). Comments and feedback are most welcome. For those of you reviewing this in detail, I highly recommend doing it through your favorite IDE instead of through the diff here.

Author: Andrew Or <andrewor14@gmail.com>
Author: Andrew Or <andrew@databricks.com>

Closes #2746 from andrewor14/scaling-heuristics and squashes the following commits:

8a4fdaa [Andrew Or] Merge branch 'master' of github.com:apache/spark into scaling-heuristics
e045df8 [Andrew Or] Add warning message (minor)
dfa31ec [Andrew Or] Fix tests
c0becc4 [Andrew Or] Merging with SPARK-3822
4784f93 [Andrew Or] Reword an awkward log message
181f27f [Andrew Or] Merge branch 'master' of github.com:apache/spark into scaling-heuristics
c79e907 [Andrew Or] Merge branch 'master' of github.com:apache/spark into scaling-heuristics
4672b90 [Andrew Or] It's nano time.
a6a30f2 [Andrew Or] Do not allow min/max executors of 0
c60ec33 [Andrew Or] Rewrite test logic with clocks
b00b680 [Andrew Or] Fix style
c3caa65 [Andrew Or] Merge branch 'master' of github.com:apache/spark into scaling-heuristics
7f9da14 [Andrew Or] Factor out logic to verify bounds on # executors (minor)
f279019 [Andrew Or] Add time mocking tests for polling loop
685e347 [Andrew Or] Factor out clock in polling loop to facilitate testing
3cea7f7 [Andrew Or] Use PrivateMethodTester to keep original class private
3156d81 [Andrew Or] Update comments and exception messages
92f36f9 [Andrew Or] Address minor review comments
abdea61 [Andrew Or] Merge branch 'master' of github.com:apache/spark into scaling-heuristics
2aefd09 [Andrew Or] Correct listener behavior
9fe6e44 [Andrew Or] Rename variables and configs + update comments and log messages
149cc32 [Andrew Or] Fix style
254c958 [Andrew Or] Merge branch 'master' of github.com:apache/spark into scaling-heuristics
5ff829b [Andrew Or] Add tests for ExecutorAllocationManager
19c6c4b [Andrew Or] Merge branch 'master' of github.com:apache/spark into scaling-heuristics
5896515 [Andrew Or] Move ExecutorAllocationManager out of scheduler package
9ca8945 [Andrew Or] Rewrite callbacks through the listener interface
5e336b9 [Andrew Or] Remove code from backend to avoid conflict with SPARK-3822
092d1fd [Andrew Or] Remove timeout logic for pending requests
1309fab [Andrew Or] Request executors by specifying the number pending
8bc0e9d [Andrew Or] Add logic to expire pending requests after timeouts
b750ee1 [Andrew Or] Express timers in terms of expiration times + remove retry logic
7f8dd47 [Andrew Or] Merge branch 'master' of github.com:apache/spark into scaling-heuristics
9d516cc [Andrew Or] Bug fix: Actually trigger the add timer / add retry timer
44f1832 [Andrew Or] Rename configs to include time units
eaae7ef [Andrew Or] Address various review comments
6f8be6c [Andrew Or] Beef up comments on what each of the timers mean
baaa403 [Andrew Or] Simplify variable names (minor)
42beec8 [Andrew Or] Reset whether the add threshold is crossed on cancellation
9bcc0bc [Andrew Or] ExecutorScalingManager -> ExecutorAllocationManager
2784398 [Andrew Or] Merge branch 'master' of github.com:apache/spark into scaling-heuristics
5a97d9e [Andrew Or] Log retry attempts in INFO + clean up logging
2f55c9f [Andrew Or] Do not keep requesting executors even after max attempts
0acd1cb [Andrew Or] Rewrite timer logic with polling
b3c7d44 [Andrew Or] Start the retry timer for adding executors at the right time
9b5f2ea [Andrew Or] Wording changes in comments and log messages
c2203a5 [Andrew Or] Simplify code to access the scheduler backend
e519d08 [Andrew Or] Simplify initialization code
2cc87a7 [Andrew Or] Add retry logic for removing executors
d0b34a6 [Andrew Or] Add retry logic for adding executors
9cc4649 [Andrew Or] Simplifying synchronization logic
67c03c7 [Andrew Or] Correct semantics of adding executors + update comments
6c48ab0 [Andrew Or] Update synchronization comment
8901900 [Andrew Or] Simplify remove policy + change the semantics of add policy
1cc8444 [Andrew Or] Minor wording change
ae5b64a [Andrew Or] Add synchronization
20ec6b9 [Andrew Or] First cut implementation of removing executors dynamically
4077ae2 [Andrew Or] Minor code re-organization
6f1fa66 [Andrew Or] First cut implementation of adding executors dynamically
b2e6dcc [Andrew Or] Add skeleton interface for requesting / killing executors
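For context, the heuristics added here are driven entirely by the new spark.dynamicAllocation.* properties introduced in this patch. A minimal sketch of how an application might opt in (property names come from the patch; the app name and numeric values are illustrative, and the three timeouts shown are simply the defaults):

    import org.apache.spark.{SparkConf, SparkContext}

    // Illustrative configuration only: minExecutors and maxExecutors are both
    // required and must be non-zero, otherwise SparkContext creation fails.
    val conf = new SparkConf()
      .setAppName("dynamic-allocation-demo")
      .setMaster("local[*]")  // executor requests only take effect on a coarse-grained (YARN) backend
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.dynamicAllocation.minExecutors", "1")
      .set("spark.dynamicAllocation.maxExecutors", "8")
      .set("spark.dynamicAllocation.schedulerBacklogTimeout", "60")            // seconds
      .set("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", "60")   // seconds
      .set("spark.dynamicAllocation.executorIdleTimeout", "600")               // seconds
    val sc = new SparkContext(conf)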
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala      | 462
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala                   |  35
-rw-r--r--  core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala | 662
3 files changed, 1149 insertions(+), 10 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
new file mode 100644
index 0000000000..b2cf022baf
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import scala.collection.mutable
+
+import org.apache.spark.scheduler._
+
+/**
+ * An agent that dynamically allocates and removes executors based on the workload.
+ *
+ * The add policy depends on whether there are backlogged tasks waiting to be scheduled. If
+ * the scheduler queue is not drained in M seconds, then new executors are added. If the queue
+ * persists for another N seconds, then more executors are added and so on. The number added
+ * in each round increases exponentially from the previous round until an upper bound on the
+ * number of executors has been reached.
+ *
+ * The rationale for the exponential increase is twofold: (1) Executors should be added slowly
+ * in the beginning in case the number of extra executors needed turns out to be small. Otherwise,
+ * we may add more executors than we need just to remove them later. (2) Executors should be added
+ * quickly over time in case the maximum number of executors is very high. Otherwise, it will take
+ * a long time to ramp up under heavy workloads.
+ *
+ * The remove policy is simpler: If an executor has been idle for K seconds, meaning it has not
+ * been scheduled to run any tasks, then it is removed.
+ *
+ * There is no retry logic in either case because we make the assumption that the cluster manager
+ * will eventually fulfill all requests it receives asynchronously.
+ *
+ * The relevant Spark properties include the following:
+ *
+ * spark.dynamicAllocation.enabled - Whether this feature is enabled
+ * spark.dynamicAllocation.minExecutors - Lower bound on the number of executors
+ * spark.dynamicAllocation.maxExecutors - Upper bound on the number of executors
+ *
+ * spark.dynamicAllocation.schedulerBacklogTimeout (M) -
+ * If there are backlogged tasks for this duration, add new executors
+ *
+ * spark.dynamicAllocation.sustainedSchedulerBacklogTimeout (N) -
+ * If the backlog is sustained for this duration, add more executors
+ * This is used only after the initial backlog timeout is exceeded
+ *
+ * spark.dynamicAllocation.executorIdleTimeout (K) -
+ * If an executor has been idle for this duration, remove it
+ */
+private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging {
+ import ExecutorAllocationManager._
+
+ private val conf = sc.conf
+
+ // Lower and upper bounds on the number of executors. These are required.
+ private val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", -1)
+ private val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors", -1)
+ verifyBounds()
+
+ // How long there must be backlogged tasks before an addition is triggered
+ private val schedulerBacklogTimeout = conf.getLong(
+ "spark.dynamicAllocation.schedulerBacklogTimeout", 60)
+
+ // Same as above, but used only after `schedulerBacklogTimeout` is exceeded
+ private val sustainedSchedulerBacklogTimeout = conf.getLong(
+ "spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", schedulerBacklogTimeout)
+
+ // How long an executor must be idle before it is removed
+ private val removeThresholdSeconds = conf.getLong(
+ "spark.dynamicAllocation.executorIdleTimeout", 600)
+
+ // Number of executors to add in the next round
+ private var numExecutorsToAdd = 1
+
+ // Number of executors that have been requested but have not registered yet
+ private var numExecutorsPending = 0
+
+ // Executors that have been requested to be removed but have not been killed yet
+ private val executorsPendingToRemove = new mutable.HashSet[String]
+
+ // All known executors
+ private val executorIds = new mutable.HashSet[String]
+
+ // A timestamp of when an addition should be triggered, or NOT_SET if it is not set
+ // This is set when pending tasks are added but not scheduled yet
+ private var addTime: Long = NOT_SET
+
+ // A timestamp for each executor of when the executor should be removed, indexed by the ID
+ // This is set when an executor is no longer running a task, or when it first registers
+ private val removeTimes = new mutable.HashMap[String, Long]
+
+ // Polling loop interval (ms)
+ private val intervalMillis: Long = 100
+
+ // Whether we are testing this class. This should only be used internally.
+ private val testing = conf.getBoolean("spark.dynamicAllocation.testing", false)
+
+ // Clock used to schedule when executors should be added and removed
+ private var clock: Clock = new RealClock
+
+ /**
+ * Verify that the lower and upper bounds on the number of executors are valid.
+ * If not, throw an appropriate exception.
+ */
+ private def verifyBounds(): Unit = {
+ if (minNumExecutors < 0 || maxNumExecutors < 0) {
+ throw new SparkException("spark.dynamicAllocation.{min/max}Executors must be set!")
+ }
+ if (minNumExecutors == 0 || maxNumExecutors == 0) {
+ throw new SparkException("spark.dynamicAllocation.{min/max}Executors cannot be 0!")
+ }
+ if (minNumExecutors > maxNumExecutors) {
+ throw new SparkException(s"spark.dynamicAllocation.minExecutors ($minNumExecutors) must " +
+ s"be less than or equal to spark.dynamicAllocation.maxExecutors ($maxNumExecutors)!")
+ }
+ }
+
+ /**
+ * Use a different clock for this allocation manager. This is mainly used for testing.
+ */
+ def setClock(newClock: Clock): Unit = {
+ clock = newClock
+ }
+
+ /**
+ * Register for scheduler callbacks to decide when to add and remove executors.
+ */
+ def start(): Unit = {
+ val listener = new ExecutorAllocationListener(this)
+ sc.addSparkListener(listener)
+ startPolling()
+ }
+
+ /**
+ * Start the main polling thread that keeps track of when to add and remove executors.
+ */
+ private def startPolling(): Unit = {
+ val t = new Thread {
+ override def run(): Unit = {
+ while (true) {
+ try {
+ schedule()
+ } catch {
+ case e: Exception => logError("Exception in dynamic executor allocation thread!", e)
+ }
+ Thread.sleep(intervalMillis)
+ }
+ }
+ }
+ t.setName("spark-dynamic-executor-allocation")
+ t.setDaemon(true)
+ t.start()
+ }
+
+ /**
+ * If the add time has expired, request new executors and refresh the add time.
+ * If the remove time for an existing executor has expired, kill the executor.
+ * This is factored out into its own method for testing.
+ */
+ private def schedule(): Unit = synchronized {
+ val now = clock.getTimeMillis
+ if (addTime != NOT_SET && now >= addTime) {
+ addExecutors()
+ logDebug(s"Starting timer to add more executors (to " +
+ s"expire in $sustainedSchedulerBacklogTimeout seconds)")
+ addTime += sustainedSchedulerBacklogTimeout * 1000
+ }
+
+ removeTimes.foreach { case (executorId, expireTime) =>
+ if (now >= expireTime) {
+ removeExecutor(executorId)
+ removeTimes.remove(executorId)
+ }
+ }
+ }
+
+ /**
+ * Request a number of executors from the cluster manager.
+ * If the cap on the number of executors is reached, give up and reset the
+ * number of executors to add next round instead of continuing to double it.
+ * Return the number actually requested.
+ */
+ private def addExecutors(): Int = synchronized {
+ // Do not request more executors if we have already reached the upper bound
+ val numExistingExecutors = executorIds.size + numExecutorsPending
+ if (numExistingExecutors >= maxNumExecutors) {
+ logDebug(s"Not adding executors because there are already ${executorIds.size} " +
+ s"registered and $numExecutorsPending pending executor(s) (limit $maxNumExecutors)")
+ numExecutorsToAdd = 1
+ return 0
+ }
+
+ // Request executors with respect to the upper bound
+ val actualNumExecutorsToAdd =
+ if (numExistingExecutors + numExecutorsToAdd <= maxNumExecutors) {
+ numExecutorsToAdd
+ } else {
+ maxNumExecutors - numExistingExecutors
+ }
+ val newTotalExecutors = numExistingExecutors + actualNumExecutorsToAdd
+ val addRequestAcknowledged = testing || sc.requestExecutors(actualNumExecutorsToAdd)
+ if (addRequestAcknowledged) {
+ logInfo(s"Requesting $actualNumExecutorsToAdd new executor(s) because " +
+ s"tasks are backlogged (new desired total will be $newTotalExecutors)")
+ numExecutorsToAdd =
+ if (actualNumExecutorsToAdd == numExecutorsToAdd) numExecutorsToAdd * 2 else 1
+ numExecutorsPending += actualNumExecutorsToAdd
+ actualNumExecutorsToAdd
+ } else {
+ logWarning(s"Unable to reach the cluster manager " +
+ s"to request $actualNumExecutorsToAdd executors!")
+ 0
+ }
+ }
+
+ /**
+ * Request the cluster manager to remove the given executor.
+ * Return whether the request is received.
+ */
+ private def removeExecutor(executorId: String): Boolean = synchronized {
+ // Do not kill the executor if we are not aware of it (should never happen)
+ if (!executorIds.contains(executorId)) {
+ logWarning(s"Attempted to remove unknown executor $executorId!")
+ return false
+ }
+
+ // Do not kill the executor again if it is already pending to be killed (should never happen)
+ if (executorsPendingToRemove.contains(executorId)) {
+ logWarning(s"Attempted to remove executor $executorId " +
+ s"when it is already pending to be removed!")
+ return false
+ }
+
+ // Do not kill the executor if we have already reached the lower bound
+ val numExistingExecutors = executorIds.size - executorsPendingToRemove.size
+ if (numExistingExecutors - 1 < minNumExecutors) {
+ logInfo(s"Not removing idle executor $executorId because there are only " +
+ s"$numExistingExecutors executor(s) left (limit $minNumExecutors)")
+ return false
+ }
+
+ // Send a request to the backend to kill this executor
+ val removeRequestAcknowledged = testing || sc.killExecutor(executorId)
+ if (removeRequestAcknowledged) {
+ logInfo(s"Removing executor $executorId because it has been idle for " +
+ s"$removeThresholdSeconds seconds (new desired total will be ${numExistingExecutors - 1})")
+ executorsPendingToRemove.add(executorId)
+ true
+ } else {
+ logWarning(s"Unable to reach the cluster manager to kill executor $executorId!")
+ false
+ }
+ }
+
+ /**
+ * Callback invoked when the specified executor has been added.
+ */
+ private def onExecutorAdded(executorId: String): Unit = synchronized {
+ if (!executorIds.contains(executorId)) {
+ executorIds.add(executorId)
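+ // Note: onExecutorIdle is a no-op for executors that already have a removal time or are
+ // pending removal, so this pass only (re)arms idle timers for executors that lack one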
+ executorIds.foreach(onExecutorIdle)
+ logInfo(s"New executor $executorId has registered (new total is ${executorIds.size})")
+ if (numExecutorsPending > 0) {
+ numExecutorsPending -= 1
+ logDebug(s"Decremented number of pending executors ($numExecutorsPending left)")
+ }
+ } else {
+ logWarning(s"Duplicate executor $executorId has registered")
+ }
+ }
+
+ /**
+ * Callback invoked when the specified executor has been removed.
+ */
+ private def onExecutorRemoved(executorId: String): Unit = synchronized {
+ if (executorIds.contains(executorId)) {
+ executorIds.remove(executorId)
+ removeTimes.remove(executorId)
+ logInfo(s"Existing executor $executorId has been removed (new total is ${executorIds.size})")
+ if (executorsPendingToRemove.contains(executorId)) {
+ executorsPendingToRemove.remove(executorId)
+ logDebug(s"Executor $executorId is no longer pending to " +
+ s"be removed (${executorsPendingToRemove.size} left)")
+ }
+ } else {
+ logWarning(s"Unknown executor $executorId has been removed!")
+ }
+ }
+
+ /**
+ * Callback invoked when the scheduler receives new pending tasks.
+ * This sets a time in the future that decides when executors should be added
+ * if it is not already set.
+ */
+ private def onSchedulerBacklogged(): Unit = synchronized {
+ if (addTime == NOT_SET) {
+ logDebug(s"Starting timer to add executors because pending tasks " +
+ s"are building up (to expire in $schedulerBacklogTimeout seconds)")
+ addTime = clock.getTimeMillis + schedulerBacklogTimeout * 1000
+ }
+ }
+
+ /**
+ * Callback invoked when the scheduler queue is drained.
+ * This resets all variables used for adding executors.
+ */
+ private def onSchedulerQueueEmpty(): Unit = synchronized {
+ logDebug(s"Clearing timer to add executors because there are no more pending tasks")
+ addTime = NOT_SET
+ numExecutorsToAdd = 1
+ }
+
+ /**
+ * Callback invoked when the specified executor is no longer running any tasks.
+ * This sets a time in the future that decides when this executor should be removed if
+ * the executor is not already marked as idle.
+ */
+ private def onExecutorIdle(executorId: String): Unit = synchronized {
+ if (!removeTimes.contains(executorId) && !executorsPendingToRemove.contains(executorId)) {
+ logDebug(s"Starting idle timer for $executorId because there are no more tasks " +
+ s"scheduled to run on the executor (to expire in $removeThresholdSeconds seconds)")
+ removeTimes(executorId) = clock.getTimeMillis + removeThresholdSeconds * 1000
+ }
+ }
+
+ /**
+ * Callback invoked when the specified executor is now running a task.
+ * This resets all variables used for removing this executor.
+ */
+ private def onExecutorBusy(executorId: String): Unit = synchronized {
+ logDebug(s"Clearing idle timer for $executorId because it is now running a task")
+ removeTimes.remove(executorId)
+ }
+
+ /**
+ * A listener that notifies the given allocation manager of when to add and remove executors.
+ *
+ * This class is intentionally conservative in its assumptions about the relative ordering
+ * and consistency of events returned by the listener. For simplicity, it does not account
+ * for speculated tasks.
+ */
+ private class ExecutorAllocationListener(allocationManager: ExecutorAllocationManager)
+ extends SparkListener {
+
+ private val stageIdToNumTasks = new mutable.HashMap[Int, Int]
+ private val stageIdToTaskIndices = new mutable.HashMap[Int, mutable.HashSet[Int]]
+ private val executorIdToTaskIds = new mutable.HashMap[String, mutable.HashSet[Long]]
+
+ override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
+ synchronized {
+ val stageId = stageSubmitted.stageInfo.stageId
+ val numTasks = stageSubmitted.stageInfo.numTasks
+ stageIdToNumTasks(stageId) = numTasks
+ allocationManager.onSchedulerBacklogged()
+ }
+ }
+
+ override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
+ synchronized {
+ val stageId = stageCompleted.stageInfo.stageId
+ stageIdToNumTasks -= stageId
+ stageIdToTaskIndices -= stageId
+
+ // If this is the last stage with pending tasks, mark the scheduler queue as empty
+ // This is needed in case the stage is aborted for any reason
+ if (stageIdToNumTasks.isEmpty) {
+ allocationManager.onSchedulerQueueEmpty()
+ }
+ }
+ }
+
+ override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized {
+ val stageId = taskStart.stageId
+ val taskId = taskStart.taskInfo.taskId
+ val taskIndex = taskStart.taskInfo.index
+ val executorId = taskStart.taskInfo.executorId
+
+ // If this is the last pending task, mark the scheduler queue as empty
+ stageIdToTaskIndices.getOrElseUpdate(stageId, new mutable.HashSet[Int]) += taskIndex
+ val numTasksScheduled = stageIdToTaskIndices(stageId).size
+ val numTasksTotal = stageIdToNumTasks.getOrElse(stageId, -1)
+ if (numTasksScheduled == numTasksTotal) {
+ // No more pending tasks for this stage
+ stageIdToNumTasks -= stageId
+ if (stageIdToNumTasks.isEmpty) {
+ allocationManager.onSchedulerQueueEmpty()
+ }
+ }
+
+ // Mark the executor on which this task is scheduled as busy
+ executorIdToTaskIds.getOrElseUpdate(executorId, new mutable.HashSet[Long]) += taskId
+ allocationManager.onExecutorBusy(executorId)
+ }
+
+ override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
+ val executorId = taskEnd.taskInfo.executorId
+ val taskId = taskEnd.taskInfo.taskId
+
+ // If the executor is no longer running any scheduled tasks, mark it as idle
+ if (executorIdToTaskIds.contains(executorId)) {
+ executorIdToTaskIds(executorId) -= taskId
+ if (executorIdToTaskIds(executorId).isEmpty) {
+ executorIdToTaskIds -= executorId
+ allocationManager.onExecutorIdle(executorId)
+ }
+ }
+ }
+
+ override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit = {
+ val executorId = blockManagerAdded.blockManagerId.executorId
+ if (executorId != "<driver>") {
+ allocationManager.onExecutorAdded(executorId)
+ }
+ }
+
+ override def onBlockManagerRemoved(
+ blockManagerRemoved: SparkListenerBlockManagerRemoved): Unit = {
+ allocationManager.onExecutorRemoved(blockManagerRemoved.blockManagerId.executorId)
+ }
+ }
+
+}
+
+private object ExecutorAllocationManager {
+ val NOT_SET = Long.MaxValue
+}
+
+/**
+ * An abstract clock for measuring elapsed time.
+ */
+private trait Clock {
+ def getTimeMillis: Long
+}
+
+/**
+ * A clock backed by a monotonically increasing time source.
+ * The time returned by this clock does not correspond to any notion of wall-clock time.
+ */
+private class RealClock extends Clock {
+ override def getTimeMillis: Long = System.nanoTime / (1000 * 1000)
+}
+
+/**
+ * A clock that allows the caller to customize the time.
+ * This is used mainly for testing.
+ */
+private class TestClock(startTimeMillis: Long) extends Clock {
+ private var time: Long = startTimeMillis
+ override def getTimeMillis: Long = time
+ def tick(ms: Long): Unit = { time += ms }
+}
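To make the add policy documented above concrete, the following standalone sketch (not part of the patch) mirrors the arithmetic that addExecutors() applies each time the backlog timer fires: the batch size doubles whenever a full batch is granted and is clamped to the configured maximum. With an upper bound of 10 it requests 1, 2, 4 and then 3 executors, which is the progression asserted in the "add executors" test further down.

    // Standalone illustration of the exponential ramp-up in addExecutors()
    // (assumes an upper bound of 10 and that every request is acknowledged).
    var numExecutorsToAdd = 1
    var numExistingExecutors = 0       // registered + pending
    val maxNumExecutors = 10
    while (numExistingExecutors < maxNumExecutors) {
      val actual = math.min(numExecutorsToAdd, maxNumExecutors - numExistingExecutors)
      numExistingExecutors += actual
      // Double only if the full batch was granted; otherwise reset to 1 for the next backlog.
      numExecutorsToAdd = if (actual == numExecutorsToAdd) numExecutorsToAdd * 2 else 1
      println(s"requested $actual executor(s), new desired total = $numExistingExecutors")
    }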
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 40ea369f9e..73668e83bb 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -330,6 +330,15 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
} else None
}
+ // Optionally scale number of executors dynamically based on workload. Exposed for testing.
+ private[spark] val executorAllocationManager: Option[ExecutorAllocationManager] =
+ if (conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
+ Some(new ExecutorAllocationManager(this))
+ } else {
+ None
+ }
+ executorAllocationManager.foreach(_.start())
+
// At this point, all relevant SparkListeners have been registered, so begin releasing events
listenerBus.start()
@@ -860,36 +869,42 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
/**
* :: DeveloperApi ::
* Request an additional number of executors from the cluster manager.
- * This is currently only supported in Yarn mode.
+ * This is currently only supported in Yarn mode. Return whether the request is received.
*/
@DeveloperApi
- def requestExecutors(numAdditionalExecutors: Int): Unit = {
+ def requestExecutors(numAdditionalExecutors: Int): Boolean = {
schedulerBackend match {
- case b: CoarseGrainedSchedulerBackend => b.requestExecutors(numAdditionalExecutors)
- case _ => logWarning("Requesting executors is only supported in coarse-grained mode")
+ case b: CoarseGrainedSchedulerBackend =>
+ b.requestExecutors(numAdditionalExecutors)
+ case _ =>
+ logWarning("Requesting executors is only supported in coarse-grained mode")
+ false
}
}
/**
* :: DeveloperApi ::
* Request that the cluster manager kill the specified executors.
- * This is currently only supported in Yarn mode.
+ * This is currently only supported in Yarn mode. Return whether the request is received.
*/
@DeveloperApi
- def killExecutors(executorIds: Seq[String]): Unit = {
+ def killExecutors(executorIds: Seq[String]): Boolean = {
schedulerBackend match {
- case b: CoarseGrainedSchedulerBackend => b.killExecutors(executorIds)
- case _ => logWarning("Killing executors is only supported in coarse-grained mode")
+ case b: CoarseGrainedSchedulerBackend =>
+ b.killExecutors(executorIds)
+ case _ =>
+ logWarning("Killing executors is only supported in coarse-grained mode")
+ false
}
}
/**
* :: DeveloperApi ::
* Request that the cluster manager kill the specified executor.
- * This is currently only supported in Yarn mode.
+ * This is currently only supported in Yarn mode. Return whether the request is received.
*/
@DeveloperApi
- def killExecutor(executorId: String): Unit = killExecutors(Seq(executorId))
+ def killExecutor(executorId: String): Boolean = killExecutors(Seq(executorId))
/** The version of Spark on which this application is running. */
def version = SPARK_VERSION
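Because requestExecutors and killExecutor(s) now return Boolean, callers such as ExecutorAllocationManager can tell whether the cluster manager acknowledged a request. A hedged usage sketch (the executor ID below is made up; real IDs come from the cluster manager, and both calls simply log a warning and return false outside coarse-grained mode):

    // Assumes an existing SparkContext `sc` backed by a coarse-grained (YARN) scheduler.
    val granted: Boolean = sc.requestExecutors(2)
    if (!granted) {
      // e.g. the backend does not support dynamic executor requests
      println("Executor request was not acknowledged by the cluster manager")
    }
    val killed: Boolean = sc.killExecutor("executor-1")   // "executor-1" is a hypothetical ID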
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
new file mode 100644
index 0000000000..f0aa914cfe
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -0,0 +1,662 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.scalatest.{FunSuite, PrivateMethodTester}
+import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.scheduler._
+import org.apache.spark.storage.BlockManagerId
+
+/**
+ * Test add and remove behavior of ExecutorAllocationManager.
+ */
+class ExecutorAllocationManagerSuite extends FunSuite {
+ import ExecutorAllocationManager._
+ import ExecutorAllocationManagerSuite._
+
+ test("verify min/max executors") {
+ // No min or max
+ val conf = new SparkConf()
+ .setMaster("local")
+ .setAppName("test-executor-allocation-manager")
+ .set("spark.dynamicAllocation.enabled", "true")
+ intercept[SparkException] { new SparkContext(conf) }
+
+ // Only min
+ val conf1 = conf.clone().set("spark.dynamicAllocation.minExecutors", "1")
+ intercept[SparkException] { new SparkContext(conf1) }
+
+ // Only max
+ val conf2 = conf.clone().set("spark.dynamicAllocation.maxExecutors", "2")
+ intercept[SparkException] { new SparkContext(conf2) }
+
+ // Both min and max, but min > max
+ intercept[SparkException] { createSparkContext(2, 1) }
+
+ // Both min and max, and min == max
+ val sc1 = createSparkContext(1, 1)
+ assert(sc1.executorAllocationManager.isDefined)
+ sc1.stop()
+
+ // Both min and max, and min < max
+ val sc2 = createSparkContext(1, 2)
+ assert(sc2.executorAllocationManager.isDefined)
+ sc2.stop()
+ }
+
+ test("starting state") {
+ val sc = createSparkContext()
+ val manager = sc.executorAllocationManager.get
+ assert(numExecutorsPending(manager) === 0)
+ assert(executorsPendingToRemove(manager).isEmpty)
+ assert(executorIds(manager).isEmpty)
+ assert(addTime(manager) === ExecutorAllocationManager.NOT_SET)
+ assert(removeTimes(manager).isEmpty)
+ sc.stop()
+ }
+
+ test("add executors") {
+ val sc = createSparkContext(1, 10)
+ val manager = sc.executorAllocationManager.get
+
+ // Keep adding until the limit is reached
+ assert(numExecutorsPending(manager) === 0)
+ assert(numExecutorsToAdd(manager) === 1)
+ assert(addExecutors(manager) === 1)
+ assert(numExecutorsPending(manager) === 1)
+ assert(numExecutorsToAdd(manager) === 2)
+ assert(addExecutors(manager) === 2)
+ assert(numExecutorsPending(manager) === 3)
+ assert(numExecutorsToAdd(manager) === 4)
+ assert(addExecutors(manager) === 4)
+ assert(numExecutorsPending(manager) === 7)
+ assert(numExecutorsToAdd(manager) === 8)
+ assert(addExecutors(manager) === 3) // reached the limit of 10
+ assert(numExecutorsPending(manager) === 10)
+ assert(numExecutorsToAdd(manager) === 1)
+ assert(addExecutors(manager) === 0)
+ assert(numExecutorsPending(manager) === 10)
+ assert(numExecutorsToAdd(manager) === 1)
+
+ // Register previously requested executors
+ onExecutorAdded(manager, "first")
+ assert(numExecutorsPending(manager) === 9)
+ onExecutorAdded(manager, "second")
+ onExecutorAdded(manager, "third")
+ onExecutorAdded(manager, "fourth")
+ assert(numExecutorsPending(manager) === 6)
+ onExecutorAdded(manager, "first") // duplicates should not count
+ onExecutorAdded(manager, "second")
+ assert(numExecutorsPending(manager) === 6)
+
+ // Try adding again
+ // This should still fail because the number pending + running is still at the limit
+ assert(addExecutors(manager) === 0)
+ assert(numExecutorsPending(manager) === 6)
+ assert(numExecutorsToAdd(manager) === 1)
+ assert(addExecutors(manager) === 0)
+ assert(numExecutorsPending(manager) === 6)
+ assert(numExecutorsToAdd(manager) === 1)
+ sc.stop()
+ }
+
+ test("remove executors") {
+ val sc = createSparkContext(5, 10)
+ val manager = sc.executorAllocationManager.get
+ (1 to 10).map(_.toString).foreach { id => onExecutorAdded(manager, id) }
+
+ // Keep removing until the limit is reached
+ assert(executorsPendingToRemove(manager).isEmpty)
+ assert(removeExecutor(manager, "1"))
+ assert(executorsPendingToRemove(manager).size === 1)
+ assert(executorsPendingToRemove(manager).contains("1"))
+ assert(removeExecutor(manager, "2"))
+ assert(removeExecutor(manager, "3"))
+ assert(executorsPendingToRemove(manager).size === 3)
+ assert(executorsPendingToRemove(manager).contains("2"))
+ assert(executorsPendingToRemove(manager).contains("3"))
+ assert(!removeExecutor(manager, "100")) // remove non-existent executors
+ assert(!removeExecutor(manager, "101"))
+ assert(executorsPendingToRemove(manager).size === 3)
+ assert(removeExecutor(manager, "4"))
+ assert(removeExecutor(manager, "5"))
+ assert(!removeExecutor(manager, "6")) // reached the limit of 5
+ assert(executorsPendingToRemove(manager).size === 5)
+ assert(executorsPendingToRemove(manager).contains("4"))
+ assert(executorsPendingToRemove(manager).contains("5"))
+ assert(!executorsPendingToRemove(manager).contains("6"))
+
+ // Kill executors previously requested to remove
+ onExecutorRemoved(manager, "1")
+ assert(executorsPendingToRemove(manager).size === 4)
+ assert(!executorsPendingToRemove(manager).contains("1"))
+ onExecutorRemoved(manager, "2")
+ onExecutorRemoved(manager, "3")
+ assert(executorsPendingToRemove(manager).size === 2)
+ assert(!executorsPendingToRemove(manager).contains("2"))
+ assert(!executorsPendingToRemove(manager).contains("3"))
+ onExecutorRemoved(manager, "2") // duplicates should not count
+ onExecutorRemoved(manager, "3")
+ assert(executorsPendingToRemove(manager).size === 2)
+ onExecutorRemoved(manager, "4")
+ onExecutorRemoved(manager, "5")
+ assert(executorsPendingToRemove(manager).isEmpty)
+
+ // Try removing again
+ // This should still fail because the number pending + running is still at the limit
+ assert(!removeExecutor(manager, "7"))
+ assert(executorsPendingToRemove(manager).isEmpty)
+ assert(!removeExecutor(manager, "8"))
+ assert(executorsPendingToRemove(manager).isEmpty)
+ sc.stop()
+ }
+
+ test ("interleaving add and remove") {
+ val sc = createSparkContext(5, 10)
+ val manager = sc.executorAllocationManager.get
+
+ // Add a few executors
+ assert(addExecutors(manager) === 1)
+ assert(addExecutors(manager) === 2)
+ assert(addExecutors(manager) === 4)
+ onExecutorAdded(manager, "1")
+ onExecutorAdded(manager, "2")
+ onExecutorAdded(manager, "3")
+ onExecutorAdded(manager, "4")
+ onExecutorAdded(manager, "5")
+ onExecutorAdded(manager, "6")
+ onExecutorAdded(manager, "7")
+ assert(executorIds(manager).size === 7)
+
+ // Remove until limit
+ assert(removeExecutor(manager, "1"))
+ assert(removeExecutor(manager, "2"))
+ assert(!removeExecutor(manager, "3")) // lower limit reached
+ assert(!removeExecutor(manager, "4"))
+ onExecutorRemoved(manager, "1")
+ onExecutorRemoved(manager, "2")
+ assert(executorIds(manager).size === 5)
+
+ // Add until limit
+ assert(addExecutors(manager) === 5) // upper limit reached
+ assert(addExecutors(manager) === 0)
+ assert(!removeExecutor(manager, "3")) // still at lower limit
+ assert(!removeExecutor(manager, "4"))
+ onExecutorAdded(manager, "8")
+ onExecutorAdded(manager, "9")
+ onExecutorAdded(manager, "10")
+ onExecutorAdded(manager, "11")
+ onExecutorAdded(manager, "12")
+ assert(executorIds(manager).size === 10)
+
+ // Remove succeeds again, now that we are no longer at the lower limit
+ assert(removeExecutor(manager, "3"))
+ assert(removeExecutor(manager, "4"))
+ assert(removeExecutor(manager, "5"))
+ assert(removeExecutor(manager, "6"))
+ assert(executorIds(manager).size === 10)
+ assert(addExecutors(manager) === 0) // still at upper limit
+ onExecutorRemoved(manager, "3")
+ onExecutorRemoved(manager, "4")
+ assert(executorIds(manager).size === 8)
+
+ // Add succeeds again, now that we are no longer at the upper limit
+ // Number of executors added restarts at 1
+ assert(addExecutors(manager) === 1)
+ assert(addExecutors(manager) === 1) // upper limit reached again
+ assert(addExecutors(manager) === 0)
+ assert(executorIds(manager).size === 8)
+ onExecutorRemoved(manager, "5")
+ onExecutorRemoved(manager, "6")
+ onExecutorAdded(manager, "13")
+ onExecutorAdded(manager, "14")
+ assert(executorIds(manager).size === 8)
+ assert(addExecutors(manager) === 1)
+ assert(addExecutors(manager) === 1) // upper limit reached again
+ assert(addExecutors(manager) === 0)
+ onExecutorAdded(manager, "15")
+ onExecutorAdded(manager, "16")
+ assert(executorIds(manager).size === 10)
+ sc.stop()
+ }
+
+ test("starting/canceling add timer") {
+ val sc = createSparkContext(2, 10)
+ val clock = new TestClock(8888L)
+ val manager = sc.executorAllocationManager.get
+ manager.setClock(clock)
+
+ // Starting add timer is idempotent
+ assert(addTime(manager) === NOT_SET)
+ onSchedulerBacklogged(manager)
+ val firstAddTime = addTime(manager)
+ assert(firstAddTime === clock.getTimeMillis + schedulerBacklogTimeout * 1000)
+ clock.tick(100L)
+ onSchedulerBacklogged(manager)
+ assert(addTime(manager) === firstAddTime) // timer is already started
+ clock.tick(200L)
+ onSchedulerBacklogged(manager)
+ assert(addTime(manager) === firstAddTime)
+ onSchedulerQueueEmpty(manager)
+
+ // Restart add timer
+ clock.tick(1000L)
+ assert(addTime(manager) === NOT_SET)
+ onSchedulerBacklogged(manager)
+ val secondAddTime = addTime(manager)
+ assert(secondAddTime === clock.getTimeMillis + schedulerBacklogTimeout * 1000)
+ clock.tick(100L)
+ onSchedulerBacklogged(manager)
+ assert(addTime(manager) === secondAddTime) // timer is already started
+ assert(addTime(manager) !== firstAddTime)
+ assert(firstAddTime !== secondAddTime)
+ }
+
+ test("starting/canceling remove timers") {
+ val sc = createSparkContext(2, 10)
+ val clock = new TestClock(14444L)
+ val manager = sc.executorAllocationManager.get
+ manager.setClock(clock)
+
+ // Starting remove timer is idempotent for each executor
+ assert(removeTimes(manager).isEmpty)
+ onExecutorIdle(manager, "1")
+ assert(removeTimes(manager).size === 1)
+ assert(removeTimes(manager).contains("1"))
+ val firstRemoveTime = removeTimes(manager)("1")
+ assert(firstRemoveTime === clock.getTimeMillis + executorIdleTimeout * 1000)
+ clock.tick(100L)
+ onExecutorIdle(manager, "1")
+ assert(removeTimes(manager)("1") === firstRemoveTime) // timer is already started
+ clock.tick(200L)
+ onExecutorIdle(manager, "1")
+ assert(removeTimes(manager)("1") === firstRemoveTime)
+ clock.tick(300L)
+ onExecutorIdle(manager, "2")
+ assert(removeTimes(manager)("2") !== firstRemoveTime) // different executor
+ assert(removeTimes(manager)("2") === clock.getTimeMillis + executorIdleTimeout * 1000)
+ clock.tick(400L)
+ onExecutorIdle(manager, "3")
+ assert(removeTimes(manager)("3") !== firstRemoveTime)
+ assert(removeTimes(manager)("3") === clock.getTimeMillis + executorIdleTimeout * 1000)
+ assert(removeTimes(manager).size === 3)
+ assert(removeTimes(manager).contains("2"))
+ assert(removeTimes(manager).contains("3"))
+
+ // Restart remove timer
+ clock.tick(1000L)
+ onExecutorBusy(manager, "1")
+ assert(removeTimes(manager).size === 2)
+ onExecutorIdle(manager, "1")
+ assert(removeTimes(manager).size === 3)
+ assert(removeTimes(manager).contains("1"))
+ val secondRemoveTime = removeTimes(manager)("1")
+ assert(secondRemoveTime === clock.getTimeMillis + executorIdleTimeout * 1000)
+ assert(removeTimes(manager)("1") === secondRemoveTime) // timer is already started
+ assert(removeTimes(manager)("1") !== firstRemoveTime)
+ assert(firstRemoveTime !== secondRemoveTime)
+ }
+
+ test("mock polling loop with no events") {
+ val sc = createSparkContext(1, 20)
+ val manager = sc.executorAllocationManager.get
+ val clock = new TestClock(2020L)
+ manager.setClock(clock)
+
+ // No events - we should not be adding or removing
+ assert(numExecutorsPending(manager) === 0)
+ assert(executorsPendingToRemove(manager).isEmpty)
+ schedule(manager)
+ assert(numExecutorsPending(manager) === 0)
+ assert(executorsPendingToRemove(manager).isEmpty)
+ clock.tick(100L)
+ schedule(manager)
+ assert(numExecutorsPending(manager) === 0)
+ assert(executorsPendingToRemove(manager).isEmpty)
+ clock.tick(1000L)
+ schedule(manager)
+ assert(numExecutorsPending(manager) === 0)
+ assert(executorsPendingToRemove(manager).isEmpty)
+ clock.tick(10000L)
+ schedule(manager)
+ assert(numExecutorsPending(manager) === 0)
+ assert(executorsPendingToRemove(manager).isEmpty)
+ }
+
+ test("mock polling loop add behavior") {
+ val sc = createSparkContext(1, 20)
+ val clock = new TestClock(2020L)
+ val manager = sc.executorAllocationManager.get
+ manager.setClock(clock)
+
+ // Scheduler queue backlogged
+ onSchedulerBacklogged(manager)
+ clock.tick(schedulerBacklogTimeout * 1000 / 2)
+ schedule(manager)
+ assert(numExecutorsPending(manager) === 0) // timer not exceeded yet
+ clock.tick(schedulerBacklogTimeout * 1000)
+ schedule(manager)
+ assert(numExecutorsPending(manager) === 1) // first timer exceeded
+ clock.tick(sustainedSchedulerBacklogTimeout * 1000 / 2)
+ schedule(manager)
+ assert(numExecutorsPending(manager) === 1) // second timer not exceeded yet
+ clock.tick(sustainedSchedulerBacklogTimeout * 1000)
+ schedule(manager)
+ assert(numExecutorsPending(manager) === 1 + 2) // second timer exceeded
+ clock.tick(sustainedSchedulerBacklogTimeout * 1000)
+ schedule(manager)
+ assert(numExecutorsPending(manager) === 1 + 2 + 4) // third timer exceeded
+
+ // Scheduler queue drained
+ onSchedulerQueueEmpty(manager)
+ clock.tick(sustainedSchedulerBacklogTimeout * 1000)
+ schedule(manager)
+ assert(numExecutorsPending(manager) === 7) // timer is canceled
+ clock.tick(sustainedSchedulerBacklogTimeout * 1000)
+ schedule(manager)
+ assert(numExecutorsPending(manager) === 7)
+
+ // Scheduler queue backlogged again
+ onSchedulerBacklogged(manager)
+ clock.tick(schedulerBacklogTimeout * 1000)
+ schedule(manager)
+ assert(numExecutorsPending(manager) === 7 + 1) // timer restarted
+ clock.tick(sustainedSchedulerBacklogTimeout * 1000)
+ schedule(manager)
+ assert(numExecutorsPending(manager) === 7 + 1 + 2)
+ clock.tick(sustainedSchedulerBacklogTimeout * 1000)
+ schedule(manager)
+ assert(numExecutorsPending(manager) === 7 + 1 + 2 + 4)
+ clock.tick(sustainedSchedulerBacklogTimeout * 1000)
+ schedule(manager)
+ assert(numExecutorsPending(manager) === 20) // limit reached
+ }
+
+ test("mock polling loop remove behavior") {
+ val sc = createSparkContext(1, 20)
+ val clock = new TestClock(2020L)
+ val manager = sc.executorAllocationManager.get
+ manager.setClock(clock)
+
+ // Remove idle executors on timeout
+ onExecutorAdded(manager, "executor-1")
+ onExecutorAdded(manager, "executor-2")
+ onExecutorAdded(manager, "executor-3")
+ assert(removeTimes(manager).size === 3)
+ assert(executorsPendingToRemove(manager).isEmpty)
+ clock.tick(executorIdleTimeout * 1000 / 2)
+ schedule(manager)
+ assert(removeTimes(manager).size === 3) // idle threshold not reached yet
+ assert(executorsPendingToRemove(manager).isEmpty)
+ clock.tick(executorIdleTimeout * 1000)
+ schedule(manager)
+ assert(removeTimes(manager).isEmpty) // idle threshold exceeded
+ assert(executorsPendingToRemove(manager).size === 2) // limit reached (1 executor remaining)
+
+ // Mark a subset as busy - only idle executors should be removed
+ onExecutorAdded(manager, "executor-4")
+ onExecutorAdded(manager, "executor-5")
+ onExecutorAdded(manager, "executor-6")
+ onExecutorAdded(manager, "executor-7")
+ assert(removeTimes(manager).size === 5) // 5 active executors
+ assert(executorsPendingToRemove(manager).size === 2) // 2 pending to be removed
+ onExecutorBusy(manager, "executor-4")
+ onExecutorBusy(manager, "executor-5")
+ onExecutorBusy(manager, "executor-6") // 3 busy and 2 idle (of the 5 active ones)
+ schedule(manager)
+ assert(removeTimes(manager).size === 2) // remove only idle executors
+ assert(!removeTimes(manager).contains("executor-4"))
+ assert(!removeTimes(manager).contains("executor-5"))
+ assert(!removeTimes(manager).contains("executor-6"))
+ assert(executorsPendingToRemove(manager).size === 2)
+ clock.tick(executorIdleTimeout * 1000)
+ schedule(manager)
+ assert(removeTimes(manager).isEmpty) // idle executors are removed
+ assert(executorsPendingToRemove(manager).size === 4)
+ assert(!executorsPendingToRemove(manager).contains("executor-4"))
+ assert(!executorsPendingToRemove(manager).contains("executor-5"))
+ assert(!executorsPendingToRemove(manager).contains("executor-6"))
+
+ // Busy executors are now idle and should be removed
+ onExecutorIdle(manager, "executor-4")
+ onExecutorIdle(manager, "executor-5")
+ onExecutorIdle(manager, "executor-6")
+ schedule(manager)
+ assert(removeTimes(manager).size === 3) // 0 busy and 3 idle
+ assert(removeTimes(manager).contains("executor-4"))
+ assert(removeTimes(manager).contains("executor-5"))
+ assert(removeTimes(manager).contains("executor-6"))
+ assert(executorsPendingToRemove(manager).size === 4)
+ clock.tick(executorIdleTimeout * 1000)
+ schedule(manager)
+ assert(removeTimes(manager).isEmpty)
+ assert(executorsPendingToRemove(manager).size === 6) // limit reached (1 executor remaining)
+ }
+
+ test("listeners trigger add executors correctly") {
+ val sc = createSparkContext(2, 10)
+ val manager = sc.executorAllocationManager.get
+ assert(addTime(manager) === NOT_SET)
+
+ // Starting a stage should start the add timer
+ val numTasks = 10
+ sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, numTasks)))
+ assert(addTime(manager) !== NOT_SET)
+
+ // Starting a subset of the tasks should not cancel the add timer
+ val taskInfos = (0 to numTasks - 1).map { i => createTaskInfo(i, i, "executor-1") }
+ taskInfos.tail.foreach { info => sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, info)) }
+ assert(addTime(manager) !== NOT_SET)
+
+ // Starting all remaining tasks should cancel the add timer
+ sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, taskInfos.head))
+ assert(addTime(manager) === NOT_SET)
+
+ // Start two different stages
+ // The add timer should be canceled only if all tasks in both stages start running
+ sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, numTasks)))
+ sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(2, numTasks)))
+ assert(addTime(manager) !== NOT_SET)
+ taskInfos.foreach { info => sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, info)) }
+ assert(addTime(manager) !== NOT_SET)
+ taskInfos.foreach { info => sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, info)) }
+ assert(addTime(manager) === NOT_SET)
+ }
+
+ test("listeners trigger remove executors correctly") {
+ val sc = createSparkContext(2, 10)
+ val manager = sc.executorAllocationManager.get
+ assert(removeTimes(manager).isEmpty)
+
+ // Added executors should start the remove timers for each executor
+ (1 to 5).map("executor-" + _).foreach { id => onExecutorAdded(manager, id) }
+ assert(removeTimes(manager).size === 5)
+
+ // Starting a task should cancel the remove timer for that executor
+ sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1")))
+ sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(1, 1, "executor-1")))
+ sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(2, 2, "executor-2")))
+ assert(removeTimes(manager).size === 3)
+ assert(!removeTimes(manager).contains("executor-1"))
+ assert(!removeTimes(manager).contains("executor-2"))
+
+ // Finishing all tasks running on an executor should start the remove timer for that executor
+ sc.listenerBus.postToAll(SparkListenerTaskEnd(
+ 0, 0, "task-type", Success, createTaskInfo(0, 0, "executor-1"), new TaskMetrics))
+ sc.listenerBus.postToAll(SparkListenerTaskEnd(
+ 0, 0, "task-type", Success, createTaskInfo(2, 2, "executor-2"), new TaskMetrics))
+ assert(removeTimes(manager).size === 4)
+ assert(!removeTimes(manager).contains("executor-1")) // executor-1 has not finished yet
+ assert(removeTimes(manager).contains("executor-2"))
+ sc.listenerBus.postToAll(SparkListenerTaskEnd(
+ 0, 0, "task-type", Success, createTaskInfo(1, 1, "executor-1"), new TaskMetrics))
+ assert(removeTimes(manager).size === 5)
+ assert(removeTimes(manager).contains("executor-1")) // executor-1 has now finished
+ }
+
+ test("listeners trigger add and remove executor callbacks correctly") {
+ val sc = createSparkContext(2, 10)
+ val manager = sc.executorAllocationManager.get
+ assert(executorIds(manager).isEmpty)
+ assert(removeTimes(manager).isEmpty)
+
+ // New executors have registered
+ sc.listenerBus.postToAll(SparkListenerBlockManagerAdded(
+ 0L, BlockManagerId("executor-1", "host1", 1), 100L))
+ assert(executorIds(manager).size === 1)
+ assert(executorIds(manager).contains("executor-1"))
+ assert(removeTimes(manager).size === 1)
+ assert(removeTimes(manager).contains("executor-1"))
+ sc.listenerBus.postToAll(SparkListenerBlockManagerAdded(
+ 0L, BlockManagerId("executor-2", "host2", 1), 100L))
+ assert(executorIds(manager).size === 2)
+ assert(executorIds(manager).contains("executor-2"))
+ assert(removeTimes(manager).size === 2)
+ assert(removeTimes(manager).contains("executor-2"))
+
+ // Existing executors have disconnected
+ sc.listenerBus.postToAll(SparkListenerBlockManagerRemoved(
+ 0L, BlockManagerId("executor-1", "host1", 1)))
+ assert(executorIds(manager).size === 1)
+ assert(!executorIds(manager).contains("executor-1"))
+ assert(removeTimes(manager).size === 1)
+ assert(!removeTimes(manager).contains("executor-1"))
+
+ // Unknown executor has disconnected
+ sc.listenerBus.postToAll(SparkListenerBlockManagerRemoved(
+ 0L, BlockManagerId("executor-3", "host3", 1)))
+ assert(executorIds(manager).size === 1)
+ assert(removeTimes(manager).size === 1)
+ }
+
+}
+
+/**
+ * Helper methods for testing ExecutorAllocationManager.
+ * This includes methods to access private methods and fields in ExecutorAllocationManager.
+ */
+private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
+ private val schedulerBacklogTimeout = 1L
+ private val sustainedSchedulerBacklogTimeout = 2L
+ private val executorIdleTimeout = 3L
+
+ private def createSparkContext(minExecutors: Int = 1, maxExecutors: Int = 5): SparkContext = {
+ val conf = new SparkConf()
+ .setMaster("local")
+ .setAppName("test-executor-allocation-manager")
+ .set("spark.dynamicAllocation.enabled", "true")
+ .set("spark.dynamicAllocation.minExecutors", minExecutors.toString)
+ .set("spark.dynamicAllocation.maxExecutors", maxExecutors.toString)
+ .set("spark.dynamicAllocation.schedulerBacklogTimeout", schedulerBacklogTimeout.toString)
+ .set("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout",
+ sustainedSchedulerBacklogTimeout.toString)
+ .set("spark.dynamicAllocation.executorIdleTimeout", executorIdleTimeout.toString)
+ .set("spark.dynamicAllocation.testing", "true")
+ new SparkContext(conf)
+ }
+
+ private def createStageInfo(stageId: Int, numTasks: Int): StageInfo = {
+ new StageInfo(stageId, 0, "name", numTasks, Seq.empty, "no details")
+ }
+
+ private def createTaskInfo(taskId: Int, taskIndex: Int, executorId: String): TaskInfo = {
+ new TaskInfo(taskId, taskIndex, 0, 0, executorId, "", TaskLocality.ANY, speculative = false)
+ }
+
+ /* ------------------------------------------------------- *
+ | Helper methods for accessing private methods and fields |
+ * ------------------------------------------------------- */
+
+ private val _numExecutorsToAdd = PrivateMethod[Int]('numExecutorsToAdd)
+ private val _numExecutorsPending = PrivateMethod[Int]('numExecutorsPending)
+ private val _executorsPendingToRemove =
+ PrivateMethod[collection.Set[String]]('executorsPendingToRemove)
+ private val _executorIds = PrivateMethod[collection.Set[String]]('executorIds)
+ private val _addTime = PrivateMethod[Long]('addTime)
+ private val _removeTimes = PrivateMethod[collection.Map[String, Long]]('removeTimes)
+ private val _schedule = PrivateMethod[Unit]('schedule)
+ private val _addExecutors = PrivateMethod[Int]('addExecutors)
+ private val _removeExecutor = PrivateMethod[Boolean]('removeExecutor)
+ private val _onExecutorAdded = PrivateMethod[Unit]('onExecutorAdded)
+ private val _onExecutorRemoved = PrivateMethod[Unit]('onExecutorRemoved)
+ private val _onSchedulerBacklogged = PrivateMethod[Unit]('onSchedulerBacklogged)
+ private val _onSchedulerQueueEmpty = PrivateMethod[Unit]('onSchedulerQueueEmpty)
+ private val _onExecutorIdle = PrivateMethod[Unit]('onExecutorIdle)
+ private val _onExecutorBusy = PrivateMethod[Unit]('onExecutorBusy)
+
+ private def numExecutorsToAdd(manager: ExecutorAllocationManager): Int = {
+ manager invokePrivate _numExecutorsToAdd()
+ }
+
+ private def numExecutorsPending(manager: ExecutorAllocationManager): Int = {
+ manager invokePrivate _numExecutorsPending()
+ }
+
+ private def executorsPendingToRemove(
+ manager: ExecutorAllocationManager): collection.Set[String] = {
+ manager invokePrivate _executorsPendingToRemove()
+ }
+
+ private def executorIds(manager: ExecutorAllocationManager): collection.Set[String] = {
+ manager invokePrivate _executorIds()
+ }
+
+ private def addTime(manager: ExecutorAllocationManager): Long = {
+ manager invokePrivate _addTime()
+ }
+
+ private def removeTimes(manager: ExecutorAllocationManager): collection.Map[String, Long] = {
+ manager invokePrivate _removeTimes()
+ }
+
+ private def schedule(manager: ExecutorAllocationManager): Unit = {
+ manager invokePrivate _schedule()
+ }
+
+ private def addExecutors(manager: ExecutorAllocationManager): Int = {
+ manager invokePrivate _addExecutors()
+ }
+
+ private def removeExecutor(manager: ExecutorAllocationManager, id: String): Boolean = {
+ manager invokePrivate _removeExecutor(id)
+ }
+
+ private def onExecutorAdded(manager: ExecutorAllocationManager, id: String): Unit = {
+ manager invokePrivate _onExecutorAdded(id)
+ }
+
+ private def onExecutorRemoved(manager: ExecutorAllocationManager, id: String): Unit = {
+ manager invokePrivate _onExecutorRemoved(id)
+ }
+
+ private def onSchedulerBacklogged(manager: ExecutorAllocationManager): Unit = {
+ manager invokePrivate _onSchedulerBacklogged()
+ }
+
+ private def onSchedulerQueueEmpty(manager: ExecutorAllocationManager): Unit = {
+ manager invokePrivate _onSchedulerQueueEmpty()
+ }
+
+ private def onExecutorIdle(manager: ExecutorAllocationManager, id: String): Unit = {
+ manager invokePrivate _onExecutorIdle(id)
+ }
+
+ private def onExecutorBusy(manager: ExecutorAllocationManager, id: String): Unit = {
+ manager invokePrivate _onExecutorBusy(id)
+ }
+}
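The helper object above uses ScalaTest's PrivateMethodTester so the suite can drive ExecutorAllocationManager's private methods without widening their visibility. A minimal self-contained sketch of the same pattern, with an invented class and method for illustration:

    import org.scalatest.PrivateMethodTester

    class Counter { private def bump(n: Int): Int = n + 1 }   // hypothetical class

    object CounterAccess extends PrivateMethodTester {
      private val _bump = PrivateMethod[Int]('bump)
      // Invokes Counter.bump reflectively, mirroring the helpers in the suite above.
      def bump(counter: Counter, n: Int): Int = counter invokePrivate _bump(n)
    }

    // CounterAccess.bump(new Counter, 41) == 42, without making bump() public.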