aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala
diff options
context:
space:
mode:
authorJosé Hiram Soltren <jose@cloudera.com>2017-02-09 12:49:31 -0600
committerImran Rashid <irashid@cloudera.com>2017-02-09 12:49:31 -0600
commit6287c94f08200d548df5cc0a401b73b84f9968c4 (patch)
treebd1e3eaf116c39d85584a203fbc84802b794e010 /core/src/main/scala
parentaf63c52fd36c59525d9504003b15142dc850fccb (diff)
downloadspark-6287c94f08200d548df5cc0a401b73b84f9968c4.tar.gz
spark-6287c94f08200d548df5cc0a401b73b84f9968c4.tar.bz2
spark-6287c94f08200d548df5cc0a401b73b84f9968c4.zip
[SPARK-16554][CORE] Automatically Kill Executors and Nodes when they are Blacklisted
## What changes were proposed in this pull request? In SPARK-8425, we introduced a mechanism for blacklisting executors and nodes (hosts). After a certain number of failures, these resources would be "blacklisted" and no further work would be assigned to them for some period of time. In some scenarios, it is better to fail fast, and to simply kill these unreliable resources. This changes proposes to do so by having the BlacklistTracker kill unreliable resources when they would otherwise be "blacklisted". In order to be thread safe, this code depends on the CoarseGrainedSchedulerBackend sending a message to the driver backend in order to do the actual killing. This also helps to prevent a race which would permit work to begin on a resource (executor or node), between the time the resource is marked for killing and the time at which it is finally killed. ## How was this patch tested? ./dev/run-tests Ran https://github.com/jsoltren/jose-utils/blob/master/blacklist/test-blacklist.sh, and checked logs to see executors and nodes being killed. Testing can likely be improved here; suggestions welcome. Author: José Hiram Soltren <jose@cloudera.com> Closes #16650 from jsoltren/SPARK-16554-submit.
Diffstat (limited to 'core/src/main/scala')
-rw-r--r--core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala21
-rw-r--r--core/src/main/scala/org/apache/spark/internal/config/package.scala5
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala31
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala47
6 files changed, 94 insertions, 19 deletions
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
index 5d47f624ac..e4b9f8111e 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
@@ -54,11 +54,30 @@ private[spark] trait ExecutorAllocationClient {
/**
* Request that the cluster manager kill the specified executors.
+ *
+ * When asking the executor to be replaced, the executor loss is considered a failure, and
+ * killed tasks that are running on the executor will count towards the failure limits. If no
+ * replacement is being requested, then the tasks will not count towards the limit.
+ *
+ * @param executorIds identifiers of executors to kill
+ * @param replace whether to replace the killed executors with new ones, default false
+ * @param force whether to force kill busy executors, default false
* @return the ids of the executors acknowledged by the cluster manager to be removed.
*/
- def killExecutors(executorIds: Seq[String]): Seq[String]
+ def killExecutors(
+ executorIds: Seq[String],
+ replace: Boolean = false,
+ force: Boolean = false): Seq[String]
/**
+ * Request that the cluster manager kill every executor on the specified host.
+ * Results in a call to killExecutors for each executor on the host, with the replace
+ * and force arguments set to true.
+ * @return whether the request is acknowledged by the cluster manager.
+ */
+ def killExecutorsOnHost(host: String): Boolean
+
+ /**
* Request that the cluster manager kill the specified executor.
* @return whether the request is acknowledged by the cluster manager.
*/
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 536f493b41..223c921810 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -139,6 +139,11 @@ package object config {
.timeConf(TimeUnit.MILLISECONDS)
.createOptional
+ private[spark] val BLACKLIST_KILL_ENABLED =
+ ConfigBuilder("spark.blacklist.killBlacklistedExecutors")
+ .booleanConf
+ .createWithDefault(false)
+
private[spark] val BLACKLIST_LEGACY_TIMEOUT_CONF =
ConfigBuilder("spark.scheduler.executorTaskBlacklistTime")
.internal()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
index 77d5c97a32..e130e609e4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicReference
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.{ExecutorAllocationClient, SparkConf, SparkContext}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config
import org.apache.spark.util.{Clock, SystemClock, Utils}
@@ -50,10 +50,11 @@ import org.apache.spark.util.{Clock, SystemClock, Utils}
private[scheduler] class BlacklistTracker (
private val listenerBus: LiveListenerBus,
conf: SparkConf,
+ allocationClient: Option[ExecutorAllocationClient],
clock: Clock = new SystemClock()) extends Logging {
- def this(sc: SparkContext) = {
- this(sc.listenerBus, sc.conf)
+ def this(sc: SparkContext, allocationClient: Option[ExecutorAllocationClient]) = {
+ this(sc.listenerBus, sc.conf, allocationClient)
}
BlacklistTracker.validateBlacklistConfs(conf)
@@ -173,6 +174,17 @@ private[scheduler] class BlacklistTracker (
listenerBus.post(SparkListenerExecutorBlacklisted(now, exec, newTotal))
executorIdToFailureList.remove(exec)
updateNextExpiryTime()
+ if (conf.get(config.BLACKLIST_KILL_ENABLED)) {
+ allocationClient match {
+ case Some(allocationClient) =>
+ logInfo(s"Killing blacklisted executor id $exec " +
+ s"since spark.blacklist.killBlacklistedExecutors is set.")
+ allocationClient.killExecutors(Seq(exec), true, true)
+ case None =>
+ logWarning(s"Not attempting to kill blacklisted executor id $exec " +
+ s"since allocation client is not defined.")
+ }
+ }
// In addition to blacklisting the executor, we also update the data for failures on the
// node, and potentially put the entire node into a blacklist as well.
@@ -187,6 +199,19 @@ private[scheduler] class BlacklistTracker (
nodeIdToBlacklistExpiryTime.put(node, expiryTimeForNewBlacklists)
listenerBus.post(SparkListenerNodeBlacklisted(now, node, blacklistedExecsOnNode.size))
_nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
+ if (conf.get(config.BLACKLIST_KILL_ENABLED)) {
+ allocationClient match {
+ case Some(allocationClient) =>
+ logInfo(s"Killing all executors on blacklisted host $node " +
+ s"since spark.blacklist.killBlacklistedExecutors is set.")
+ if (allocationClient.killExecutorsOnHost(node) == false) {
+ logError(s"Killing executors on node $node failed.")
+ }
+ case None =>
+ logWarning(s"Not attempting to kill executors on blacklisted host $node " +
+ s"since allocation client is not defined.")
+ }
+ }
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 8ce2ca32ed..bfbcfa1aa3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -726,7 +726,11 @@ private[spark] object TaskSchedulerImpl {
private def maybeCreateBlacklistTracker(sc: SparkContext): Option[BlacklistTracker] = {
if (BlacklistTracker.isBlacklistEnabled(sc.conf)) {
- Some(new BlacklistTracker(sc))
+ val executorAllocClient: Option[ExecutorAllocationClient] = sc.schedulerBackend match {
+ case b: ExecutorAllocationClient => Some(b)
+ case _ => None
+ }
+ Some(new BlacklistTracker(sc, executorAllocClient))
} else {
None
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 0280359809..2898cd7d17 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -43,6 +43,9 @@ private[spark] object CoarseGrainedClusterMessages {
case class KillTask(taskId: Long, executor: String, interruptThread: Boolean)
extends CoarseGrainedClusterMessage
+ case class KillExecutorsOnHost(host: String)
+ extends CoarseGrainedClusterMessage
+
sealed trait RegisterExecutorResponse
case object RegisteredExecutor extends CoarseGrainedClusterMessage with RegisterExecutorResponse
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 31575c0ca0..e006cc9656 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -140,6 +140,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Ignoring the task kill since the executor is not registered.
logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.")
}
+
+ case KillExecutorsOnHost(host) =>
+ scheduler.getExecutorsAliveOnHost(host).foreach { exec =>
+ killExecutors(exec.toSeq, replace = true, force = true)
+ }
}
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -148,6 +153,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
if (executorDataMap.contains(executorId)) {
executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
context.reply(true)
+ } else if (scheduler.nodeBlacklist != null &&
+ scheduler.nodeBlacklist.contains(hostname)) {
+ // If the cluster manager gives us an executor on a blacklisted node (because it
+ // already started allocating those resources before we informed it of our blacklist,
+ // or if it ignored our blacklist), then we reject that executor immediately.
+ logInfo(s"Rejecting $executorId as it has been blacklisted.")
+ executorRef.send(RegisterExecutorFailed(s"Executor is blacklisted: $executorId"))
+ context.reply(true)
} else {
// If the executor's rpc env is not listening for incoming connections, `hostPort`
// will be null, and the client connection should be used to contact the executor.
@@ -524,27 +537,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
/**
* Request that the cluster manager kill the specified executors.
- * @return whether the kill request is acknowledged. If list to kill is empty, it will return
- * false.
- */
- final override def killExecutors(executorIds: Seq[String]): Seq[String] = {
- killExecutors(executorIds, replace = false, force = false)
- }
-
- /**
- * Request that the cluster manager kill the specified executors.
*
* When asking the executor to be replaced, the executor loss is considered a failure, and
* killed tasks that are running on the executor will count towards the failure limits. If no
* replacement is being requested, then the tasks will not count towards the limit.
*
* @param executorIds identifiers of executors to kill
- * @param replace whether to replace the killed executors with new ones
- * @param force whether to force kill busy executors
- * @return whether the kill request is acknowledged. If list to kill is empty, it will return
- * false.
+ * @param replace whether to replace the killed executors with new ones, default false
+ * @param force whether to force kill busy executors, default false
+ * @return the ids of the executors acknowledged by the cluster manager to be removed.
*/
- final def killExecutors(
+ final override def killExecutors(
executorIds: Seq[String],
replace: Boolean,
force: Boolean): Seq[String] = {
@@ -600,6 +603,22 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
*/
protected def doKillExecutors(executorIds: Seq[String]): Future[Boolean] =
Future.successful(false)
+
+ /**
+ * Request that the cluster manager kill all executors on a given host.
+ * @return whether the kill request is acknowledged.
+ */
+ final override def killExecutorsOnHost(host: String): Boolean = {
+ logInfo(s"Requesting to kill any and all executors on host ${host}")
+ // A potential race exists if a new executor attempts to register on a host
+ // that is on the blacklist and is no no longer valid. To avoid this race,
+ // all executor registration and killing happens in the event loop. This way, either
+ // an executor will fail to register, or will be killed when all executors on a host
+ // are killed.
+ // Kill all the executors on this host in an event loop to ensure serialization.
+ driverEndpoint.send(KillExecutorsOnHost(host))
+ true
+ }
}
private[spark] object CoarseGrainedSchedulerBackend {