author Sandy Ryza <sandy@cloudera.com> 2015-05-01 18:32:46 -0700
committer Andrew Or <andrew@databricks.com> 2015-05-01 18:33:15 -0700
commit 099327d5376554134c9af49bc2045add4cfb024d (patch)
tree ca92055b6720c1c13ba5a450e625e82f62e8345f /core/src/main/scala/org
parent ae98eec730125c1153dcac9ea941959cc79e4f42 (diff)
[SPARK-6954] [YARN] ExecutorAllocationManager can end up requesting a negative number of executors

Author: Sandy Ryza <sandy@cloudera.com>

Closes #5704 from sryza/sandy-spark-6954 and squashes the following commits:

b7890fb [Sandy Ryza] Avoid ramping up to an existing number of executors
6eb516a [Sandy Ryza] SPARK-6954. ExecutorAllocationManager can end up requesting a negative number of executors
Diffstat (limited to 'core/src/main/scala/org')
-rw-r--r-- core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala | 101
1 file changed, 49 insertions(+), 52 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index b986fa87dc..228d9149df 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -27,13 +27,20 @@ import org.apache.spark.util.{ThreadUtils, Clock, SystemClock, Utils}
/**
* An agent that dynamically allocates and removes executors based on the workload.
*
- * The add policy depends on whether there are backlogged tasks waiting to be scheduled. If
- * the scheduler queue is not drained in N seconds, then new executors are added. If the queue
- * persists for another M seconds, then more executors are added and so on. The number added
- * in each round increases exponentially from the previous round until an upper bound on the
- * number of executors has been reached. The upper bound is based both on a configured property
- * and on the number of tasks pending: the policy will never increase the number of executor
- * requests past the number needed to handle all pending tasks.
+ * The ExecutorAllocationManager maintains a moving target number of executors which is periodically
+ * synced to the cluster manager. The target starts at a configured initial value and changes with
+ * the number of pending and running tasks.
+ *
+ * Decreasing the target number of executors happens when the current target is more than needed to
+ * handle the current load. The target number of executors is always truncated to the number of
+ * executors that could run all currently running and pending tasks at once.
+ *
+ * Increasing the target number of executors happens in response to backlogged tasks waiting to be
+ * scheduled. If the scheduler queue is not drained in N seconds, then new executors are added. If
+ * the queue persists for another M seconds, then more executors are added and so on. The number
+ * added in each round increases exponentially from the previous round until an upper bound has been
+ * reached. The upper bound is based both on a configured property and on the current number of
+ * running and pending tasks, as described above.
*
* The rationale for the exponential increase is twofold: (1) Executors should be added slowly
* in the beginning in case the number of extra executors needed turns out to be small. Otherwise,
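As an illustration of the ramp-up schedule described in the comment above, here is a minimal self-contained sketch; the names and numbers are hypothetical stand-ins, not Spark internals:

    object RampUpSketch extends App {
      val maxNeeded = 50 // executors needed to run all current and pending tasks at once
      var target = 2     // current target number of executors
      var toAdd = 1      // number of executors to add in the next round
      // Each interval the backlog persists, the increment doubles, but the
      // target is always capped at what the current load actually needs.
      while (target < maxNeeded) {
        target = math.min(target + toAdd, maxNeeded)
        toAdd *= 2
        println(s"target after this round: $target")
      }
    }

Running it prints targets 3, 5, 9, 17, 33, 50: slow at first, then doubling, then capped by the load.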
@@ -105,8 +112,10 @@ private[spark] class ExecutorAllocationManager(
// Number of executors to add in the next round
private var numExecutorsToAdd = 1
- // Number of executors that have been requested but have not registered yet
- private var numExecutorsPending = 0
+ // The desired number of executors at this moment in time. If all our executors were to die, this
+ // is the number of executors we would immediately want from the cluster manager.
+ private var numExecutorsTarget =
+ conf.getInt("spark.dynamicAllocation.initialExecutors", minNumExecutors)
// Executors that have been requested to be removed but have not been killed yet
private val executorsPendingToRemove = new mutable.HashSet[String]
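For context, the initial target read above comes from user configuration; a hedged usage sketch with the companion spark.dynamicAllocation.* settings (values here are illustrative):

    import org.apache.spark.SparkConf

    // If spark.dynamicAllocation.initialExecutors is unset, the code above
    // falls back to minNumExecutors.
    val conf = new SparkConf()
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.dynamicAllocation.minExecutors", "2")
      .set("spark.dynamicAllocation.initialExecutors", "5")
      .set("spark.dynamicAllocation.maxExecutors", "50")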
@@ -200,13 +209,6 @@ private[spark] class ExecutorAllocationManager(
}
/**
- * The number of executors we would have if the cluster manager were to fulfill all our existing
- * requests.
- */
- private def targetNumExecutors(): Int =
- numExecutorsPending + executorIds.size - executorsPendingToRemove.size
-
- /**
* The maximum number of executors we would need under the current load to satisfy all running
* and pending tasks, rounded up.
*/
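A sketch of the "rounded up" computation that doc comment describes; tasksPerExecutor and the task count are stand-ins for fields not shown in this hunk:

    // Ceiling division: enough executors to run every current task at once.
    def maxNumExecutorsNeeded(runningAndPendingTasks: Int, tasksPerExecutor: Int): Int =
      (runningAndPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor

    assert(maxNumExecutorsNeeded(10, 4) == 3) // 10 tasks at 4 per executor -> 3 executors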
@@ -227,7 +229,7 @@ private[spark] class ExecutorAllocationManager(
private def schedule(): Unit = synchronized {
val now = clock.getTimeMillis
- addOrCancelExecutorRequests(now)
+ updateAndSyncNumExecutorsTarget(now)
removeTimes.retain { case (executorId, expireTime) =>
val expired = now >= expireTime
@@ -239,26 +241,28 @@ private[spark] class ExecutorAllocationManager(
}
/**
+ * Updates our target number of executors and syncs the result with the cluster manager.
+ *
* Check to see whether our existing allocation and the requests we've made previously exceed our
- * current needs. If so, let the cluster manager know so that it can cancel pending requests that
- * are unneeded.
+ * current needs. If so, truncate our target and let the cluster manager know so that it can
+ * cancel pending requests that are unneeded.
*
* If not, and the add time has expired, see if we can request new executors and refresh the add
* time.
*
* @return the delta in the target number of executors.
*/
- private def addOrCancelExecutorRequests(now: Long): Int = synchronized {
- val currentTarget = targetNumExecutors
+ private def updateAndSyncNumExecutorsTarget(now: Long): Int = synchronized {
val maxNeeded = maxNumExecutorsNeeded
- if (maxNeeded < currentTarget) {
+ if (maxNeeded < numExecutorsTarget) {
// The target number exceeds the number we actually need, so stop adding new
- // executors and inform the cluster manager to cancel the extra pending requests.
- val newTotalExecutors = math.max(maxNeeded, minNumExecutors)
- client.requestTotalExecutors(newTotalExecutors)
+ // executors and inform the cluster manager to cancel the extra pending requests
+ val oldNumExecutorsTarget = numExecutorsTarget
+ numExecutorsTarget = math.max(maxNeeded, minNumExecutors)
+ client.requestTotalExecutors(numExecutorsTarget)
numExecutorsToAdd = 1
- updateNumExecutorsPending(newTotalExecutors)
+ numExecutorsTarget - oldNumExecutorsTarget
} else if (addTime != NOT_SET && now >= addTime) {
val delta = addExecutors(maxNeeded)
logDebug(s"Starting timer to add more executors (to " +
@@ -281,21 +285,30 @@ private[spark] class ExecutorAllocationManager(
*/
private def addExecutors(maxNumExecutorsNeeded: Int): Int = {
// Do not request more executors if it would put our target over the upper bound
- val currentTarget = targetNumExecutors
- if (currentTarget >= maxNumExecutors) {
- logDebug(s"Not adding executors because there are already ${executorIds.size} " +
- s"registered and $numExecutorsPending pending executor(s) (limit $maxNumExecutors)")
+ if (numExecutorsTarget >= maxNumExecutors) {
+ val numExecutorsPending = numExecutorsTarget - executorIds.size
+ logDebug(s"Not adding executors because there are already ${executorIds.size} registered " +
+ s"and ${numExecutorsPending} pending executor(s) (limit $maxNumExecutors)")
numExecutorsToAdd = 1
return 0
}
- val actualMaxNumExecutors = math.min(maxNumExecutors, maxNumExecutorsNeeded)
- val newTotalExecutors = math.min(currentTarget + numExecutorsToAdd, actualMaxNumExecutors)
- val addRequestAcknowledged = testing || client.requestTotalExecutors(newTotalExecutors)
+ val oldNumExecutorsTarget = numExecutorsTarget
+ // There's no point in wasting time ramping up to the number of executors we already have, so
+ // make sure our target is at least as much as our current allocation:
+ numExecutorsTarget = math.max(numExecutorsTarget, executorIds.size)
+ // Boost our target with the number to add for this round:
+ numExecutorsTarget += numExecutorsToAdd
+ // Ensure that our target doesn't exceed what we need at the present moment:
+ numExecutorsTarget = math.min(numExecutorsTarget, maxNumExecutorsNeeded)
+ // Ensure that our target fits within configured bounds:
+ numExecutorsTarget = math.max(math.min(numExecutorsTarget, maxNumExecutors), minNumExecutors)
+
+ val addRequestAcknowledged = testing || client.requestTotalExecutors(numExecutorsTarget)
if (addRequestAcknowledged) {
- val delta = updateNumExecutorsPending(newTotalExecutors)
+ val delta = numExecutorsTarget - oldNumExecutorsTarget
logInfo(s"Requesting $delta new executor(s) because tasks are backlogged" +
- s" (new desired total will be $newTotalExecutors)")
+ s" (new desired total will be $numExecutorsTarget)")
numExecutorsToAdd = if (delta == numExecutorsToAdd) {
numExecutorsToAdd * 2
} else {
@@ -304,24 +317,12 @@ private[spark] class ExecutorAllocationManager(
delta
} else {
logWarning(
- s"Unable to reach the cluster manager to request $newTotalExecutors total executors!")
+ s"Unable to reach the cluster manager to request $numExecutorsTarget total executors!")
0
}
}
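The four clamping steps above compose into one bounded update; a worked sketch with illustrative numbers shows how a stale low target is repaired instead of producing a bogus request:

    val minNumExecutors = 2
    val maxNumExecutors = 100
    val maxNumExecutorsNeeded = 12 // what the current load requires
    val registered = 10            // executorIds.size
    val numExecutorsToAdd = 4
    var numExecutorsTarget = 6     // stale target below the current allocation

    numExecutorsTarget = math.max(numExecutorsTarget, registered)            // 10
    numExecutorsTarget += numExecutorsToAdd                                   // 14
    numExecutorsTarget = math.min(numExecutorsTarget, maxNumExecutorsNeeded)  // 12
    numExecutorsTarget = math.max(math.min(numExecutorsTarget, maxNumExecutors), minNumExecutors) // 12
    // delta = 12 - 6 = 6 new executors requested on top of the old target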
/**
- * Given the new target number of executors, update the number of pending executor requests,
- * and return the delta from the old number of pending requests.
- */
- private def updateNumExecutorsPending(newTotalExecutors: Int): Int = {
- val newNumExecutorsPending =
- newTotalExecutors - executorIds.size + executorsPendingToRemove.size
- val delta = newNumExecutorsPending - numExecutorsPending
- numExecutorsPending = newNumExecutorsPending
- delta
- }
-
- /**
* Request the cluster manager to remove the given executor.
* Return whether the request is received.
*/
@@ -372,10 +373,6 @@ private[spark] class ExecutorAllocationManager(
// as idle again so as not to forget that it is a candidate for removal. (see SPARK-4951)
executorIds.filter(listener.isExecutorIdle).foreach(onExecutorIdle)
logInfo(s"New executor $executorId has registered (new total is ${executorIds.size})")
- if (numExecutorsPending > 0) {
- numExecutorsPending -= 1
- logDebug(s"Decremented number of pending executors ($numExecutorsPending left)")
- }
} else {
logWarning(s"Duplicate executor $executorId has registered")
}