author    Dhruve Ashar <dashar@yahoo-inc.com>    2016-09-22 10:10:37 -0700
committer Marcelo Vanzin <vanzin@cloudera.com>  2016-09-22 10:10:37 -0700
commit    17b72d31e0c59711eddeb525becb8085930eadcc (patch)
tree      89c82299dc5a3a6be368d78b9dd3caa64e1e5ec7 /core/src/main/scala/org/apache
parent    8a02410a92429bff50d6ce082f873cea9e9fa91e (diff)
[SPARK-17365][CORE] Remove/Kill multiple executors together to reduce RPC call time.
## What changes were proposed in this pull request?

We kill multiple executors together in a single batched request instead of iterating over expensive RPC calls that kill one executor at a time.

## How was this patch tested?

Executed a sample Spark job and observed executors being killed/removed together with dynamic allocation enabled.

Author: Dhruve Ashar <dashar@yahoo-inc.com>
Author: Dhruve Ashar <dhruveashar@gmail.com>

Closes #15152 from dhruve/impr/SPARK-17365.
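The heart of the change is visible at the call site: where the allocation manager previously issued one RPC per idle executor, it now sends the whole batch in one call and gets back the subset of ids the cluster manager acknowledged. A minimal standalone sketch of that contract (the trait below only models the two methods of Spark's private ExecutorAllocationClient that this patch touches; it is illustrative, not the actual Spark source):

object BatchedKillSketch {
  // Models the two ExecutorAllocationClient methods this patch changes.
  trait AllocClient {
    // One batched RPC; returns the ids the cluster manager agreed to remove.
    def killExecutors(executorIds: Seq[String]): Seq[String]
    // Single-executor kill, now derived from the batched call.
    def killExecutor(executorId: String): Boolean = {
      val killed = killExecutors(Seq(executorId))
      killed.nonEmpty && killed(0) == executorId
    }
  }

  def drainIdle(client: AllocClient, idleIds: Seq[String]): Seq[String] = {
    // Before SPARK-17365: idleIds.foreach(client.killExecutor) -- one RPC each.
    // After: one round trip for the whole batch.
    client.killExecutors(idleIds)
  }
}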
Diffstat (limited to 'core/src/main/scala/org/apache')
-rw-r--r--  core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala                        |  9
-rw-r--r--  core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala                       | 86
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala                                    | 24
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 12
4 files changed, 94 insertions, 37 deletions
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
index 8baddf45bf..5d47f624ac 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
@@ -54,13 +54,16 @@ private[spark] trait ExecutorAllocationClient {
/**
* Request that the cluster manager kill the specified executors.
- * @return whether the request is acknowledged by the cluster manager.
+ * @return the ids of the executors acknowledged by the cluster manager to be removed.
*/
- def killExecutors(executorIds: Seq[String]): Boolean
+ def killExecutors(executorIds: Seq[String]): Seq[String]
/**
* Request that the cluster manager kill the specified executor.
* @return whether the request is acknowledged by the cluster manager.
*/
- def killExecutor(executorId: String): Boolean = killExecutors(Seq(executorId))
+ def killExecutor(executorId: String): Boolean = {
+ val killedExecutors = killExecutors(Seq(executorId))
+ killedExecutors.nonEmpty && killedExecutors(0) == executorId
+ }
}
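The switch from a Boolean acknowledgment to the list of acknowledged ids matters when the cluster manager honors only part of a batch: the caller can reconcile its own bookkeeping against the acknowledged subset instead of guessing. A hedged sketch of that reconciliation (the stub function is hypothetical; real implementations live in the scheduler backends):

object ReconcileSketch extends App {
  // Hypothetical backend that declines to kill executor "2".
  val stubKillExecutors: Seq[String] => Seq[String] = ids => ids.filterNot(_ == "2")

  val requested    = Seq("1", "2", "3")
  val acknowledged = stubKillExecutors(requested)   // Seq("1", "3")
  val declined     = requested.diff(acknowledged)   // Seq("2") stays alive
  println(s"pending removal: $acknowledged; declined: $declined")
}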
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 6f320c5242..1366251d06 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -20,6 +20,7 @@ package org.apache.spark
import java.util.concurrent.TimeUnit
import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
import scala.util.control.ControlThrowable
import com.codahale.metrics.{Gauge, MetricRegistry}
@@ -279,14 +280,18 @@ private[spark] class ExecutorAllocationManager(
updateAndSyncNumExecutorsTarget(now)
+ val executorIdsToBeRemoved = ArrayBuffer[String]()
removeTimes.retain { case (executorId, expireTime) =>
val expired = now >= expireTime
if (expired) {
initializing = false
- removeExecutor(executorId)
+ executorIdsToBeRemoved += executorId
}
!expired
}
+ if (executorIdsToBeRemoved.nonEmpty) {
+ removeExecutors(executorIdsToBeRemoved)
+ }
}
/**
@@ -392,10 +397,66 @@ private[spark] class ExecutorAllocationManager(
}
/**
+ * Request the cluster manager to remove the given executors.
+ * Returns the list of executors which are removed.
+ */
+ private def removeExecutors(executors: Seq[String]): Seq[String] = synchronized {
+ val executorIdsToBeRemoved = new ArrayBuffer[String]
+
+ logInfo("Request to remove executorIds: " + executors.mkString(", "))
+ val numExistingExecutors = allocationManager.executorIds.size - executorsPendingToRemove.size
+
+ var newExecutorTotal = numExistingExecutors
+ executors.foreach { executorIdToBeRemoved =>
+ if (newExecutorTotal - 1 < minNumExecutors) {
+ logDebug(s"Not removing idle executor $executorIdToBeRemoved because there are only " +
+ s"$newExecutorTotal executor(s) left (limit $minNumExecutors)")
+ } else if (canBeKilled(executorIdToBeRemoved)) {
+ executorIdsToBeRemoved += executorIdToBeRemoved
+ newExecutorTotal -= 1
+ }
+ }
+
+ if (executorIdsToBeRemoved.isEmpty) {
+ return Seq.empty[String]
+ }
+
+ // Send a request to the backend to kill these executors
+ val executorsRemoved = if (testing) {
+ executorIdsToBeRemoved
+ } else {
+ client.killExecutors(executorIdsToBeRemoved)
+ }
+ // reset the newExecutorTotal to the existing number of executors
+ newExecutorTotal = numExistingExecutors
+ if (testing || executorsRemoved.nonEmpty) {
+ executorsRemoved.foreach { removedExecutorId =>
+ newExecutorTotal -= 1
+ logInfo(s"Removing executor $removedExecutorId because it has been idle for " +
+ s"$executorIdleTimeoutS seconds (new desired total will be $newExecutorTotal)")
+ executorsPendingToRemove.add(removedExecutorId)
+ }
+ executorsRemoved
+ } else {
+ logWarning("Unable to reach the cluster manager to kill executor(s) " +
+ s"${executorIdsToBeRemoved.mkString(",")} or no executor eligible to kill!")
+ Seq.empty[String]
+ }
+ }
+
+ /**
* Request the cluster manager to remove the given executor.
- * Return whether the request is received.
+ * Return whether the request is acknowledged.
*/
private def removeExecutor(executorId: String): Boolean = synchronized {
+ val executorsRemoved = removeExecutors(Seq(executorId))
+ executorsRemoved.nonEmpty && executorsRemoved(0) == executorId
+ }
+
+ /**
+ * Determine if the given executor can be killed.
+ */
+ private def canBeKilled(executorId: String): Boolean = synchronized {
// Do not kill the executor if we are not aware of it (should never happen)
if (!executorIds.contains(executorId)) {
logWarning(s"Attempted to remove unknown executor $executorId!")
@@ -409,26 +470,7 @@ private[spark] class ExecutorAllocationManager(
return false
}
- // Do not kill the executor if we have already reached the lower bound
- val numExistingExecutors = executorIds.size - executorsPendingToRemove.size
- if (numExistingExecutors - 1 < minNumExecutors) {
- logDebug(s"Not removing idle executor $executorId because there are only " +
- s"$numExistingExecutors executor(s) left (limit $minNumExecutors)")
- return false
- }
-
- // Send a request to the backend to kill this executor
- val removeRequestAcknowledged = testing || client.killExecutor(executorId)
- if (removeRequestAcknowledged) {
- logInfo(s"Removing executor $executorId because it has been idle for " +
- s"$executorIdleTimeoutS seconds (new desired total will be ${numExistingExecutors - 1})")
- executorsPendingToRemove.add(executorId)
- true
- } else {
- logWarning(s"Unable to reach the cluster manager to kill executor $executorId," +
- s"or no executor eligible to kill!")
- false
- }
+ true
}
/**
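One subtlety in removeExecutors above: because candidates are filtered against a running total (newExecutorTotal), a single large batch cannot drop the application below spark.dynamicAllocation.minExecutors. A standalone sketch of that check, with a worked example (names are illustrative, not Spark's):

object LowerBoundSketch {
  // Mirrors the running-total filter in removeExecutors: each accepted
  // candidate lowers the projected total before the next one is checked.
  def eligibleToKill(candidates: Seq[String], live: Int, floor: Int): Seq[String] = {
    var projected = live
    candidates.filter { id =>
      val staysAboveFloor = projected - 1 >= floor
      if (staysAboveFloor) projected -= 1
      staysAboveFloor
    }
  }

  // With 5 live executors and a floor of 3, a batch of 4 idle candidates
  // yields only the first two:
  // eligibleToKill(Seq("a", "b", "c", "d"), live = 5, floor = 3) == Seq("a", "b")
}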
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 1981ad5671..f58037e100 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -73,7 +73,7 @@ import org.apache.spark.util._
* @param config a Spark Config object describing the application configuration. Any settings in
* this config overrides the default configs as well as system properties.
*/
-class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient {
+class SparkContext(config: SparkConf) extends Logging {
// The call site where this SparkContext was constructed.
private val creationSite: CallSite = Utils.getCallSite()
@@ -534,7 +534,13 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
_executorAllocationManager =
if (dynamicAllocationEnabled) {
- Some(new ExecutorAllocationManager(this, listenerBus, _conf))
+ schedulerBackend match {
+ case b: ExecutorAllocationClient =>
+ Some(new ExecutorAllocationManager(b, listenerBus, _conf))
+ case _ =>
+ None
+ }
} else {
None
}
@@ -1473,7 +1479,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
listenerBus.addListener(listener)
}
- private[spark] override def getExecutorIds(): Seq[String] = {
+ private[spark] def getExecutorIds(): Seq[String] = {
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
b.getExecutorIds()
@@ -1498,7 +1504,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* @return whether the request is acknowledged by the cluster manager.
*/
@DeveloperApi
- override def requestTotalExecutors(
+ def requestTotalExecutors(
numExecutors: Int,
localityAwareTasks: Int,
hostToLocalTaskCount: scala.collection.immutable.Map[String, Int]
@@ -1518,7 +1524,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* @return whether the request is received.
*/
@DeveloperApi
- override def requestExecutors(numAdditionalExecutors: Int): Boolean = {
+ def requestExecutors(numAdditionalExecutors: Int): Boolean = {
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
b.requestExecutors(numAdditionalExecutors)
@@ -1540,10 +1546,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* @return whether the request is received.
*/
@DeveloperApi
- override def killExecutors(executorIds: Seq[String]): Boolean = {
+ def killExecutors(executorIds: Seq[String]): Boolean = {
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
- b.killExecutors(executorIds, replace = false, force = true)
+ b.killExecutors(executorIds, replace = false, force = true).nonEmpty
case _ =>
logWarning("Killing executors is only supported in coarse-grained mode")
false
@@ -1562,7 +1568,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* @return whether the request is received.
*/
@DeveloperApi
- override def killExecutor(executorId: String): Boolean = super.killExecutor(executorId)
+ def killExecutor(executorId: String): Boolean = killExecutors(Seq(executorId))
/**
* Request that the cluster manager kill the specified executor without adjusting the
@@ -1581,7 +1587,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
private[spark] def killAndReplaceExecutor(executorId: String): Boolean = {
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
- b.killExecutors(Seq(executorId), replace = true, force = true)
+ b.killExecutors(Seq(executorId), replace = true, force = true).nonEmpty
case _ =>
logWarning("Killing executors is only supported in coarse-grained mode")
false
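With SparkContext no longer implementing ExecutorAllocationClient, the allocation manager is constructed only when the scheduler backend itself speaks the allocation protocol; a backend that does not (local mode, for instance) simply yields None and dynamic allocation stays off. A simplified sketch of that guard (the traits are stand-ins for the private Spark types):

object BackendGuardSketch {
  trait Backend
  // Stand-in for ExecutorAllocationClient; only coarse-grained backends mix it in.
  trait AllocClient extends Backend { def killExecutors(ids: Seq[String]): Seq[String] }

  // Dynamic allocation is wired up only when the backend supports it.
  def maybeAllocClient(backend: Backend): Option[AllocClient] = backend match {
    case c: AllocClient => Some(c) // e.g. CoarseGrainedSchedulerBackend
    case _              => None    // e.g. local mode: no dynamic allocation
  }
}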
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index c6b3fdf439..edc3c19937 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -528,7 +528,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
* @return the ids of the executors acknowledged by the cluster manager to be removed. If the
* list to kill is empty, an empty sequence is returned.
*/
- final override def killExecutors(executorIds: Seq[String]): Boolean = {
+ final override def killExecutors(executorIds: Seq[String]): Seq[String] = {
killExecutors(executorIds, replace = false, force = false)
}
@@ -548,7 +548,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
final def killExecutors(
executorIds: Seq[String],
replace: Boolean,
- force: Boolean): Boolean = {
+ force: Boolean): Seq[String] = {
logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}")
val response = synchronized {
@@ -564,6 +564,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
.filter { id => force || !scheduler.isExecutorBusy(id) }
executorsToKill.foreach { id => executorsPendingToRemove(id) = !replace }
+ logInfo(s"Actual list of executor(s) to be killed is ${executorsToKill.mkString(", ")}")
+
// If we do not wish to replace the executors we kill, sync the target number of executors
// with the cluster manager to avoid allocating new ones. When computing the new target,
// take into account executors that are pending to be added or removed.
@@ -583,7 +585,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
_ => Future.successful(false)
}
- adjustTotalExecutors.flatMap(killExecutors)(ThreadUtils.sameThread)
+ val killResponse = adjustTotalExecutors.flatMap(killExecutors)(ThreadUtils.sameThread)
+
+ killResponse.flatMap(killSuccessful =>
+ Future.successful(if (killSuccessful) executorsToKill else Seq.empty[String])
+ )(ThreadUtils.sameThread)
}
defaultAskTimeout.awaitResult(response)
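The final composition above turns the backend's Boolean acknowledgment into the concrete list of killed ids. The patch writes it as flatMap over Future.successful, which is equivalent to a plain map; a standalone sketch of the same idea (using the global execution context in place of Spark's ThreadUtils.sameThread):

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object KillResponseSketch extends App {
  val executorsToKill = Seq("1", "2")
  // Stand-in for the RPC ask that acknowledges the kill request.
  val killResponse: Future[Boolean] = Future.successful(true)

  // Map the Boolean ack onto the ids so callers learn *which* executors
  // were accepted, not merely that some were.
  val killed: Future[Seq[String]] =
    killResponse.map(ok => if (ok) executorsToKill else Seq.empty[String])

  println(Await.result(killed, 10.seconds)) // List(1, 2)
}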