diff options
Diffstat (limited to 'core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala')
-rw-r--r-- | core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala | 86 |
1 files changed, 64 insertions, 22 deletions
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 6f320c5242..1366251d06 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -20,6 +20,7 @@ package org.apache.spark import java.util.concurrent.TimeUnit import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer import scala.util.control.ControlThrowable import com.codahale.metrics.{Gauge, MetricRegistry} @@ -279,14 +280,18 @@ private[spark] class ExecutorAllocationManager( updateAndSyncNumExecutorsTarget(now) + val executorIdsToBeRemoved = ArrayBuffer[String]() removeTimes.retain { case (executorId, expireTime) => val expired = now >= expireTime if (expired) { initializing = false - removeExecutor(executorId) + executorIdsToBeRemoved += executorId } !expired } + if (executorIdsToBeRemoved.nonEmpty) { + removeExecutors(executorIdsToBeRemoved) + } } /** @@ -392,10 +397,66 @@ private[spark] class ExecutorAllocationManager( } /** + * Request the cluster manager to remove the given executors. + * Returns the list of executors which are removed. + */ + private def removeExecutors(executors: Seq[String]): Seq[String] = synchronized { + val executorIdsToBeRemoved = new ArrayBuffer[String] + + logInfo("Request to remove executorIds: " + executors.mkString(", ")) + val numExistingExecutors = allocationManager.executorIds.size - executorsPendingToRemove.size + + var newExecutorTotal = numExistingExecutors + executors.foreach { executorIdToBeRemoved => + if (newExecutorTotal - 1 < minNumExecutors) { + logDebug(s"Not removing idle executor $executorIdToBeRemoved because there are only " + + s"$newExecutorTotal executor(s) left (limit $minNumExecutors)") + } else if (canBeKilled(executorIdToBeRemoved)) { + executorIdsToBeRemoved += executorIdToBeRemoved + newExecutorTotal -= 1 + } + } + + if (executorIdsToBeRemoved.isEmpty) { + return Seq.empty[String] + } + + // Send a request to the backend to kill this executor(s) + val executorsRemoved = if (testing) { + executorIdsToBeRemoved + } else { + client.killExecutors(executorIdsToBeRemoved) + } + // reset the newExecutorTotal to the existing number of executors + newExecutorTotal = numExistingExecutors + if (testing || executorsRemoved.nonEmpty) { + executorsRemoved.foreach { removedExecutorId => + newExecutorTotal -= 1 + logInfo(s"Removing executor $removedExecutorId because it has been idle for " + + s"$executorIdleTimeoutS seconds (new desired total will be $newExecutorTotal)") + executorsPendingToRemove.add(removedExecutorId) + } + executorsRemoved + } else { + logWarning(s"Unable to reach the cluster manager to kill executor/s " + + "executorIdsToBeRemoved.mkString(\",\") or no executor eligible to kill!") + Seq.empty[String] + } + } + + /** * Request the cluster manager to remove the given executor. - * Return whether the request is received. + * Return whether the request is acknowledged. */ private def removeExecutor(executorId: String): Boolean = synchronized { + val executorsRemoved = removeExecutors(Seq(executorId)) + executorsRemoved.nonEmpty && executorsRemoved(0) == executorId + } + + /** + * Determine if the given executor can be killed. + */ + private def canBeKilled(executorId: String): Boolean = synchronized { // Do not kill the executor if we are not aware of it (should never happen) if (!executorIds.contains(executorId)) { logWarning(s"Attempted to remove unknown executor $executorId!") @@ -409,26 +470,7 @@ private[spark] class ExecutorAllocationManager( return false } - // Do not kill the executor if we have already reached the lower bound - val numExistingExecutors = executorIds.size - executorsPendingToRemove.size - if (numExistingExecutors - 1 < minNumExecutors) { - logDebug(s"Not removing idle executor $executorId because there are only " + - s"$numExistingExecutors executor(s) left (limit $minNumExecutors)") - return false - } - - // Send a request to the backend to kill this executor - val removeRequestAcknowledged = testing || client.killExecutor(executorId) - if (removeRequestAcknowledged) { - logInfo(s"Removing executor $executorId because it has been idle for " + - s"$executorIdleTimeoutS seconds (new desired total will be ${numExistingExecutors - 1})") - executorsPendingToRemove.add(executorId) - true - } else { - logWarning(s"Unable to reach the cluster manager to kill executor $executorId," + - s"or no executor eligible to kill!") - false - } + true } /** |