diff options
Diffstat (limited to 'core/src/main')
6 files changed, 94 insertions, 19 deletions
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala index 5d47f624ac..e4b9f8111e 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala @@ -54,11 +54,30 @@ private[spark] trait ExecutorAllocationClient { /** * Request that the cluster manager kill the specified executors. + * + * When asking the executor to be replaced, the executor loss is considered a failure, and + * killed tasks that are running on the executor will count towards the failure limits. If no + * replacement is being requested, then the tasks will not count towards the limit. + * + * @param executorIds identifiers of executors to kill + * @param replace whether to replace the killed executors with new ones, default false + * @param force whether to force kill busy executors, default false * @return the ids of the executors acknowledged by the cluster manager to be removed. */ - def killExecutors(executorIds: Seq[String]): Seq[String] + def killExecutors( + executorIds: Seq[String], + replace: Boolean = false, + force: Boolean = false): Seq[String] /** + * Request that the cluster manager kill every executor on the specified host. + * Results in a call to killExecutors for each executor on the host, with the replace + * and force arguments set to true. + * @return whether the request is acknowledged by the cluster manager. + */ + def killExecutorsOnHost(host: String): Boolean + + /** * Request that the cluster manager kill the specified executor. * @return whether the request is acknowledged by the cluster manager. 
*/ diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 536f493b41..223c921810 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -139,6 +139,11 @@ package object config { .timeConf(TimeUnit.MILLISECONDS) .createOptional + private[spark] val BLACKLIST_KILL_ENABLED = + ConfigBuilder("spark.blacklist.killBlacklistedExecutors") + .booleanConf + .createWithDefault(false) + private[spark] val BLACKLIST_LEGACY_TIMEOUT_CONF = ConfigBuilder("spark.scheduler.executorTaskBlacklistTime") .internal() diff --git a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala index 77d5c97a32..e130e609e4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala @@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicReference import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} -import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.{ExecutorAllocationClient, SparkConf, SparkContext} import org.apache.spark.internal.Logging import org.apache.spark.internal.config import org.apache.spark.util.{Clock, SystemClock, Utils} @@ -50,10 +50,11 @@ import org.apache.spark.util.{Clock, SystemClock, Utils} private[scheduler] class BlacklistTracker ( private val listenerBus: LiveListenerBus, conf: SparkConf, + allocationClient: Option[ExecutorAllocationClient], clock: Clock = new SystemClock()) extends Logging { - def this(sc: SparkContext) = { - this(sc.listenerBus, sc.conf) + def this(sc: SparkContext, allocationClient: Option[ExecutorAllocationClient]) = { + this(sc.listenerBus, sc.conf, allocationClient) } BlacklistTracker.validateBlacklistConfs(conf) @@ -173,6 +174,17 @@ 
private[scheduler] class BlacklistTracker ( listenerBus.post(SparkListenerExecutorBlacklisted(now, exec, newTotal)) executorIdToFailureList.remove(exec) updateNextExpiryTime() + if (conf.get(config.BLACKLIST_KILL_ENABLED)) { + allocationClient match { + case Some(allocationClient) => + logInfo(s"Killing blacklisted executor id $exec " + + s"since spark.blacklist.killBlacklistedExecutors is set.") + allocationClient.killExecutors(Seq(exec), true, true) + case None => + logWarning(s"Not attempting to kill blacklisted executor id $exec " + + s"since allocation client is not defined.") + } + } // In addition to blacklisting the executor, we also update the data for failures on the // node, and potentially put the entire node into a blacklist as well. @@ -187,6 +199,19 @@ private[scheduler] class BlacklistTracker ( nodeIdToBlacklistExpiryTime.put(node, expiryTimeForNewBlacklists) listenerBus.post(SparkListenerNodeBlacklisted(now, node, blacklistedExecsOnNode.size)) _nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet) + if (conf.get(config.BLACKLIST_KILL_ENABLED)) { + allocationClient match { + case Some(allocationClient) => + logInfo(s"Killing all executors on blacklisted host $node " + + s"since spark.blacklist.killBlacklistedExecutors is set.") + if (allocationClient.killExecutorsOnHost(node) == false) { + logError(s"Killing executors on node $node failed.") + } + case None => + logWarning(s"Not attempting to kill executors on blacklisted host $node " + + s"since allocation client is not defined.") + } + } } } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 8ce2ca32ed..bfbcfa1aa3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -726,7 +726,11 @@ private[spark] object TaskSchedulerImpl { private def maybeCreateBlacklistTracker(sc: 
SparkContext): Option[BlacklistTracker] = { if (BlacklistTracker.isBlacklistEnabled(sc.conf)) { - Some(new BlacklistTracker(sc)) + val executorAllocClient: Option[ExecutorAllocationClient] = sc.schedulerBackend match { + case b: ExecutorAllocationClient => Some(b) + case _ => None + } + Some(new BlacklistTracker(sc, executorAllocClient)) } else { None } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index 0280359809..2898cd7d17 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -43,6 +43,9 @@ private[spark] object CoarseGrainedClusterMessages { case class KillTask(taskId: Long, executor: String, interruptThread: Boolean) extends CoarseGrainedClusterMessage + case class KillExecutorsOnHost(host: String) + extends CoarseGrainedClusterMessage + sealed trait RegisterExecutorResponse case object RegisteredExecutor extends CoarseGrainedClusterMessage with RegisterExecutorResponse diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 31575c0ca0..e006cc9656 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -140,6 +140,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // Ignoring the task kill since the executor is not registered. 
logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.") } + + case KillExecutorsOnHost(host) => + scheduler.getExecutorsAliveOnHost(host).foreach { exec => + killExecutors(exec.toSeq, replace = true, force = true) + } } override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { @@ -148,6 +153,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp if (executorDataMap.contains(executorId)) { executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId)) context.reply(true) + } else if (scheduler.nodeBlacklist != null && + scheduler.nodeBlacklist.contains(hostname)) { + // If the cluster manager gives us an executor on a blacklisted node (because it + // already started allocating those resources before we informed it of our blacklist, + // or if it ignored our blacklist), then we reject that executor immediately. + logInfo(s"Rejecting $executorId as it has been blacklisted.") + executorRef.send(RegisterExecutorFailed(s"Executor is blacklisted: $executorId")) + context.reply(true) } else { // If the executor's rpc env is not listening for incoming connections, `hostPort` // will be null, and the client connection should be used to contact the executor. @@ -524,27 +537,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp /** * Request that the cluster manager kill the specified executors. - * @return whether the kill request is acknowledged. If list to kill is empty, it will return - * false. - */ - final override def killExecutors(executorIds: Seq[String]): Seq[String] = { - killExecutors(executorIds, replace = false, force = false) - } - - /** - * Request that the cluster manager kill the specified executors. * * When asking the executor to be replaced, the executor loss is considered a failure, and * killed tasks that are running on the executor will count towards the failure limits. 
If no * replacement is being requested, then the tasks will not count towards the limit. * * @param executorIds identifiers of executors to kill - * @param replace whether to replace the killed executors with new ones - * @param force whether to force kill busy executors - * @return whether the kill request is acknowledged. If list to kill is empty, it will return - * false. + * @param replace whether to replace the killed executors with new ones, default false + * @param force whether to force kill busy executors, default false + * @return the ids of the executors acknowledged by the cluster manager to be removed. */ - final def killExecutors( + final override def killExecutors( executorIds: Seq[String], replace: Boolean, force: Boolean): Seq[String] = { @@ -600,6 +603,22 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp */ protected def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future.successful(false) + + /** + * Request that the cluster manager kill all executors on a given host. + * @return whether the kill request is acknowledged. + */ + final override def killExecutorsOnHost(host: String): Boolean = { + logInfo(s"Requesting to kill any and all executors on host ${host}") + // A potential race exists if a new executor attempts to register on a host + // that is on the blacklist and is no longer valid. To avoid this race, + // all executor registration and killing happens in the event loop. This way, either + // an executor will fail to register, or will be killed when all executors on a host + // are killed. + // Kill all the executors on this host in an event loop to ensure serialization. + driverEndpoint.send(KillExecutorsOnHost(host)) + true + } } private[spark] object CoarseGrainedSchedulerBackend { |