about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala26
-rw-r--r--docs/configuration.md4
2 files changed, 20 insertions, 10 deletions
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 66bda68088..9514604752 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -91,7 +91,7 @@ private[spark] class ExecutorAllocationManager(
// How long there must be backlogged tasks for before an addition is triggered (seconds)
private val schedulerBacklogTimeoutS = conf.getTimeAsSeconds(
- "spark.dynamicAllocation.schedulerBacklogTimeout", "5s")
+ "spark.dynamicAllocation.schedulerBacklogTimeout", "1s")
// Same as above, but used only after `schedulerBacklogTimeoutS` is exceeded
private val sustainedSchedulerBacklogTimeoutS = conf.getTimeAsSeconds(
@@ -99,7 +99,7 @@ private[spark] class ExecutorAllocationManager(
// How long an executor must be idle for before it is removed (seconds)
private val executorIdleTimeoutS = conf.getTimeAsSeconds(
- "spark.dynamicAllocation.executorIdleTimeout", "600s")
+ "spark.dynamicAllocation.executorIdleTimeout", "60s")
// During testing, the methods to actually kill and add executors are mocked out
private val testing = conf.getBoolean("spark.dynamicAllocation.testing", false)
@@ -268,6 +268,8 @@ private[spark] class ExecutorAllocationManager(
numExecutorsTarget = math.max(maxNeeded, minNumExecutors)
client.requestTotalExecutors(numExecutorsTarget)
numExecutorsToAdd = 1
+ logInfo(s"Lowering target number of executors to $numExecutorsTarget because " +
+ s"not all requests are actually needed (previously $oldNumExecutorsTarget)")
numExecutorsTarget - oldNumExecutorsTarget
} else if (addTime != NOT_SET && now >= addTime) {
val delta = addExecutors(maxNeeded)
@@ -292,9 +294,8 @@ private[spark] class ExecutorAllocationManager(
private def addExecutors(maxNumExecutorsNeeded: Int): Int = {
// Do not request more executors if it would put our target over the upper bound
if (numExecutorsTarget >= maxNumExecutors) {
- val numExecutorsPending = numExecutorsTarget - executorIds.size
- logDebug(s"Not adding executors because there are already ${executorIds.size} registered " +
- s"and ${numExecutorsPending} pending executor(s) (limit $maxNumExecutors)")
+ logDebug(s"Not adding executors because our current target total " +
+ s"is already $numExecutorsTarget (limit $maxNumExecutors)")
numExecutorsToAdd = 1
return 0
}
@@ -310,10 +311,19 @@ private[spark] class ExecutorAllocationManager(
// Ensure that our target fits within configured bounds:
numExecutorsTarget = math.max(math.min(numExecutorsTarget, maxNumExecutors), minNumExecutors)
+ val delta = numExecutorsTarget - oldNumExecutorsTarget
+
+ // If our target has not changed, do not send a message
+ // to the cluster manager and reset our exponential growth
+ if (delta == 0) {
+ numExecutorsToAdd = 1
+ return 0
+ }
+
val addRequestAcknowledged = testing || client.requestTotalExecutors(numExecutorsTarget)
if (addRequestAcknowledged) {
- val delta = numExecutorsTarget - oldNumExecutorsTarget
- logInfo(s"Requesting $delta new executor(s) because tasks are backlogged" +
+ val executorsString = "executor" + { if (delta > 1) "s" else "" }
+ logInfo(s"Requesting $delta new $executorsString because tasks are backlogged" +
s" (new desired total will be $numExecutorsTarget)")
numExecutorsToAdd = if (delta == numExecutorsToAdd) {
numExecutorsToAdd * 2
@@ -420,7 +430,7 @@ private[spark] class ExecutorAllocationManager(
* This resets all variables used for adding executors.
*/
private def onSchedulerQueueEmpty(): Unit = synchronized {
- logDebug(s"Clearing timer to add executors because there are no more pending tasks")
+ logDebug("Clearing timer to add executors because there are no more pending tasks")
addTime = NOT_SET
numExecutorsToAdd = 1
}
diff --git a/docs/configuration.md b/docs/configuration.md
index 0de824546c..30508a617f 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1194,7 +1194,7 @@ Apart from these, the following properties are also available, and may be useful
</tr>
<tr>
<td><code>spark.dynamicAllocation.executorIdleTimeout</code></td>
- <td>600s</td>
+ <td>60s</td>
<td>
If dynamic allocation is enabled and an executor has been idle for more than this duration,
the executor will be removed. For more detail, see this
@@ -1224,7 +1224,7 @@ Apart from these, the following properties are also available, and may be useful
</tr>
<tr>
<td><code>spark.dynamicAllocation.schedulerBacklogTimeout</code></td>
- <td>5s</td>
+ <td>1s</td>
<td>
If dynamic allocation is enabled and there have been pending tasks backlogged for more than
this duration, new executors will be requested. For more detail, see this