about | summary | refs | log | tree | commit | diff
path: root/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala')
-rw-r--r-- core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala 86
1 files changed, 64 insertions, 22 deletions
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 6f320c5242..1366251d06 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -20,6 +20,7 @@ package org.apache.spark
import java.util.concurrent.TimeUnit
import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
import scala.util.control.ControlThrowable
import com.codahale.metrics.{Gauge, MetricRegistry}
@@ -279,14 +280,18 @@ private[spark] class ExecutorAllocationManager(
updateAndSyncNumExecutorsTarget(now)
+ val executorIdsToBeRemoved = ArrayBuffer[String]()
removeTimes.retain { case (executorId, expireTime) =>
val expired = now >= expireTime
if (expired) {
initializing = false
- removeExecutor(executorId)
+ executorIdsToBeRemoved += executorId
}
!expired
}
+ if (executorIdsToBeRemoved.nonEmpty) {
+ removeExecutors(executorIdsToBeRemoved)
+ }
}
/**
@@ -392,10 +397,66 @@ private[spark] class ExecutorAllocationManager(
}
/**
+ * Request the cluster manager to remove the given executors.
+ *
+ * A candidate is skipped when removing it would take the number of executors
+ * (known executors minus those already pending removal) below `minNumExecutors`,
+ * or when `canBeKilled` rejects it. The remaining candidates are sent to the
+ * cluster manager in a single `killExecutors` call (bypassed when `testing` is set).
+ *
+ * @param executors IDs of the executors that should be removed
+ * @return the IDs of the executors whose removal was accepted; these are also
+ *         recorded in `executorsPendingToRemove`. Empty if nothing was removed.
+ */
+ private def removeExecutors(executors: Seq[String]): Seq[String] = synchronized {
+   val executorIdsToBeRemoved = new ArrayBuffer[String]
+
+   logInfo("Request to remove executorIds: " + executors.mkString(", "))
+   val numExistingExecutors = allocationManager.executorIds.size - executorsPendingToRemove.size
+
+   // Running total used to enforce the lower bound while candidates are being selected
+   var newExecutorTotal = numExistingExecutors
+   executors.foreach { executorIdToBeRemoved =>
+     if (newExecutorTotal - 1 < minNumExecutors) {
+       logDebug(s"Not removing idle executor $executorIdToBeRemoved because there are only " +
+         s"$newExecutorTotal executor(s) left (limit $minNumExecutors)")
+     } else if (canBeKilled(executorIdToBeRemoved)) {
+       executorIdsToBeRemoved += executorIdToBeRemoved
+       newExecutorTotal -= 1
+     }
+   }
+
+   if (executorIdsToBeRemoved.isEmpty) {
+     Seq.empty[String]
+   } else {
+     // Send a request to the backend to kill the selected executor(s)
+     val executorsRemoved = if (testing) {
+       executorIdsToBeRemoved
+     } else {
+       client.killExecutors(executorIdsToBeRemoved)
+     }
+     // Reset the running total: the backend may have accepted only a subset of the
+     // request, so recount from the existing number of executors.
+     newExecutorTotal = numExistingExecutors
+     if (testing || executorsRemoved.nonEmpty) {
+       executorsRemoved.foreach { removedExecutorId =>
+         newExecutorTotal -= 1
+         logInfo(s"Removing executor $removedExecutorId because it has been idle for " +
+           s"$executorIdleTimeoutS seconds (new desired total will be $newExecutorTotal)")
+         executorsPendingToRemove.add(removedExecutorId)
+       }
+       executorsRemoved
+     } else {
+       // Render the IDs up front so the warning names the executors that were requested
+       // instead of printing the expression text verbatim.
+       val requestedIds = executorIdsToBeRemoved.mkString(",")
+       logWarning(s"Unable to reach the cluster manager to kill executor/s " +
+         s"$requestedIds or no executor eligible to kill!")
+       Seq.empty[String]
+     }
+   }
+ }
+
+ /**
* Request the cluster manager to remove the given executor.
- * Return whether the request is received.
+ * Return whether the request is acknowledged.
*/
private def removeExecutor(executorId: String): Boolean = synchronized {
+   // Delegate to the batch variant with a single-element request. The removal counts
+   // as acknowledged only if the first ID reported back is the one that was asked for.
+   removeExecutors(Seq(executorId)).headOption.exists(_ == executorId)
+ }
+
+ /**
+ * Determine if the given executor can be killed.
+ */
+ private def canBeKilled(executorId: String): Boolean = synchronized {
// Do not kill the executor if we are not aware of it (should never happen)
if (!executorIds.contains(executorId)) {
logWarning(s"Attempted to remove unknown executor $executorId!")
@@ -409,26 +470,7 @@ private[spark] class ExecutorAllocationManager(
return false
}
- // Do not kill the executor if we have already reached the lower bound
- val numExistingExecutors = executorIds.size - executorsPendingToRemove.size
- if (numExistingExecutors - 1 < minNumExecutors) {
- logDebug(s"Not removing idle executor $executorId because there are only " +
- s"$numExistingExecutors executor(s) left (limit $minNumExecutors)")
- return false
- }
-
- // Send a request to the backend to kill this executor
- val removeRequestAcknowledged = testing || client.killExecutor(executorId)
- if (removeRequestAcknowledged) {
- logInfo(s"Removing executor $executorId because it has been idle for " +
- s"$executorIdleTimeoutS seconds (new desired total will be ${numExistingExecutors - 1})")
- executorsPendingToRemove.add(executorId)
- true
- } else {
- logWarning(s"Unable to reach the cluster manager to kill executor $executorId," +
- s"or no executor eligible to kill!")
- false
- }
+ true
}
/**