diff options
author | Dhruve Ashar <dashar@yahoo-inc.com> | 2016-09-22 10:10:37 -0700 |
---|---|---|
committer | Marcelo Vanzin <vanzin@cloudera.com> | 2016-09-22 10:10:37 -0700 |
commit | 17b72d31e0c59711eddeb525becb8085930eadcc (patch) | |
tree | 89c82299dc5a3a6be368d78b9dd3caa64e1e5ec7 /core/src/main/scala/org/apache | |
parent | 8a02410a92429bff50d6ce082f873cea9e9fa91e (diff) | |
download | spark-17b72d31e0c59711eddeb525becb8085930eadcc.tar.gz spark-17b72d31e0c59711eddeb525becb8085930eadcc.tar.bz2 spark-17b72d31e0c59711eddeb525becb8085930eadcc.zip |
[SPARK-17365][CORE] Remove/Kill multiple executors together to reduce RPC call time.
## What changes were proposed in this pull request?
We are killing multiple executors together instead of iterating over expensive RPC calls to kill single executor.
## How was this patch tested?
Executed sample spark job to observe executors being killed/removed with dynamic allocation enabled.
Author: Dhruve Ashar <dashar@yahoo-inc.com>
Author: Dhruve Ashar <dhruveashar@gmail.com>
Closes #15152 from dhruve/impr/SPARK-17365.
Diffstat (limited to 'core/src/main/scala/org/apache')
4 files changed, 94 insertions, 37 deletions
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala index 8baddf45bf..5d47f624ac 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala @@ -54,13 +54,16 @@ private[spark] trait ExecutorAllocationClient { /** * Request that the cluster manager kill the specified executors. - * @return whether the request is acknowledged by the cluster manager. + * @return the ids of the executors acknowledged by the cluster manager to be removed. */ - def killExecutors(executorIds: Seq[String]): Boolean + def killExecutors(executorIds: Seq[String]): Seq[String] /** * Request that the cluster manager kill the specified executor. * @return whether the request is acknowledged by the cluster manager. */ - def killExecutor(executorId: String): Boolean = killExecutors(Seq(executorId)) + def killExecutor(executorId: String): Boolean = { + val killedExecutors = killExecutors(Seq(executorId)) + killedExecutors.nonEmpty && killedExecutors(0).equals(executorId) + } } diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 6f320c5242..1366251d06 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -20,6 +20,7 @@ package org.apache.spark import java.util.concurrent.TimeUnit import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer import scala.util.control.ControlThrowable import com.codahale.metrics.{Gauge, MetricRegistry} @@ -279,14 +280,18 @@ private[spark] class ExecutorAllocationManager( updateAndSyncNumExecutorsTarget(now) + val executorIdsToBeRemoved = ArrayBuffer[String]() removeTimes.retain { case (executorId, expireTime) => val expired = now >= expireTime if (expired) { initializing = false - removeExecutor(executorId) + executorIdsToBeRemoved += executorId } !expired } + if (executorIdsToBeRemoved.nonEmpty) { + removeExecutors(executorIdsToBeRemoved) + } } /** @@ -392,10 +397,66 @@ private[spark] class ExecutorAllocationManager( } /** + * Request the cluster manager to remove the given executors. + * Returns the list of executors which are removed. + */ + private def removeExecutors(executors: Seq[String]): Seq[String] = synchronized { + val executorIdsToBeRemoved = new ArrayBuffer[String] + + logInfo("Request to remove executorIds: " + executors.mkString(", ")) + val numExistingExecutors = allocationManager.executorIds.size - executorsPendingToRemove.size + + var newExecutorTotal = numExistingExecutors + executors.foreach { executorIdToBeRemoved => + if (newExecutorTotal - 1 < minNumExecutors) { + logDebug(s"Not removing idle executor $executorIdToBeRemoved because there are only " + + s"$newExecutorTotal executor(s) left (limit $minNumExecutors)") + } else if (canBeKilled(executorIdToBeRemoved)) { + executorIdsToBeRemoved += executorIdToBeRemoved + newExecutorTotal -= 1 + } + } + + if (executorIdsToBeRemoved.isEmpty) { + return Seq.empty[String] + } + + // Send a request to the backend to kill this executor(s) + val executorsRemoved = if (testing) { + executorIdsToBeRemoved + } else { + client.killExecutors(executorIdsToBeRemoved) + } + // reset the newExecutorTotal to the existing number of executors + newExecutorTotal = numExistingExecutors + if (testing || executorsRemoved.nonEmpty) { + executorsRemoved.foreach { removedExecutorId => + newExecutorTotal -= 1 + logInfo(s"Removing executor $removedExecutorId because it has been idle for " + + s"$executorIdleTimeoutS seconds (new desired total will be $newExecutorTotal)") + executorsPendingToRemove.add(removedExecutorId) + } + executorsRemoved + } else { + logWarning(s"Unable to reach the cluster manager to kill executor/s " + + "executorIdsToBeRemoved.mkString(\",\") or no executor eligible to kill!") + Seq.empty[String] + } + } + + /** * Request the cluster manager to remove the given executor. - * Return whether the request is received. + * Return whether the request is acknowledged. */ private def removeExecutor(executorId: String): Boolean = synchronized { + val executorsRemoved = removeExecutors(Seq(executorId)) + executorsRemoved.nonEmpty && executorsRemoved(0) == executorId + } + + /** + * Determine if the given executor can be killed. + */ + private def canBeKilled(executorId: String): Boolean = synchronized { // Do not kill the executor if we are not aware of it (should never happen) if (!executorIds.contains(executorId)) { logWarning(s"Attempted to remove unknown executor $executorId!") @@ -409,26 +470,7 @@ private[spark] class ExecutorAllocationManager( return false } - // Do not kill the executor if we have already reached the lower bound - val numExistingExecutors = executorIds.size - executorsPendingToRemove.size - if (numExistingExecutors - 1 < minNumExecutors) { - logDebug(s"Not removing idle executor $executorId because there are only " + - s"$numExistingExecutors executor(s) left (limit $minNumExecutors)") - return false - } - - // Send a request to the backend to kill this executor - val removeRequestAcknowledged = testing || client.killExecutor(executorId) - if (removeRequestAcknowledged) { - logInfo(s"Removing executor $executorId because it has been idle for " + - s"$executorIdleTimeoutS seconds (new desired total will be ${numExistingExecutors - 1})") - executorsPendingToRemove.add(executorId) - true - } else { - logWarning(s"Unable to reach the cluster manager to kill executor $executorId," + - s"or no executor eligible to kill!") - false - } + true } /** diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 1981ad5671..f58037e100 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -73,7 +73,7 @@ import org.apache.spark.util._ * @param config a Spark Config object describing the application configuration. Any settings in * this config overrides the default configs as well as system properties. */ -class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient { +class SparkContext(config: SparkConf) extends Logging { // The call site where this SparkContext was constructed. private val creationSite: CallSite = Utils.getCallSite() @@ -534,7 +534,13 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf) _executorAllocationManager = if (dynamicAllocationEnabled) { - Some(new ExecutorAllocationManager(this, listenerBus, _conf)) + schedulerBackend match { + case b: ExecutorAllocationClient => + Some(new ExecutorAllocationManager( + schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf)) + case _ => + None + } } else { None } @@ -1473,7 +1479,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli listenerBus.addListener(listener) } - private[spark] override def getExecutorIds(): Seq[String] = { + private[spark] def getExecutorIds(): Seq[String] = { schedulerBackend match { case b: CoarseGrainedSchedulerBackend => b.getExecutorIds() @@ -1498,7 +1504,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * @return whether the request is acknowledged by the cluster manager. */ @DeveloperApi - override def requestTotalExecutors( + def requestTotalExecutors( numExecutors: Int, localityAwareTasks: Int, hostToLocalTaskCount: scala.collection.immutable.Map[String, Int] @@ -1518,7 +1524,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * @return whether the request is received. */ @DeveloperApi - override def requestExecutors(numAdditionalExecutors: Int): Boolean = { + def requestExecutors(numAdditionalExecutors: Int): Boolean = { schedulerBackend match { case b: CoarseGrainedSchedulerBackend => b.requestExecutors(numAdditionalExecutors) @@ -1540,10 +1546,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * @return whether the request is received. */ @DeveloperApi - override def killExecutors(executorIds: Seq[String]): Boolean = { + def killExecutors(executorIds: Seq[String]): Boolean = { schedulerBackend match { case b: CoarseGrainedSchedulerBackend => - b.killExecutors(executorIds, replace = false, force = true) + b.killExecutors(executorIds, replace = false, force = true).nonEmpty case _ => logWarning("Killing executors is only supported in coarse-grained mode") false @@ -1562,7 +1568,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * @return whether the request is received. */ @DeveloperApi - override def killExecutor(executorId: String): Boolean = super.killExecutor(executorId) + def killExecutor(executorId: String): Boolean = killExecutors(Seq(executorId)) /** * Request that the cluster manager kill the specified executor without adjusting the @@ -1581,7 +1587,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli private[spark] def killAndReplaceExecutor(executorId: String): Boolean = { schedulerBackend match { case b: CoarseGrainedSchedulerBackend => - b.killExecutors(Seq(executorId), replace = true, force = true) + b.killExecutors(Seq(executorId), replace = true, force = true).nonEmpty case _ => logWarning("Killing executors is only supported in coarse-grained mode") false diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index c6b3fdf439..edc3c19937 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -528,7 +528,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp * @return whether the kill request is acknowledged. If list to kill is empty, it will return * false. */ - final override def killExecutors(executorIds: Seq[String]): Boolean = { + final override def killExecutors(executorIds: Seq[String]): Seq[String] = { killExecutors(executorIds, replace = false, force = false) } @@ -548,7 +548,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp final def killExecutors( executorIds: Seq[String], replace: Boolean, - force: Boolean): Boolean = { + force: Boolean): Seq[String] = { logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}") val response = synchronized { @@ -564,6 +564,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp .filter { id => force || !scheduler.isExecutorBusy(id) } executorsToKill.foreach { id => executorsPendingToRemove(id) = !replace } + logInfo(s"Actual list of executor(s) to be killed is ${executorsToKill.mkString(", ")}") + // If we do not wish to replace the executors we kill, sync the target number of executors // with the cluster manager to avoid allocating new ones. When computing the new target, // take into account executors that are pending to be added or removed. @@ -583,7 +585,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp _ => Future.successful(false) } - adjustTotalExecutors.flatMap(killExecutors)(ThreadUtils.sameThread) + val killResponse = adjustTotalExecutors.flatMap(killExecutors)(ThreadUtils.sameThread) + + killResponse.flatMap(killSuccessful => + Future.successful (if (killSuccessful) executorsToKill else Seq.empty[String]) + )(ThreadUtils.sameThread) } defaultAskTimeout.awaitResult(response) |