Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala | 21
-rw-r--r--  core/src/main/scala/org/apache/spark/internal/config/package.scala | 5
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala | 31
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 47
-rw-r--r--  core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala | 9
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala | 62
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala | 79
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala | 2
10 files changed, 239 insertions, 26 deletions
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
index 5d47f624ac..e4b9f8111e 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
@@ -54,11 +54,30 @@ private[spark] trait ExecutorAllocationClient {
/**
* Request that the cluster manager kill the specified executors.
+ *
+ * When asking the executor to be replaced, the executor loss is considered a failure, and
+ * killed tasks that are running on the executor will count towards the failure limits. If no
+ * replacement is being requested, then the tasks will not count towards the limit.
+ *
+ * @param executorIds identifiers of executors to kill
+ * @param replace whether to replace the killed executors with new ones, default false
+ * @param force whether to force kill busy executors, default false
* @return the ids of the executors acknowledged by the cluster manager to be removed.
*/
- def killExecutors(executorIds: Seq[String]): Seq[String]
+ def killExecutors(
+ executorIds: Seq[String],
+ replace: Boolean = false,
+ force: Boolean = false): Seq[String]
/**
+ * Request that the cluster manager kill every executor on the specified host.
+ * Results in a call to killExecutors for each executor on the host, with the replace
+ * and force arguments set to true.
+ * @return whether the request is acknowledged by the cluster manager.
+ */
+ def killExecutorsOnHost(host: String): Boolean
+
+ /**
* Request that the cluster manager kill the specified executor.
* @return whether the request is acknowledged by the cluster manager.
*/
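Editor's note: as a usage sketch only (not part of the patch), this is how a caller holding an ExecutorAllocationClient might exercise the widened trait. The executor id "7", the host name, and the object/method names are made-up placeholders; the code would need to sit under the org.apache.spark package because the trait is private[spark].

    package org.apache.spark  // ExecutorAllocationClient is private[spark]

    // Hypothetical caller; the executor id "7" and the host name are placeholders.
    object KillApiSketch {
      def retireExecutor(client: ExecutorAllocationClient): Unit = {
        // Request a replacement and force the kill even if the executor is busy,
        // so the killed tasks count towards the failure limits described above.
        val acknowledged: Seq[String] =
          client.killExecutors(Seq("7"), replace = true, force = true)
        println(s"Cluster manager will remove: ${acknowledged.mkString(", ")}")

        // Kill every executor on one host; true means the request was accepted.
        if (!client.killExecutorsOnHost("bad-host")) {
          println("kill-on-host request was not acknowledged")
        }
      }
    }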
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 536f493b41..223c921810 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -139,6 +139,11 @@ package object config {
.timeConf(TimeUnit.MILLISECONDS)
.createOptional
+ private[spark] val BLACKLIST_KILL_ENABLED =
+ ConfigBuilder("spark.blacklist.killBlacklistedExecutors")
+ .booleanConf
+ .createWithDefault(false)
+
private[spark] val BLACKLIST_LEGACY_TIMEOUT_CONF =
ConfigBuilder("spark.scheduler.executorTaskBlacklistTime")
.internal()
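Editor's note: the new flag defaults to false and only matters when blacklisting itself is on, so a job that wants blacklisted executors killed sets both keys. A minimal sketch (e.g. in spark-shell or an application's setup code; the app name is a placeholder):

    import org.apache.spark.SparkConf

    // Both keys are ordinary string settings; killBlacklistedExecutors defaults to false.
    val conf = new SparkConf()
      .setAppName("blacklist-kill-demo")  // placeholder application name
      .set("spark.blacklist.enabled", "true")
      .set("spark.blacklist.killBlacklistedExecutors", "true")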
diff --git a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
index 77d5c97a32..e130e609e4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicReference
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.{ExecutorAllocationClient, SparkConf, SparkContext}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config
import org.apache.spark.util.{Clock, SystemClock, Utils}
@@ -50,10 +50,11 @@ import org.apache.spark.util.{Clock, SystemClock, Utils}
private[scheduler] class BlacklistTracker (
private val listenerBus: LiveListenerBus,
conf: SparkConf,
+ allocationClient: Option[ExecutorAllocationClient],
clock: Clock = new SystemClock()) extends Logging {
- def this(sc: SparkContext) = {
- this(sc.listenerBus, sc.conf)
+ def this(sc: SparkContext, allocationClient: Option[ExecutorAllocationClient]) = {
+ this(sc.listenerBus, sc.conf, allocationClient)
}
BlacklistTracker.validateBlacklistConfs(conf)
@@ -173,6 +174,17 @@ private[scheduler] class BlacklistTracker (
listenerBus.post(SparkListenerExecutorBlacklisted(now, exec, newTotal))
executorIdToFailureList.remove(exec)
updateNextExpiryTime()
+ if (conf.get(config.BLACKLIST_KILL_ENABLED)) {
+ allocationClient match {
+ case Some(allocationClient) =>
+ logInfo(s"Killing blacklisted executor id $exec " +
+ s"since spark.blacklist.killBlacklistedExecutors is set.")
+ allocationClient.killExecutors(Seq(exec), true, true)
+ case None =>
+ logWarning(s"Not attempting to kill blacklisted executor id $exec " +
+ s"since allocation client is not defined.")
+ }
+ }
// In addition to blacklisting the executor, we also update the data for failures on the
// node, and potentially put the entire node into a blacklist as well.
@@ -187,6 +199,19 @@ private[scheduler] class BlacklistTracker (
nodeIdToBlacklistExpiryTime.put(node, expiryTimeForNewBlacklists)
listenerBus.post(SparkListenerNodeBlacklisted(now, node, blacklistedExecsOnNode.size))
_nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
+ if (conf.get(config.BLACKLIST_KILL_ENABLED)) {
+ allocationClient match {
+ case Some(allocationClient) =>
+ logInfo(s"Killing all executors on blacklisted host $node " +
+ s"since spark.blacklist.killBlacklistedExecutors is set.")
+ if (allocationClient.killExecutorsOnHost(node) == false) {
+ logError(s"Killing executors on node $node failed.")
+ }
+ case None =>
+ logWarning(s"Not attempting to kill executors on blacklisted host $node " +
+ s"since allocation client is not defined.")
+ }
+ }
}
}
}
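Editor's note: a minimal construction sketch mirroring the constructor change above (the object name is made up, and the code would have to live in org.apache.spark.scheduler since BlacklistTracker is private[scheduler]). With no allocation client the kill branches only log a warning; with a client they call killExecutors/killExecutorsOnHost when the flag is set.

    package org.apache.spark.scheduler

    import org.apache.spark.{ExecutorAllocationClient, SparkConf}
    import org.apache.spark.util.SystemClock

    object BlacklistTrackerConstructionSketch {
      // `listenerBus`, `conf`, and `client` are assumed to be supplied by the caller.
      def build(
          listenerBus: LiveListenerBus,
          conf: SparkConf,
          client: Option[ExecutorAllocationClient]): BlacklistTracker = {
        // client = None: blacklisted executors/hosts are only logged.
        // client = Some(backend): they are killed when BLACKLIST_KILL_ENABLED is true.
        new BlacklistTracker(listenerBus, conf, client, new SystemClock())
      }
    }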
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 8ce2ca32ed..bfbcfa1aa3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -726,7 +726,11 @@ private[spark] object TaskSchedulerImpl {
private def maybeCreateBlacklistTracker(sc: SparkContext): Option[BlacklistTracker] = {
if (BlacklistTracker.isBlacklistEnabled(sc.conf)) {
- Some(new BlacklistTracker(sc))
+ val executorAllocClient: Option[ExecutorAllocationClient] = sc.schedulerBackend match {
+ case b: ExecutorAllocationClient => Some(b)
+ case _ => None
+ }
+ Some(new BlacklistTracker(sc, executorAllocClient))
} else {
None
}
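Editor's note: the same narrowing idiom in isolation, as a sketch (hypothetical object name, placed in org.apache.spark.scheduler because SchedulerBackend is package-private). Only backends that mix in ExecutorAllocationClient give the tracker something to kill with; anything else yields None and the tracker falls back to logging.

    package org.apache.spark.scheduler

    import org.apache.spark.ExecutorAllocationClient

    object AllocationClientSketch {
      // Narrow a generic SchedulerBackend to the allocation-capable interface, if present.
      def asAllocationClient(backend: SchedulerBackend): Option[ExecutorAllocationClient] =
        backend match {
          case b: ExecutorAllocationClient => Some(b) // e.g. CoarseGrainedSchedulerBackend
          case _ => None                              // e.g. a local-mode backend
        }
    }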
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 0280359809..2898cd7d17 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -43,6 +43,9 @@ private[spark] object CoarseGrainedClusterMessages {
case class KillTask(taskId: Long, executor: String, interruptThread: Boolean)
extends CoarseGrainedClusterMessage
+ case class KillExecutorsOnHost(host: String)
+ extends CoarseGrainedClusterMessage
+
sealed trait RegisterExecutorResponse
case object RegisteredExecutor extends CoarseGrainedClusterMessage with RegisterExecutorResponse
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 31575c0ca0..e006cc9656 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -140,6 +140,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Ignoring the task kill since the executor is not registered.
logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.")
}
+
+ case KillExecutorsOnHost(host) =>
+ scheduler.getExecutorsAliveOnHost(host).foreach { exec =>
+ killExecutors(exec.toSeq, replace = true, force = true)
+ }
}
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -148,6 +153,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
if (executorDataMap.contains(executorId)) {
executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
context.reply(true)
+ } else if (scheduler.nodeBlacklist != null &&
+ scheduler.nodeBlacklist.contains(hostname)) {
+ // If the cluster manager gives us an executor on a blacklisted node (because it
+ // already started allocating those resources before we informed it of our blacklist,
+ // or if it ignored our blacklist), then we reject that executor immediately.
+ logInfo(s"Rejecting $executorId as it has been blacklisted.")
+ executorRef.send(RegisterExecutorFailed(s"Executor is blacklisted: $executorId"))
+ context.reply(true)
} else {
// If the executor's rpc env is not listening for incoming connections, `hostPort`
// will be null, and the client connection should be used to contact the executor.
@@ -524,27 +537,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
/**
* Request that the cluster manager kill the specified executors.
- * @return whether the kill request is acknowledged. If list to kill is empty, it will return
- * false.
- */
- final override def killExecutors(executorIds: Seq[String]): Seq[String] = {
- killExecutors(executorIds, replace = false, force = false)
- }
-
- /**
- * Request that the cluster manager kill the specified executors.
*
* When asking the executor to be replaced, the executor loss is considered a failure, and
* killed tasks that are running on the executor will count towards the failure limits. If no
* replacement is being requested, then the tasks will not count towards the limit.
*
* @param executorIds identifiers of executors to kill
- * @param replace whether to replace the killed executors with new ones
- * @param force whether to force kill busy executors
- * @return whether the kill request is acknowledged. If list to kill is empty, it will return
- * false.
+ * @param replace whether to replace the killed executors with new ones, default false
+ * @param force whether to force kill busy executors, default false
+ * @return the ids of the executors acknowledged by the cluster manager to be removed.
*/
- final def killExecutors(
+ final override def killExecutors(
executorIds: Seq[String],
replace: Boolean,
force: Boolean): Seq[String] = {
@@ -600,6 +603,22 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
*/
protected def doKillExecutors(executorIds: Seq[String]): Future[Boolean] =
Future.successful(false)
+
+ /**
+ * Request that the cluster manager kill all executors on a given host.
+ * @return whether the kill request is acknowledged.
+ */
+ final override def killExecutorsOnHost(host: String): Boolean = {
+ logInfo(s"Requesting to kill any and all executors on host ${host}")
+ // A potential race exists if a new executor attempts to register on a host
+ // that is on the blacklist and is no longer valid. To avoid this race,
+ // all executor registration and killing happens in the event loop. This way, either
+ // an executor will fail to register, or will be killed when all executors on a host
+ // are killed.
+ // Kill all the executors on this host in an event loop to ensure serialization.
+ driverEndpoint.send(KillExecutorsOnHost(host))
+ true
+ }
}
private[spark] object CoarseGrainedSchedulerBackend {
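Editor's note: to show where a concrete cluster manager plugs into this path, a sketch with a made-up class name (assumed to live under org.apache.spark.scheduler.cluster). killExecutorsOnHost only enqueues KillExecutorsOnHost on the driver endpoint; the endpoint re-enters killExecutors, and the only piece a subclass supplies is doKillExecutors.

    package org.apache.spark.scheduler.cluster

    import scala.concurrent.Future

    import org.apache.spark.rpc.RpcEnv
    import org.apache.spark.scheduler.TaskSchedulerImpl

    // Made-up backend: event-loop serialization, blacklist checks, and killExecutors
    // bookkeeping are inherited; only the call to the cluster manager differs.
    private[spark] class SketchSchedulerBackend(scheduler: TaskSchedulerImpl, rpcEnv: RpcEnv)
      extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {

      override protected def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = {
        // A real backend would ask its cluster manager (standalone master, YARN, ...)
        // to release these executors; here we just pretend the request succeeded.
        Future.successful(true)
      }
    }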
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index ec409712b9..4ea42fc7d5 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -1138,7 +1138,10 @@ private class DummyLocalSchedulerBackend (sc: SparkContext, sb: SchedulerBackend
override def requestExecutors(numAdditionalExecutors: Int): Boolean =
sc.requestExecutors(numAdditionalExecutors)
- override def killExecutors(executorIds: Seq[String]): Seq[String] = {
+ override def killExecutors(
+ executorIds: Seq[String],
+ replace: Boolean,
+ force: Boolean): Seq[String] = {
val response = sc.killExecutors(executorIds)
if (response) {
executorIds
@@ -1154,4 +1157,8 @@ private class DummyLocalSchedulerBackend (sc: SparkContext, sb: SchedulerBackend
override def reviveOffers(): Unit = sb.reviveOffers()
override def defaultParallelism(): Int = sb.defaultParallelism()
+
+ override def killExecutorsOnHost(host: String): Boolean = {
+ false
+ }
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index 05dad7a4b8..54ea72737c 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -20,7 +20,8 @@ package org.apache.spark.deploy
import scala.collection.mutable
import scala.concurrent.duration._
-import org.mockito.Mockito.{mock, when}
+import org.mockito.Matchers.any
+import org.mockito.Mockito.{mock, verify, when}
import org.scalatest.{BeforeAndAfterAll, PrivateMethodTester}
import org.scalatest.concurrent.Eventually._
@@ -29,10 +30,11 @@ import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMaste
import org.apache.spark.deploy.master.ApplicationInfo
import org.apache.spark.deploy.master.Master
import org.apache.spark.deploy.worker.Worker
+import org.apache.spark.internal.config
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv}
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster._
-import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RegisterExecutor
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RegisterExecutor, RegisterExecutorFailed}
/**
* End-to-end tests for dynamic allocation in standalone mode.
@@ -467,6 +469,52 @@ class StandaloneDynamicAllocationSuite
}
}
+ test("kill all executors on localhost") {
+ sc = new SparkContext(appConf)
+ val appId = sc.applicationId
+ eventually(timeout(10.seconds), interval(10.millis)) {
+ val apps = getApplications()
+ assert(apps.size === 1)
+ assert(apps.head.id === appId)
+ assert(apps.head.executors.size === 2)
+ assert(apps.head.getExecutorLimit === Int.MaxValue)
+ }
+ val beforeList = getApplications().head.executors.keys.toSet
+ assert(killExecutorsOnHost(sc, "localhost").equals(true))
+
+ syncExecutors(sc)
+ val afterList = getApplications().head.executors.keys.toSet
+
+ eventually(timeout(10.seconds), interval(100.millis)) {
+ assert(beforeList.intersect(afterList).size == 0)
+ }
+ }
+
+ test("executor registration on a blacklisted host must fail") {
+ sc = new SparkContext(appConf.set(config.BLACKLIST_ENABLED.key, "true"))
+ val endpointRef = mock(classOf[RpcEndpointRef])
+ val mockAddress = mock(classOf[RpcAddress])
+ when(endpointRef.address).thenReturn(mockAddress)
+ val message = RegisterExecutor("one", endpointRef, "blacklisted-host", 10, Map.empty)
+
+ // Get "localhost" on a blacklist.
+ val taskScheduler = mock(classOf[TaskSchedulerImpl])
+ when(taskScheduler.nodeBlacklist()).thenReturn(Set("blacklisted-host"))
+ when(taskScheduler.sc).thenReturn(sc)
+ sc.taskScheduler = taskScheduler
+
+ // Create a fresh scheduler backend that sees "blacklisted-host" as blacklisted.
+ sc.schedulerBackend.stop()
+ val backend =
+ new StandaloneSchedulerBackend(taskScheduler, sc, Array(masterRpcEnv.address.toSparkURL))
+ backend.start()
+
+ backend.driverEndpoint.ask[Boolean](message)
+ eventually(timeout(10.seconds), interval(100.millis)) {
+ verify(endpointRef).send(RegisterExecutorFailed(any()))
+ }
+ }
+
// ===============================
// | Utility methods for testing |
// ===============================
@@ -528,6 +576,16 @@ class StandaloneDynamicAllocationSuite
}
}
+ /** Kill the executors on a given host. */
+ private def killExecutorsOnHost(sc: SparkContext, host: String): Boolean = {
+ syncExecutors(sc)
+ sc.schedulerBackend match {
+ case b: CoarseGrainedSchedulerBackend =>
+ b.killExecutorsOnHost(host)
+ case _ => fail("expected coarse grained scheduler")
+ }
+ }
+
/**
* Return a list of executor IDs belonging to this application.
*
diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
index ead695574d..2b18ebee79 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
@@ -17,7 +17,10 @@
package org.apache.spark.scheduler
-import org.mockito.Mockito.{verify, when}
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.Matchers.any
+import org.mockito.Mockito.{never, verify, when}
+import org.mockito.stubbing.Answer
import org.scalatest.BeforeAndAfterEach
import org.scalatest.mock.MockitoSugar
@@ -43,7 +46,7 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
clock.setTime(0)
listenerBusMock = mock[LiveListenerBus]
- blacklist = new BlacklistTracker(listenerBusMock, conf, clock)
+ blacklist = new BlacklistTracker(listenerBusMock, conf, None, clock)
}
override def afterEach(): Unit = {
@@ -272,12 +275,14 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
// if task failures are spaced out by more than the timeout period, the first failure is timed
// out, and the executor isn't blacklisted.
var stageId = 0
+
def failOneTaskInTaskSet(exec: String): Unit = {
val taskSetBlacklist = createTaskSetBlacklist(stageId = stageId)
taskSetBlacklist.updateBlacklistForFailedTask("host-" + exec, exec, 0)
blacklist.updateBlacklistForSuccessfulTaskSet(stageId, 0, taskSetBlacklist.execToFailures)
stageId += 1
}
+
failOneTaskInTaskSet(exec = "1")
// We have one sporadic failure on exec 2, but that's it. Later checks ensure that we never
// blacklist executor 2 despite this one failure.
@@ -411,7 +416,7 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
// if you explicitly set the legacy conf to 0, that also would disable blacklisting
conf.set(config.BLACKLIST_LEGACY_TIMEOUT_CONF, 0L)
assert(!BlacklistTracker.isBlacklistEnabled(conf))
- // but again, the new conf takes precendence
+ // but again, the new conf takes precedence
conf.set(config.BLACKLIST_ENABLED, true)
assert(BlacklistTracker.isBlacklistEnabled(conf))
assert(1000 === BlacklistTracker.getBlacklistTimeout(conf))
@@ -456,4 +461,72 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
conf.remove(config)
}
}
+
+ test("blacklisting kills executors, configured by BLACKLIST_KILL_ENABLED") {
+ val allocationClientMock = mock[ExecutorAllocationClient]
+ when(allocationClientMock.killExecutors(any(), any(), any())).thenReturn(Seq("called"))
+ when(allocationClientMock.killExecutorsOnHost("hostA")).thenAnswer(new Answer[Boolean] {
+ // To avoid a race between blacklisting and killing, it is important that the nodeBlacklist
+ // is updated before we ask the executor allocation client to kill all the executors
+ // on a particular host.
+ override def answer(invocation: InvocationOnMock): Boolean = {
+ if (blacklist.nodeBlacklist.contains("hostA") == false) {
+ throw new IllegalStateException("hostA should be on the blacklist")
+ }
+ true
+ }
+ })
+ blacklist = new BlacklistTracker(listenerBusMock, conf, Some(allocationClientMock), clock)
+
+ // Disable auto-kill. Blacklist an executor and make sure killExecutors is not called.
+ conf.set(config.BLACKLIST_KILL_ENABLED, false)
+
+ val taskSetBlacklist0 = createTaskSetBlacklist(stageId = 0)
+ // Fail 4 tasks in one task set on executor 1, so that executor gets blacklisted for the whole
+ // application.
+ (0 until 4).foreach { partition =>
+ taskSetBlacklist0.updateBlacklistForFailedTask("hostA", exec = "1", index = partition)
+ }
+ blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist0.execToFailures)
+
+ verify(allocationClientMock, never).killExecutor(any())
+
+ val taskSetBlacklist1 = createTaskSetBlacklist(stageId = 1)
+ // Fail 4 tasks in one task set on executor 2, so that executor gets blacklisted for the whole
+ // application. Since that's the second executor that is blacklisted on the same node, we also
+ // blacklist that node.
+ (0 until 4).foreach { partition =>
+ taskSetBlacklist1.updateBlacklistForFailedTask("hostA", exec = "2", index = partition)
+ }
+ blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist1.execToFailures)
+
+ verify(allocationClientMock, never).killExecutors(any(), any(), any())
+ verify(allocationClientMock, never).killExecutorsOnHost(any())
+
+ // Enable auto-kill. Blacklist an executor and make sure killExecutors is called.
+ conf.set(config.BLACKLIST_KILL_ENABLED, true)
+ blacklist = new BlacklistTracker(listenerBusMock, conf, Some(allocationClientMock), clock)
+
+ val taskSetBlacklist2 = createTaskSetBlacklist(stageId = 0)
+ // Fail 4 tasks in one task set on executor 1, so that executor gets blacklisted for the whole
+ // application.
+ (0 until 4).foreach { partition =>
+ taskSetBlacklist2.updateBlacklistForFailedTask("hostA", exec = "1", index = partition)
+ }
+ blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist2.execToFailures)
+
+ verify(allocationClientMock).killExecutors(Seq("1"), true, true)
+
+ val taskSetBlacklist3 = createTaskSetBlacklist(stageId = 1)
+ // Fail 4 tasks in one task set on executor 2, so that executor gets blacklisted for the whole
+ // application. Since that's the second executor that is blacklisted on the same node, we also
+ // blacklist that node.
+ (0 until 4).foreach { partition =>
+ taskSetBlacklist3.updateBlacklistForFailedTask("hostA", exec = "2", index = partition)
+ }
+ blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist3.execToFailures)
+
+ verify(allocationClientMock).killExecutors(Seq("2"), true, true)
+ verify(allocationClientMock).killExecutorsOnHost("hostA")
+ }
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index ffb9fe461a..d03a0c990a 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -429,7 +429,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
// We don't directly use the application blacklist, but its presence triggers blacklisting
// within the taskset.
val mockListenerBus = mock(classOf[LiveListenerBus])
- val blacklistTrackerOpt = Some(new BlacklistTracker(mockListenerBus, conf, clock))
+ val blacklistTrackerOpt = Some(new BlacklistTracker(mockListenerBus, conf, None, clock))
val manager = new TaskSetManager(sched, taskSet, 4, blacklistTrackerOpt, clock)
{