Diffstat (limited to 'core/src/main')
 core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala                        | 21
 core/src/main/scala/org/apache/spark/internal/config/package.scala                         |  5
 core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala                      | 31
 core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala                     |  6
 core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala   |  3
 core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 47
 6 files changed, 94 insertions(+), 19 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
index 5d47f624ac..e4b9f8111e 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
@@ -54,11 +54,30 @@ private[spark] trait ExecutorAllocationClient {
/**
* Request that the cluster manager kill the specified executors.
+ *
+ * When asking the executor to be replaced, the executor loss is considered a failure, and
+ * killed tasks that are running on the executor will count towards the failure limits. If no
+ * replacement is being requested, then the tasks will not count towards the limit.
+ *
+ * @param executorIds identifiers of executors to kill
+ * @param replace whether to replace the killed executors with new ones, default false
+ * @param force whether to force kill busy executors, default false
* @return the ids of the executors acknowledged by the cluster manager to be removed.
*/
- def killExecutors(executorIds: Seq[String]): Seq[String]
+ def killExecutors(
+ executorIds: Seq[String],
+ replace: Boolean = false,
+ force: Boolean = false): Seq[String]
/**
+ * Request that the cluster manager kill every executor on the specified host.
+ * Results in a call to killExecutors for each executor on the host, with the replace
+ * and force arguments set to true.
+ * @return whether the request is acknowledged by the cluster manager.
+ */
+ def killExecutorsOnHost(host: String): Boolean
+
+ /**
* Request that the cluster manager kill the specified executor.
* @return whether the request is acknowledged by the cluster manager.
*/
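
The widened trait signature above keeps existing call sites source-compatible by defaulting replace and force to false, and killExecutorsOnHost gives the blacklist code a single entry point for host-level kills. A minimal sketch of how the new shape is consumed, using a hypothetical AllocationClientSketch stand-in rather than Spark's private[spark] trait (the demo object and executor/host ids are illustrative only):

// Stand-in trait: method shapes mirror the patch, surrounding names are invented.
trait AllocationClientSketch {
  def killExecutors(
      executorIds: Seq[String],
      replace: Boolean = false,
      force: Boolean = false): Seq[String]

  def killExecutorsOnHost(host: String): Boolean
}

object AllocationClientDemo {
  def demo(client: AllocationClientSketch): Unit = {
    // Pre-patch style call still compiles thanks to the defaulted parameters.
    client.killExecutors(Seq("1", "2"))
    // Blacklist-driven kill: request a replacement and force-kill even if busy.
    client.killExecutors(Seq("3"), replace = true, force = true)
    // Host-level kill, acknowledged (or not) by the cluster manager.
    val acked: Boolean = client.killExecutorsOnHost("host-1.example.com")
  }
}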
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 536f493b41..223c921810 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -139,6 +139,11 @@ package object config {
.timeConf(TimeUnit.MILLISECONDS)
.createOptional
+ private[spark] val BLACKLIST_KILL_ENABLED =
+ ConfigBuilder("spark.blacklist.killBlacklistedExecutors")
+ .booleanConf
+ .createWithDefault(false)
+
private[spark] val BLACKLIST_LEGACY_TIMEOUT_CONF =
ConfigBuilder("spark.scheduler.executorTaskBlacklistTime")
.internal()
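
The new flag defaults to false and only has an effect when blacklisting itself is enabled. A hedged example of enabling it programmatically (the application name below is a placeholder):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("blacklist-kill-demo")  // placeholder app name
  .set("spark.blacklist.enabled", "true")
  .set("spark.blacklist.killBlacklistedExecutors", "true")

The same settings can also be supplied on the command line, e.g. --conf spark.blacklist.killBlacklistedExecutors=true with spark-submit.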
diff --git a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
index 77d5c97a32..e130e609e4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicReference
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.{ExecutorAllocationClient, SparkConf, SparkContext}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config
import org.apache.spark.util.{Clock, SystemClock, Utils}
@@ -50,10 +50,11 @@ import org.apache.spark.util.{Clock, SystemClock, Utils}
private[scheduler] class BlacklistTracker (
private val listenerBus: LiveListenerBus,
conf: SparkConf,
+ allocationClient: Option[ExecutorAllocationClient],
clock: Clock = new SystemClock()) extends Logging {
- def this(sc: SparkContext) = {
- this(sc.listenerBus, sc.conf)
+ def this(sc: SparkContext, allocationClient: Option[ExecutorAllocationClient]) = {
+ this(sc.listenerBus, sc.conf, allocationClient)
}
BlacklistTracker.validateBlacklistConfs(conf)
@@ -173,6 +174,17 @@ private[scheduler] class BlacklistTracker (
listenerBus.post(SparkListenerExecutorBlacklisted(now, exec, newTotal))
executorIdToFailureList.remove(exec)
updateNextExpiryTime()
+ if (conf.get(config.BLACKLIST_KILL_ENABLED)) {
+ allocationClient match {
+ case Some(allocationClient) =>
+ logInfo(s"Killing blacklisted executor id $exec " +
+ s"since spark.blacklist.killBlacklistedExecutors is set.")
+ allocationClient.killExecutors(Seq(exec), true, true)
+ case None =>
+ logWarning(s"Not attempting to kill blacklisted executor id $exec " +
+ s"since allocation client is not defined.")
+ }
+ }
// In addition to blacklisting the executor, we also update the data for failures on the
// node, and potentially put the entire node into a blacklist as well.
@@ -187,6 +199,19 @@ private[scheduler] class BlacklistTracker (
nodeIdToBlacklistExpiryTime.put(node, expiryTimeForNewBlacklists)
listenerBus.post(SparkListenerNodeBlacklisted(now, node, blacklistedExecsOnNode.size))
_nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
+ if (conf.get(config.BLACKLIST_KILL_ENABLED)) {
+ allocationClient match {
+ case Some(allocationClient) =>
+ logInfo(s"Killing all executors on blacklisted host $node " +
+ s"since spark.blacklist.killBlacklistedExecutors is set.")
+ if (allocationClient.killExecutorsOnHost(node) == false) {
+ logError(s"Killing executors on node $node failed.")
+ }
+ case None =>
+ logWarning(s"Not attempting to kill executors on blacklisted host $node " +
+ s"since allocation client is not defined.")
+ }
+ }
}
}
}
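
BlacklistTracker now receives an Option[ExecutorAllocationClient] because not every scheduler backend can kill executors, so the kill path is gated twice: once on spark.blacklist.killBlacklistedExecutors and once on the presence of a client. The node-level branch follows the same shape as the executor-level one. A simplified, self-contained restatement of that pattern (Client and the println logging are stand-ins, not Spark types):

trait Client {
  def killExecutors(ids: Seq[String], replace: Boolean, force: Boolean): Seq[String]
}

def maybeKillBlacklisted(killEnabled: Boolean, client: Option[Client], exec: String): Unit = {
  if (killEnabled) {
    client match {
      case Some(c) =>
        // replace = true asks the cluster manager to backfill the lost capacity;
        // force = true kills the executor even if tasks are still running on it.
        c.killExecutors(Seq(exec), replace = true, force = true)
      case None =>
        println(s"No allocation client available; leaving blacklisted executor $exec running.")
    }
  }
}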
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 8ce2ca32ed..bfbcfa1aa3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -726,7 +726,11 @@ private[spark] object TaskSchedulerImpl {
private def maybeCreateBlacklistTracker(sc: SparkContext): Option[BlacklistTracker] = {
if (BlacklistTracker.isBlacklistEnabled(sc.conf)) {
- Some(new BlacklistTracker(sc))
+ val executorAllocClient: Option[ExecutorAllocationClient] = sc.schedulerBackend match {
+ case b: ExecutorAllocationClient => Some(b)
+ case _ => None
+ }
+ Some(new BlacklistTracker(sc, executorAllocClient))
} else {
None
}
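
Whether an allocation client exists at all is decided by a runtime type check on the scheduler backend: backends that mix in ExecutorAllocationClient (such as CoarseGrainedSchedulerBackend) opt in, anything else yields None. The same capability-discovery pattern in miniature, with stand-in trait names:

trait Backend
trait KillCapable {
  def killExecutors(ids: Seq[String], replace: Boolean, force: Boolean): Seq[String]
}

// Returns the optional capability if, and only if, the backend implements it.
def allocationClientOf(backend: Backend): Option[KillCapable] = backend match {
  case b: KillCapable => Some(b)
  case _              => None
}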
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 0280359809..2898cd7d17 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -43,6 +43,9 @@ private[spark] object CoarseGrainedClusterMessages {
case class KillTask(taskId: Long, executor: String, interruptThread: Boolean)
extends CoarseGrainedClusterMessage
+ case class KillExecutorsOnHost(host: String)
+ extends CoarseGrainedClusterMessage
+
sealed trait RegisterExecutorResponse
case object RegisteredExecutor extends CoarseGrainedClusterMessage with RegisterExecutorResponse
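
KillExecutorsOnHost is the one new RPC message: the scheduler backend, acting as the allocation client, sends it to its own driver endpoint so the actual kill runs inside the endpoint's event loop. A simplified sketch of that receive-side fan-out, using plain functions in place of Spark's RPC machinery (aliveOnHost and kill stand in for scheduler.getExecutorsAliveOnHost and killExecutors with replace and force set to true):

sealed trait ClusterMessage
case class KillExecutorsOnHost(host: String) extends ClusterMessage

def receive(aliveOnHost: String => Option[Set[String]],
            kill: Seq[String] => Unit): PartialFunction[ClusterMessage, Unit] = {
  case KillExecutorsOnHost(host) =>
    aliveOnHost(host).foreach(execs => kill(execs.toSeq))
}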
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 31575c0ca0..e006cc9656 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -140,6 +140,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Ignoring the task kill since the executor is not registered.
logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.")
}
+
+ case KillExecutorsOnHost(host) =>
+ scheduler.getExecutorsAliveOnHost(host).foreach { exec =>
+ killExecutors(exec.toSeq, replace = true, force = true)
+ }
}
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -148,6 +153,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
if (executorDataMap.contains(executorId)) {
executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
context.reply(true)
+ } else if (scheduler.nodeBlacklist != null &&
+ scheduler.nodeBlacklist.contains(hostname)) {
+ // If the cluster manager gives us an executor on a blacklisted node (because it
+ // already started allocating those resources before we informed it of our blacklist,
+ // or if it ignored our blacklist), then we reject that executor immediately.
+ logInfo(s"Rejecting $executorId as it has been blacklisted.")
+ executorRef.send(RegisterExecutorFailed(s"Executor is blacklisted: $executorId"))
+ context.reply(true)
} else {
// If the executor's rpc env is not listening for incoming connections, `hostPort`
// will be null, and the client connection should be used to contact the executor.
@@ -524,27 +537,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
/**
* Request that the cluster manager kill the specified executors.
- * @return whether the kill request is acknowledged. If list to kill is empty, it will return
- * false.
- */
- final override def killExecutors(executorIds: Seq[String]): Seq[String] = {
- killExecutors(executorIds, replace = false, force = false)
- }
-
- /**
- * Request that the cluster manager kill the specified executors.
*
* When asking the executor to be replaced, the executor loss is considered a failure, and
* killed tasks that are running on the executor will count towards the failure limits. If no
* replacement is being requested, then the tasks will not count towards the limit.
*
* @param executorIds identifiers of executors to kill
- * @param replace whether to replace the killed executors with new ones
- * @param force whether to force kill busy executors
- * @return whether the kill request is acknowledged. If list to kill is empty, it will return
- * false.
+ * @param replace whether to replace the killed executors with new ones, default false
+ * @param force whether to force kill busy executors, default false
+ * @return the ids of the executors acknowledged by the cluster manager to be removed.
*/
- final def killExecutors(
+ final override def killExecutors(
executorIds: Seq[String],
replace: Boolean,
force: Boolean): Seq[String] = {
@@ -600,6 +603,22 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
*/
protected def doKillExecutors(executorIds: Seq[String]): Future[Boolean] =
Future.successful(false)
+
+ /**
+ * Request that the cluster manager kill all executors on a given host.
+ * @return whether the kill request is acknowledged.
+ */
+ final override def killExecutorsOnHost(host: String): Boolean = {
+ logInfo(s"Requesting to kill any and all executors on host ${host}")
+ // A potential race exists if a new executor attempts to register on a host
+ // that is on the blacklist and is no longer valid. To avoid this race,
+ // all executor registration and killing happens in the event loop. This way, either
+ // an executor will fail to register, or will be killed when all executors on a host
+ // are killed.
+ // Kill all the executors on this host in an event loop to ensure serialization.
+ driverEndpoint.send(KillExecutorsOnHost(host))
+ true
+ }
}
private[spark] object CoarseGrainedSchedulerBackend {
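
The registration-time rejection and the host-level kill both run on the driver endpoint, which processes messages one at a time; that is what closes the race described in the comment above, where an executor could otherwise register just after its host was blacklisted. A hedged, standalone sketch of that serialization idea (the single-threaded executor and the bookkeeping maps below are illustrative, not Spark's implementation):

import java.util.concurrent.Executors
import scala.collection.mutable

object HostKillSketch {
  private val loop = Executors.newSingleThreadExecutor()
  private def onLoop(body: => Unit): Unit =
    loop.execute(new Runnable { def run(): Unit = body })

  private val executorsByHost = mutable.Map.empty[String, mutable.Set[String]]
  private val blacklistedHosts = mutable.Set.empty[String]

  // Registration and kills are serialized on the same loop, so a late
  // registration either lands before the kill (and is removed with the rest
  // of the host) or after it (and is rejected outright).
  def register(execId: String, host: String): Unit = onLoop {
    if (blacklistedHosts.contains(host)) {
      println(s"Rejecting $execId: host $host is blacklisted")
    } else {
      executorsByHost.getOrElseUpdate(host, mutable.Set.empty) += execId
    }
  }

  def killExecutorsOnHost(host: String): Unit = onLoop {
    blacklistedHosts += host
    executorsByHost.remove(host).foreach { execs =>
      println(s"Killing executors ${execs.mkString(", ")} on $host")
    }
  }
}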