authorSandy Ryza <sandy@cloudera.com>2015-05-01 18:32:46 -0700
committerAndrew Or <andrew@databricks.com>2015-05-01 18:33:15 -0700
commit099327d5376554134c9af49bc2045add4cfb024d (patch)
treeca92055b6720c1c13ba5a450e625e82f62e8345f /core
parentae98eec730125c1153dcac9ea941959cc79e4f42 (diff)
[SPARK-6954] [YARN] ExecutorAllocationManager can end up requesting a negative number of executors

Author: Sandy Ryza <sandy@cloudera.com>

Closes #5704 from sryza/sandy-spark-6954 and squashes the following commits:

b7890fb [Sandy Ryza] Avoid ramping up to an existing number of executors
6eb516a [Sandy Ryza] SPARK-6954. ExecutorAllocationManager can end up requesting a negative number of executors
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala      | 101
-rw-r--r-- core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala | 171
2 files changed, 148 insertions, 124 deletions
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index b986fa87dc..228d9149df 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -27,13 +27,20 @@ import org.apache.spark.util.{ThreadUtils, Clock, SystemClock, Utils}
/**
* An agent that dynamically allocates and removes executors based on the workload.
*
- * The add policy depends on whether there are backlogged tasks waiting to be scheduled. If
- * the scheduler queue is not drained in N seconds, then new executors are added. If the queue
- * persists for another M seconds, then more executors are added and so on. The number added
- * in each round increases exponentially from the previous round until an upper bound on the
- * number of executors has been reached. The upper bound is based both on a configured property
- * and on the number of tasks pending: the policy will never increase the number of executor
- * requests past the number needed to handle all pending tasks.
+ * The ExecutorAllocationManager maintains a moving target number of executors which is periodically
+ * synced to the cluster manager. The target starts at a configured initial value and changes with
+ * the number of pending and running tasks.
+ *
+ * Decreasing the target number of executors happens when the current target is more than needed to
+ * handle the current load. The target number of executors is always truncated to the number of
+ * executors that could run all currently running and pending tasks at once.
+ *
+ * Increasing the target number of executors happens in response to backlogged tasks waiting to be
+ * scheduled. If the scheduler queue is not drained in N seconds, then new executors are added. If
+ * the queue persists for another M seconds, then more executors are added and so on. The number
+ * added in each round increases exponentially from the previous round until an upper bound has been
+ * reached. The upper bound is based both on a configured property and on the current number of
+ * running and pending tasks, as described above.
*
* The rationale for the exponential increase is twofold: (1) Executors should be added slowly
* in the beginning in case the number of extra executors needed turns out to be small. Otherwise,
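A minimal runnable sketch of the ramp-up policy described in the comment above, using made-up values (a cap of 10 needed executors and the default initial target of 1); none of these names are Spark API. It reproduces the deltas 1, 2, 4, 2 asserted in the first updated test in the suite diff below:

    object RampDemo extends App {
      val maxNeeded = 10  // executors needed to run all running and pending tasks at once
      var target = 1      // numExecutorsTarget, seeded by spark.dynamicAllocation.initialExecutors
      var toAdd = 1       // batch size for the next round (numExecutorsToAdd)
      while (target < maxNeeded) {
        val newTarget = math.min(target + toAdd, maxNeeded)
        val delta = newTarget - target
        // Double the batch only when the whole batch fit under the cap; otherwise reset to 1
        toAdd = if (delta == toAdd) toAdd * 2 else 1
        target = newTarget
        println(s"requested $delta more, target now $target")  // 1->2, 2->4, 4->8, 2->10
      }
    }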
@@ -105,8 +112,10 @@ private[spark] class ExecutorAllocationManager(
// Number of executors to add in the next round
private var numExecutorsToAdd = 1
- // Number of executors that have been requested but have not registered yet
- private var numExecutorsPending = 0
+ // The desired number of executors at this moment in time. If all our executors were to die, this
+ // is the number of executors we would immediately want from the cluster manager.
+ private var numExecutorsTarget =
+ conf.getInt("spark.dynamicAllocation.initialExecutors", minNumExecutors)
// Executors that have been requested to be removed but have not been killed yet
private val executorsPendingToRemove = new mutable.HashSet[String]
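For context, a hedged sketch of the configuration that seeds numExecutorsTarget; the spark.dynamicAllocation.* keys are the real property names, but the values here are illustrative only:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.dynamicAllocation.minExecutors", "0")      // floor for the target
      .set("spark.dynamicAllocation.initialExecutors", "3")  // numExecutorsTarget starts here
      .set("spark.dynamicAllocation.maxExecutors", "10")     // ceiling for the target

Per the getInt call above, an unset initialExecutors falls back to minExecutors, which is why several tests below switch to createSparkContext(0, ...) and expect a starting target of 0.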
@@ -200,13 +209,6 @@ private[spark] class ExecutorAllocationManager(
}
/**
- * The number of executors we would have if the cluster manager were to fulfill all our existing
- * requests.
- */
- private def targetNumExecutors(): Int =
- numExecutorsPending + executorIds.size - executorsPendingToRemove.size
-
- /**
* The maximum number of executors we would need under the current load to satisfy all running
* and pending tasks, rounded up.
*/
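The method body is outside this hunk, but the "rounded up" computation it documents amounts to a ceiling division over each executor's task slots. A hedged sketch, where tasksPerExecutor stands in for the executor-cores-per-task-cpus ratio and the task count is an illustrative input rather than the listener's real state:

    // ceil(runningAndPendingTasks / tasksPerExecutor) using integer arithmetic
    def maxNumExecutorsNeeded(runningAndPendingTasks: Int, tasksPerExecutor: Int): Int =
      (runningAndPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor

    // e.g. 5 outstanding tasks at 2 tasks per executor => (5 + 2 - 1) / 2 = 3 executors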
@@ -227,7 +229,7 @@ private[spark] class ExecutorAllocationManager(
private def schedule(): Unit = synchronized {
val now = clock.getTimeMillis
- addOrCancelExecutorRequests(now)
+ updateAndSyncNumExecutorsTarget(now)
removeTimes.retain { case (executorId, expireTime) =>
val expired = now >= expireTime
@@ -239,26 +241,28 @@ private[spark] class ExecutorAllocationManager(
}
/**
+ * Updates our target number of executors and syncs the result with the cluster manager.
+ *
* Check to see whether our existing allocation and the requests we've made previously exceed our
- * current needs. If so, let the cluster manager know so that it can cancel pending requests that
- * are unneeded.
+ * current needs. If so, truncate our target and let the cluster manager know so that it can
+ * cancel pending requests that are unneeded.
*
* If not, and the add time has expired, see if we can request new executors and refresh the add
* time.
*
* @return the delta in the target number of executors.
*/
- private def addOrCancelExecutorRequests(now: Long): Int = synchronized {
- val currentTarget = targetNumExecutors
+ private def updateAndSyncNumExecutorsTarget(now: Long): Int = synchronized {
val maxNeeded = maxNumExecutorsNeeded
- if (maxNeeded < currentTarget) {
+ if (maxNeeded < numExecutorsTarget) {
// The target number exceeds the number we actually need, so stop adding new
- // executors and inform the cluster manager to cancel the extra pending requests.
- val newTotalExecutors = math.max(maxNeeded, minNumExecutors)
- client.requestTotalExecutors(newTotalExecutors)
+ // executors and inform the cluster manager to cancel the extra pending requests
+ val oldNumExecutorsTarget = numExecutorsTarget
+ numExecutorsTarget = math.max(maxNeeded, minNumExecutors)
+ client.requestTotalExecutors(numExecutorsTarget)
numExecutorsToAdd = 1
- updateNumExecutorsPending(newTotalExecutors)
+ numExecutorsTarget - oldNumExecutorsTarget
} else if (addTime != NOT_SET && now >= addTime) {
val delta = addExecutors(maxNeeded)
logDebug(s"Starting timer to add more executors (to " +
@@ -281,21 +285,30 @@ private[spark] class ExecutorAllocationManager(
*/
private def addExecutors(maxNumExecutorsNeeded: Int): Int = {
// Do not request more executors if it would put our target over the upper bound
- val currentTarget = targetNumExecutors
- if (currentTarget >= maxNumExecutors) {
- logDebug(s"Not adding executors because there are already ${executorIds.size} " +
- s"registered and $numExecutorsPending pending executor(s) (limit $maxNumExecutors)")
+ if (numExecutorsTarget >= maxNumExecutors) {
+ val numExecutorsPending = numExecutorsTarget - executorIds.size
+ logDebug(s"Not adding executors because there are already ${executorIds.size} registered " +
+ s"and ${numExecutorsPending} pending executor(s) (limit $maxNumExecutors)")
numExecutorsToAdd = 1
return 0
}
- val actualMaxNumExecutors = math.min(maxNumExecutors, maxNumExecutorsNeeded)
- val newTotalExecutors = math.min(currentTarget + numExecutorsToAdd, actualMaxNumExecutors)
- val addRequestAcknowledged = testing || client.requestTotalExecutors(newTotalExecutors)
+ val oldNumExecutorsTarget = numExecutorsTarget
+ // There's no point in wasting time ramping up to the number of executors we already have, so
+ // make sure our target is at least as much as our current allocation:
+ numExecutorsTarget = math.max(numExecutorsTarget, executorIds.size)
+ // Boost our target with the number to add for this round:
+ numExecutorsTarget += numExecutorsToAdd
+ // Ensure that our target doesn't exceed what we need at the present moment:
+ numExecutorsTarget = math.min(numExecutorsTarget, maxNumExecutorsNeeded)
+ // Ensure that our target fits within configured bounds:
+ numExecutorsTarget = math.max(math.min(numExecutorsTarget, maxNumExecutors), minNumExecutors)
+
+ val addRequestAcknowledged = testing || client.requestTotalExecutors(numExecutorsTarget)
if (addRequestAcknowledged) {
- val delta = updateNumExecutorsPending(newTotalExecutors)
+ val delta = numExecutorsTarget - oldNumExecutorsTarget
logInfo(s"Requesting $delta new executor(s) because tasks are backlogged" +
- s" (new desired total will be $newTotalExecutors)")
+ s" (new desired total will be $numExecutorsTarget)")
numExecutorsToAdd = if (delta == numExecutorsToAdd) {
numExecutorsToAdd * 2
} else {
@@ -304,24 +317,12 @@ private[spark] class ExecutorAllocationManager(
delta
} else {
logWarning(
- s"Unable to reach the cluster manager to request $newTotalExecutors total executors!")
+ s"Unable to reach the cluster manager to request $numExecutorsTarget total executors!")
0
}
}
/**
- * Given the new target number of executors, update the number of pending executor requests,
- * and return the delta from the old number of pending requests.
- */
- private def updateNumExecutorsPending(newTotalExecutors: Int): Int = {
- val newNumExecutorsPending =
- newTotalExecutors - executorIds.size + executorsPendingToRemove.size
- val delta = newNumExecutorsPending - numExecutorsPending
- numExecutorsPending = newNumExecutorsPending
- delta
- }
-
- /**
* Request the cluster manager to remove the given executor.
* Return whether the request is received.
*/
@@ -372,10 +373,6 @@ private[spark] class ExecutorAllocationManager(
// as idle again so as not to forget that it is a candidate for removal. (see SPARK-4951)
executorIds.filter(listener.isExecutorIdle).foreach(onExecutorIdle)
logInfo(s"New executor $executorId has registered (new total is ${executorIds.size})")
- if (numExecutorsPending > 0) {
- numExecutorsPending -= 1
- logDebug(s"Decremented number of pending executors ($numExecutorsPending left)")
- }
} else {
logWarning(s"Duplicate executor $executorId has registered")
}
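To see why the new math.max against executorIds.size matters, here is the clamping chain from addExecutors traced with the numbers from the new "avoid ramp up when target < running executors" test below (15 registered executors, target previously truncated to 0, fresh batch of 1, bounds 0 and 100000):

    var numExecutorsTarget = 0
    val numExecutorsToAdd = 1
    val registered = 15                                  // executorIds.size
    val (maxNeeded, minNum, maxNum) = (1000, 0, 100000)

    numExecutorsTarget = math.max(numExecutorsTarget, registered)               // 15: never ramp from below the live allocation
    numExecutorsTarget += numExecutorsToAdd                                     // 16
    numExecutorsTarget = math.min(numExecutorsTarget, maxNeeded)                // still 16: capped by current need
    numExecutorsTarget = math.max(math.min(numExecutorsTarget, maxNum), minNum) // still 16: clamped to configured bounds

Without the first step the target would be 0 + 1 = 1 while 15 executors are already running, and the delta against the old pending count could go negative, which is the SPARK-6954 failure mode this patch removes.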
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 22acc270b9..49e6de4e0b 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -78,7 +78,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext wit
test("starting state") {
sc = createSparkContext()
val manager = sc.executorAllocationManager.get
- assert(numExecutorsPending(manager) === 0)
+ assert(numExecutorsTarget(manager) === 1)
assert(executorsPendingToRemove(manager).isEmpty)
assert(executorIds(manager).isEmpty)
assert(addTime(manager) === ExecutorAllocationManager.NOT_SET)
@@ -91,108 +91,108 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext wit
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000)))
// Keep adding until the limit is reached
- assert(numExecutorsPending(manager) === 0)
+ assert(numExecutorsTarget(manager) === 1)
assert(numExecutorsToAdd(manager) === 1)
assert(addExecutors(manager) === 1)
- assert(numExecutorsPending(manager) === 1)
+ assert(numExecutorsTarget(manager) === 2)
assert(numExecutorsToAdd(manager) === 2)
assert(addExecutors(manager) === 2)
- assert(numExecutorsPending(manager) === 3)
+ assert(numExecutorsTarget(manager) === 4)
assert(numExecutorsToAdd(manager) === 4)
assert(addExecutors(manager) === 4)
- assert(numExecutorsPending(manager) === 7)
+ assert(numExecutorsTarget(manager) === 8)
assert(numExecutorsToAdd(manager) === 8)
- assert(addExecutors(manager) === 3) // reached the limit of 10
- assert(numExecutorsPending(manager) === 10)
+ assert(addExecutors(manager) === 2) // reached the limit of 10
+ assert(numExecutorsTarget(manager) === 10)
assert(numExecutorsToAdd(manager) === 1)
assert(addExecutors(manager) === 0)
- assert(numExecutorsPending(manager) === 10)
+ assert(numExecutorsTarget(manager) === 10)
assert(numExecutorsToAdd(manager) === 1)
// Register previously requested executors
onExecutorAdded(manager, "first")
- assert(numExecutorsPending(manager) === 9)
+ assert(numExecutorsTarget(manager) === 10)
onExecutorAdded(manager, "second")
onExecutorAdded(manager, "third")
onExecutorAdded(manager, "fourth")
- assert(numExecutorsPending(manager) === 6)
+ assert(numExecutorsTarget(manager) === 10)
onExecutorAdded(manager, "first") // duplicates should not count
onExecutorAdded(manager, "second")
- assert(numExecutorsPending(manager) === 6)
+ assert(numExecutorsTarget(manager) === 10)
// Try adding again
// This should still fail because the number pending + running is still at the limit
assert(addExecutors(manager) === 0)
- assert(numExecutorsPending(manager) === 6)
+ assert(numExecutorsTarget(manager) === 10)
assert(numExecutorsToAdd(manager) === 1)
assert(addExecutors(manager) === 0)
- assert(numExecutorsPending(manager) === 6)
+ assert(numExecutorsTarget(manager) === 10)
assert(numExecutorsToAdd(manager) === 1)
}
test("add executors capped by num pending tasks") {
- sc = createSparkContext(1, 10)
+ sc = createSparkContext(0, 10)
val manager = sc.executorAllocationManager.get
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 5)))
// Verify that we're capped at number of tasks in the stage
- assert(numExecutorsPending(manager) === 0)
+ assert(numExecutorsTarget(manager) === 0)
assert(numExecutorsToAdd(manager) === 1)
assert(addExecutors(manager) === 1)
- assert(numExecutorsPending(manager) === 1)
+ assert(numExecutorsTarget(manager) === 1)
assert(numExecutorsToAdd(manager) === 2)
assert(addExecutors(manager) === 2)
- assert(numExecutorsPending(manager) === 3)
+ assert(numExecutorsTarget(manager) === 3)
assert(numExecutorsToAdd(manager) === 4)
assert(addExecutors(manager) === 2)
- assert(numExecutorsPending(manager) === 5)
+ assert(numExecutorsTarget(manager) === 5)
assert(numExecutorsToAdd(manager) === 1)
- // Verify that running a task reduces the cap
+ // Verify that running a task doesn't affect the target
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 3)))
sc.listenerBus.postToAll(SparkListenerExecutorAdded(
0L, "executor-1", new ExecutorInfo("host1", 1, Map.empty)))
sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-1")))
- assert(numExecutorsPending(manager) === 4)
+ assert(numExecutorsTarget(manager) === 5)
assert(addExecutors(manager) === 1)
- assert(numExecutorsPending(manager) === 5)
+ assert(numExecutorsTarget(manager) === 6)
assert(numExecutorsToAdd(manager) === 2)
assert(addExecutors(manager) === 2)
- assert(numExecutorsPending(manager) === 7)
+ assert(numExecutorsTarget(manager) === 8)
assert(numExecutorsToAdd(manager) === 4)
assert(addExecutors(manager) === 0)
- assert(numExecutorsPending(manager) === 7)
+ assert(numExecutorsTarget(manager) === 8)
assert(numExecutorsToAdd(manager) === 1)
- // Verify that re-running a task doesn't reduce the cap further
+ // Verify that re-running a task doesn't blow things up
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(2, 3)))
sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, createTaskInfo(0, 0, "executor-1")))
sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, createTaskInfo(1, 0, "executor-1")))
assert(addExecutors(manager) === 1)
- assert(numExecutorsPending(manager) === 8)
+ assert(numExecutorsTarget(manager) === 9)
assert(numExecutorsToAdd(manager) === 2)
assert(addExecutors(manager) === 1)
- assert(numExecutorsPending(manager) === 9)
+ assert(numExecutorsTarget(manager) === 10)
assert(numExecutorsToAdd(manager) === 1)
// Verify that running a task once we're at our limit doesn't blow things up
sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, createTaskInfo(0, 1, "executor-1")))
assert(addExecutors(manager) === 0)
- assert(numExecutorsPending(manager) === 9)
+ assert(numExecutorsTarget(manager) === 10)
}
test("cancel pending executors when no longer needed") {
- sc = createSparkContext(1, 10)
+ sc = createSparkContext(0, 10)
val manager = sc.executorAllocationManager.get
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(2, 5)))
- assert(numExecutorsPending(manager) === 0)
+ assert(numExecutorsTarget(manager) === 0)
assert(numExecutorsToAdd(manager) === 1)
assert(addExecutors(manager) === 1)
- assert(numExecutorsPending(manager) === 1)
+ assert(numExecutorsTarget(manager) === 1)
assert(numExecutorsToAdd(manager) === 2)
assert(addExecutors(manager) === 2)
- assert(numExecutorsPending(manager) === 3)
+ assert(numExecutorsTarget(manager) === 3)
val task1Info = createTaskInfo(0, 0, "executor-1")
sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, task1Info))
@@ -266,7 +266,6 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext wit
// Add a few executors
assert(addExecutors(manager) === 1)
assert(addExecutors(manager) === 2)
- assert(addExecutors(manager) === 4)
onExecutorAdded(manager, "1")
onExecutorAdded(manager, "2")
onExecutorAdded(manager, "3")
@@ -274,55 +273,57 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext wit
onExecutorAdded(manager, "5")
onExecutorAdded(manager, "6")
onExecutorAdded(manager, "7")
- assert(executorIds(manager).size === 7)
+ onExecutorAdded(manager, "8")
+ assert(executorIds(manager).size === 8)
// Remove until limit
assert(removeExecutor(manager, "1"))
assert(removeExecutor(manager, "2"))
- assert(!removeExecutor(manager, "3")) // lower limit reached
- assert(!removeExecutor(manager, "4"))
+ assert(removeExecutor(manager, "3"))
+ assert(!removeExecutor(manager, "4")) // lower limit reached
+ assert(!removeExecutor(manager, "5"))
onExecutorRemoved(manager, "1")
onExecutorRemoved(manager, "2")
+ onExecutorRemoved(manager, "3")
assert(executorIds(manager).size === 5)
// Add until limit
- assert(addExecutors(manager) === 5) // upper limit reached
+ assert(addExecutors(manager) === 2) // upper limit reached
assert(addExecutors(manager) === 0)
- assert(!removeExecutor(manager, "3")) // still at lower limit
- assert(!removeExecutor(manager, "4"))
- onExecutorAdded(manager, "8")
+ assert(!removeExecutor(manager, "4")) // still at lower limit
+ assert(!removeExecutor(manager, "5"))
onExecutorAdded(manager, "9")
onExecutorAdded(manager, "10")
onExecutorAdded(manager, "11")
onExecutorAdded(manager, "12")
+ onExecutorAdded(manager, "13")
assert(executorIds(manager).size === 10)
// Remove succeeds again, now that we are no longer at the lower limit
- assert(removeExecutor(manager, "3"))
assert(removeExecutor(manager, "4"))
assert(removeExecutor(manager, "5"))
assert(removeExecutor(manager, "6"))
+ assert(removeExecutor(manager, "7"))
assert(executorIds(manager).size === 10)
- assert(addExecutors(manager) === 1)
- onExecutorRemoved(manager, "3")
+ assert(addExecutors(manager) === 0)
onExecutorRemoved(manager, "4")
+ onExecutorRemoved(manager, "5")
assert(executorIds(manager).size === 8)
- // Add succeeds again, now that we are no longer at the upper limit
- // Number of executors added restarts at 1
- assert(addExecutors(manager) === 2)
- assert(addExecutors(manager) === 1) // upper limit reached
+ // Number of executors to add restarts at 1
+ assert(numExecutorsToAdd(manager) === 1)
assert(addExecutors(manager) === 0)
assert(executorIds(manager).size === 8)
- onExecutorRemoved(manager, "5")
onExecutorRemoved(manager, "6")
- onExecutorAdded(manager, "13")
+ onExecutorRemoved(manager, "7")
onExecutorAdded(manager, "14")
+ onExecutorAdded(manager, "15")
assert(executorIds(manager).size === 8)
assert(addExecutors(manager) === 0) // still at upper limit
- onExecutorAdded(manager, "15")
onExecutorAdded(manager, "16")
+ onExecutorAdded(manager, "17")
assert(executorIds(manager).size === 10)
+ assert(numExecutorsTarget(manager) === 10)
}
test("starting/canceling add timer") {
@@ -405,33 +406,33 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext wit
}
test("mock polling loop with no events") {
- sc = createSparkContext(1, 20)
+ sc = createSparkContext(0, 20)
val manager = sc.executorAllocationManager.get
val clock = new ManualClock(2020L)
manager.setClock(clock)
// No events - we should not be adding or removing
- assert(numExecutorsPending(manager) === 0)
+ assert(numExecutorsTarget(manager) === 0)
assert(executorsPendingToRemove(manager).isEmpty)
schedule(manager)
- assert(numExecutorsPending(manager) === 0)
+ assert(numExecutorsTarget(manager) === 0)
assert(executorsPendingToRemove(manager).isEmpty)
clock.advance(100L)
schedule(manager)
- assert(numExecutorsPending(manager) === 0)
+ assert(numExecutorsTarget(manager) === 0)
assert(executorsPendingToRemove(manager).isEmpty)
clock.advance(1000L)
schedule(manager)
- assert(numExecutorsPending(manager) === 0)
+ assert(numExecutorsTarget(manager) === 0)
assert(executorsPendingToRemove(manager).isEmpty)
clock.advance(10000L)
schedule(manager)
- assert(numExecutorsPending(manager) === 0)
+ assert(numExecutorsTarget(manager) === 0)
assert(executorsPendingToRemove(manager).isEmpty)
}
test("mock polling loop add behavior") {
- sc = createSparkContext(1, 20)
+ sc = createSparkContext(0, 20)
val clock = new ManualClock(2020L)
val manager = sc.executorAllocationManager.get
manager.setClock(clock)
@@ -441,43 +442,43 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext wit
onSchedulerBacklogged(manager)
clock.advance(schedulerBacklogTimeout * 1000 / 2)
schedule(manager)
- assert(numExecutorsPending(manager) === 0) // timer not exceeded yet
+ assert(numExecutorsTarget(manager) === 0) // timer not exceeded yet
clock.advance(schedulerBacklogTimeout * 1000)
schedule(manager)
- assert(numExecutorsPending(manager) === 1) // first timer exceeded
+ assert(numExecutorsTarget(manager) === 1) // first timer exceeded
clock.advance(sustainedSchedulerBacklogTimeout * 1000 / 2)
schedule(manager)
- assert(numExecutorsPending(manager) === 1) // second timer not exceeded yet
+ assert(numExecutorsTarget(manager) === 1) // second timer not exceeded yet
clock.advance(sustainedSchedulerBacklogTimeout * 1000)
schedule(manager)
- assert(numExecutorsPending(manager) === 1 + 2) // second timer exceeded
+ assert(numExecutorsTarget(manager) === 1 + 2) // second timer exceeded
clock.advance(sustainedSchedulerBacklogTimeout * 1000)
schedule(manager)
- assert(numExecutorsPending(manager) === 1 + 2 + 4) // third timer exceeded
+ assert(numExecutorsTarget(manager) === 1 + 2 + 4) // third timer exceeded
// Scheduler queue drained
onSchedulerQueueEmpty(manager)
clock.advance(sustainedSchedulerBacklogTimeout * 1000)
schedule(manager)
- assert(numExecutorsPending(manager) === 7) // timer is canceled
+ assert(numExecutorsTarget(manager) === 7) // timer is canceled
clock.advance(sustainedSchedulerBacklogTimeout * 1000)
schedule(manager)
- assert(numExecutorsPending(manager) === 7)
+ assert(numExecutorsTarget(manager) === 7)
// Scheduler queue backlogged again
onSchedulerBacklogged(manager)
clock.advance(schedulerBacklogTimeout * 1000)
schedule(manager)
- assert(numExecutorsPending(manager) === 7 + 1) // timer restarted
+ assert(numExecutorsTarget(manager) === 7 + 1) // timer restarted
clock.advance(sustainedSchedulerBacklogTimeout * 1000)
schedule(manager)
- assert(numExecutorsPending(manager) === 7 + 1 + 2)
+ assert(numExecutorsTarget(manager) === 7 + 1 + 2)
clock.advance(sustainedSchedulerBacklogTimeout * 1000)
schedule(manager)
- assert(numExecutorsPending(manager) === 7 + 1 + 2 + 4)
+ assert(numExecutorsTarget(manager) === 7 + 1 + 2 + 4)
clock.advance(sustainedSchedulerBacklogTimeout * 1000)
schedule(manager)
- assert(numExecutorsPending(manager) === 20) // limit reached
+ assert(numExecutorsTarget(manager) === 20) // limit reached
}
test("mock polling loop remove behavior") {
@@ -671,6 +672,31 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext wit
assert(!removeTimes(manager).contains("executor-1"))
}
+ test("avoid ramp up when target < running executors") {
+ sc = createSparkContext(0, 100000)
+ val manager = sc.executorAllocationManager.get
+ val stage1 = createStageInfo(0, 1000)
+ sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage1))
+
+ assert(addExecutors(manager) === 1)
+ assert(addExecutors(manager) === 2)
+ assert(addExecutors(manager) === 4)
+ assert(addExecutors(manager) === 8)
+ assert(numExecutorsTarget(manager) === 15)
+ (0 until 15).foreach { i =>
+ onExecutorAdded(manager, s"executor-$i")
+ }
+ assert(executorIds(manager).size === 15)
+ sc.listenerBus.postToAll(SparkListenerStageCompleted(stage1))
+
+ adjustRequestedExecutors(manager)
+ assert(numExecutorsTarget(manager) === 0)
+
+ sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 1000)))
+ addExecutors(manager)
+ assert(numExecutorsTarget(manager) === 16)
+ }
+
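To recap the arithmetic this regression test pins down (all values come from the test itself): once stage1 completes, updateAndSyncNumExecutorsTarget truncates the target to max(maxNeeded = 0, minExecutors = 0) = 0; when the next stage is submitted, addExecutors ramps from max(target = 0, 15 registered executors) + 1 = 16 rather than from 0 + 1 = 1, so the manager never requests fewer executors than it already holds.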
private def createSparkContext(minExecutors: Int = 1, maxExecutors: Int = 5): SparkContext = {
val conf = new SparkConf()
.setMaster("local")
@@ -713,7 +739,7 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
* ------------------------------------------------------- */
private val _numExecutorsToAdd = PrivateMethod[Int]('numExecutorsToAdd)
- private val _numExecutorsPending = PrivateMethod[Int]('numExecutorsPending)
+ private val _numExecutorsTarget = PrivateMethod[Int]('numExecutorsTarget)
private val _maxNumExecutorsNeeded = PrivateMethod[Int]('maxNumExecutorsNeeded)
private val _executorsPendingToRemove =
PrivateMethod[collection.Set[String]]('executorsPendingToRemove)
@@ -722,7 +748,8 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
private val _removeTimes = PrivateMethod[collection.Map[String, Long]]('removeTimes)
private val _schedule = PrivateMethod[Unit]('schedule)
private val _addExecutors = PrivateMethod[Int]('addExecutors)
- private val _addOrCancelExecutorRequests = PrivateMethod[Int]('addOrCancelExecutorRequests)
+ private val _updateAndSyncNumExecutorsTarget =
+ PrivateMethod[Int]('updateAndSyncNumExecutorsTarget)
private val _removeExecutor = PrivateMethod[Boolean]('removeExecutor)
private val _onExecutorAdded = PrivateMethod[Unit]('onExecutorAdded)
private val _onExecutorRemoved = PrivateMethod[Unit]('onExecutorRemoved)
@@ -735,8 +762,8 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
manager invokePrivate _numExecutorsToAdd()
}
- private def numExecutorsPending(manager: ExecutorAllocationManager): Int = {
- manager invokePrivate _numExecutorsPending()
+ private def numExecutorsTarget(manager: ExecutorAllocationManager): Int = {
+ manager invokePrivate _numExecutorsTarget()
}
private def executorsPendingToRemove(
@@ -766,7 +793,7 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
}
private def adjustRequestedExecutors(manager: ExecutorAllocationManager): Int = {
- manager invokePrivate _addOrCancelExecutorRequests(0L)
+ manager invokePrivate _updateAndSyncNumExecutorsTarget(0L)
}
private def removeExecutor(manager: ExecutorAllocationManager, id: String): Boolean = {