aboutsummaryrefslogtreecommitdiff
path: root/core/src/test/scala/org
diff options
context:
space:
mode:
authorImran Rashid <irashid@cloudera.com>2016-12-15 08:29:56 -0600
committerImran Rashid <irashid@cloudera.com>2016-12-15 08:29:56 -0600
commit93cdb8a7d0f124b4db069fd8242207c82e263c52 (patch)
treec0f626664bfa6bad965b85a3cc54438bf15b4332 /core/src/test/scala/org
parent7d858bc5ce870a28a559f4e81dcfc54cbd128cb7 (diff)
downloadspark-93cdb8a7d0f124b4db069fd8242207c82e263c52.tar.gz
spark-93cdb8a7d0f124b4db069fd8242207c82e263c52.tar.bz2
spark-93cdb8a7d0f124b4db069fd8242207c82e263c52.zip
[SPARK-8425][CORE] Application Level Blacklisting
## What changes were proposed in this pull request? This builds upon the blacklisting introduced in SPARK-17675 to add blacklisting of executors and nodes for an entire Spark application. Resources are blacklisted based on tasks that fail, in tasksets that eventually complete successfully; they are automatically returned to the pool of active resources based on a timeout. Full details are available in a design doc attached to the jira. ## How was this patch tested? Added unit tests, ran them via Jenkins, also ran a handful of them in a loop to check for flakiness. The added tests include: - verifying BlacklistTracker works correctly - verifying TaskSchedulerImpl interacts with BlacklistTracker correctly (via a mock BlacklistTracker) - an integration test for the entire scheduler with blacklisting in a few different scenarios Author: Imran Rashid <irashid@cloudera.com> Author: mwws <wei.mao@intel.com> Closes #14079 from squito/blacklist-SPARK-8425.
Diffstat (limited to 'core/src/test/scala/org')
-rw-r--r--core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala4
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala352
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala109
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/TaskSetBlacklistSuite.scala6
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala49
5 files changed, 487 insertions, 33 deletions
diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
index 915d7a1b8b..7b6a2313f9 100644
--- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -272,7 +272,7 @@ private class FakeSchedulerBackend(
protected override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = {
clusterManagerEndpoint.ask[Boolean](
- RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount))
+ RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount, Set.empty[String]))
}
protected override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = {
@@ -291,7 +291,7 @@ private class FakeClusterManager(override val rpcEnv: RpcEnv) extends RpcEndpoin
def getExecutorIdsToKill: Set[String] = executorIdsToKill.toSet
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
- case RequestExecutors(requestedTotal, _, _) =>
+ case RequestExecutors(requestedTotal, _, _, _) =>
targetNumExecutors = requestedTotal
context.reply(true)
case KillExecutors(executorIds) =>
diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
index b2e7ec5df0..6b314d2ae3 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
@@ -17,10 +17,356 @@
package org.apache.spark.scheduler
-import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.mockito.Mockito.when
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark._
import org.apache.spark.internal.config
+import org.apache.spark.util.ManualClock
+
+class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with MockitoSugar
+ with LocalSparkContext {
+
+ private val clock = new ManualClock(0)
+
+ private var blacklist: BlacklistTracker = _
+ private var scheduler: TaskSchedulerImpl = _
+ private var conf: SparkConf = _
+
+ override def beforeEach(): Unit = {
+ conf = new SparkConf().setAppName("test").setMaster("local")
+ .set(config.BLACKLIST_ENABLED.key, "true")
+ scheduler = mockTaskSchedWithConf(conf)
+
+ clock.setTime(0)
+ blacklist = new BlacklistTracker(conf, clock)
+ }
+
+ override def afterEach(): Unit = {
+ if (blacklist != null) {
+ blacklist = null
+ }
+ if (scheduler != null) {
+ scheduler.stop()
+ scheduler = null
+ }
+ super.afterEach()
+ }
+
+ // All executors and hosts used in tests should be in this set, so that [[assertEquivalentToSet]]
+ // works. Its OK if its got extraneous entries
+ val allExecutorAndHostIds = {
+ (('A' to 'Z')++ (1 to 100).map(_.toString))
+ .flatMap{ suffix =>
+ Seq(s"host$suffix", s"host-$suffix")
+ }
+ }.toSet
+
+ /**
+ * Its easier to write our tests as if we could directly look at the sets of nodes & executors in
+ * the blacklist. However the api doesn't expose a set, so this is a simple way to test
+ * something similar, since we know the universe of values that might appear in these sets.
+ */
+ def assertEquivalentToSet(f: String => Boolean, expected: Set[String]): Unit = {
+ allExecutorAndHostIds.foreach { id =>
+ val actual = f(id)
+ val exp = expected.contains(id)
+ assert(actual === exp, raw"""for string "$id" """)
+ }
+ }
-class BlacklistTrackerSuite extends SparkFunSuite {
+ def mockTaskSchedWithConf(conf: SparkConf): TaskSchedulerImpl = {
+ sc = new SparkContext(conf)
+ val scheduler = mock[TaskSchedulerImpl]
+ when(scheduler.sc).thenReturn(sc)
+ when(scheduler.mapOutputTracker).thenReturn(SparkEnv.get.mapOutputTracker)
+ scheduler
+ }
+
+ def createTaskSetBlacklist(stageId: Int = 0): TaskSetBlacklist = {
+ new TaskSetBlacklist(conf, stageId, clock)
+ }
+
+ test("executors can be blacklisted with only a few failures per stage") {
+ // For many different stages, executor 1 fails a task, then executor 2 succeeds the task,
+ // and then the task set is done. Not enough failures to blacklist the executor *within*
+ // any particular taskset, but we still blacklist the executor overall eventually.
+ // Also, we intentionally have a mix of task successes and failures -- there are even some
+ // successes after the executor is blacklisted. The idea here is those tasks get scheduled
+ // before the executor is blacklisted. We might get successes after blacklisting (because the
+ // executor might be flaky but not totally broken). But successes should not unblacklist the
+ // executor.
+ val failuresUntilBlacklisted = conf.get(config.MAX_FAILURES_PER_EXEC)
+ var failuresSoFar = 0
+ (0 until failuresUntilBlacklisted * 10).foreach { stageId =>
+ val taskSetBlacklist = createTaskSetBlacklist(stageId)
+ if (stageId % 2 == 0) {
+ // fail one task in every other taskset
+ taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "1", index = 0)
+ failuresSoFar += 1
+ }
+ blacklist.updateBlacklistForSuccessfulTaskSet(stageId, 0, taskSetBlacklist.execToFailures)
+ assert(failuresSoFar == stageId / 2 + 1)
+ if (failuresSoFar < failuresUntilBlacklisted) {
+ assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set())
+ } else {
+ assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set("1"))
+ }
+ }
+ }
+
+ // If an executor has many task failures, but the task set ends up failing, it shouldn't be
+ // counted against the executor.
+ test("executors aren't blacklisted as a result of tasks in failed task sets") {
+ val failuresUntilBlacklisted = conf.get(config.MAX_FAILURES_PER_EXEC)
+ // for many different stages, executor 1 fails a task, and then the taskSet fails.
+ (0 until failuresUntilBlacklisted * 10).foreach { stage =>
+ val taskSetBlacklist = createTaskSetBlacklist(stage)
+ taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "1", index = 0)
+ }
+ assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set())
+ }
+
+ Seq(true, false).foreach { succeedTaskSet =>
+ val label = if (succeedTaskSet) "success" else "failure"
+ test(s"stage blacklist updates correctly on stage $label") {
+ // Within one taskset, an executor fails a few times, so it's blacklisted for the taskset.
+ // But if the taskset fails, we shouldn't blacklist the executor after the stage.
+ val taskSetBlacklist = createTaskSetBlacklist(0)
+ // We trigger enough failures for both the taskset blacklist, and the application blacklist.
+ val numFailures = math.max(conf.get(config.MAX_FAILURES_PER_EXEC),
+ conf.get(config.MAX_FAILURES_PER_EXEC_STAGE))
+ (0 until numFailures).foreach { index =>
+ taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "1", index = index)
+ }
+ assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("1"))
+ assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set())
+ if (succeedTaskSet) {
+ // The task set succeeded elsewhere, so we should count those failures against our executor,
+ // and it should be blacklisted for the entire application.
+ blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist.execToFailures)
+ assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set("1"))
+ } else {
+ // The task set failed, so we don't count these failures against the executor for other
+ // stages.
+ assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set())
+ }
+ }
+ }
+
+ test("blacklisted executors and nodes get recovered with time") {
+ val taskSetBlacklist0 = createTaskSetBlacklist(stageId = 0)
+ // Fail 4 tasks in one task set on executor 1, so that executor gets blacklisted for the whole
+ // application.
+ (0 until 4).foreach { partition =>
+ taskSetBlacklist0.updateBlacklistForFailedTask("hostA", exec = "1", index = partition)
+ }
+ blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist0.execToFailures)
+ assert(blacklist.nodeBlacklist() === Set())
+ assertEquivalentToSet(blacklist.isNodeBlacklisted(_), Set())
+ assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set("1"))
+
+ val taskSetBlacklist1 = createTaskSetBlacklist(stageId = 1)
+ // Fail 4 tasks in one task set on executor 2, so that executor gets blacklisted for the whole
+ // application. Since that's the second executor that is blacklisted on the same node, we also
+ // blacklist that node.
+ (0 until 4).foreach { partition =>
+ taskSetBlacklist1.updateBlacklistForFailedTask("hostA", exec = "2", index = partition)
+ }
+ blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist1.execToFailures)
+ assert(blacklist.nodeBlacklist() === Set("hostA"))
+ assertEquivalentToSet(blacklist.isNodeBlacklisted(_), Set("hostA"))
+ assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set("1", "2"))
+
+ // Advance the clock and then make sure hostA and executors 1 and 2 have been removed from the
+ // blacklist.
+ clock.advance(blacklist.BLACKLIST_TIMEOUT_MILLIS + 1)
+ blacklist.applyBlacklistTimeout()
+ assert(blacklist.nodeBlacklist() === Set())
+ assertEquivalentToSet(blacklist.isNodeBlacklisted(_), Set())
+ assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set())
+
+ // Fail one more task, but executor isn't put back into blacklist since the count of failures
+ // on that executor should have been reset to 0.
+ val taskSetBlacklist2 = createTaskSetBlacklist(stageId = 2)
+ taskSetBlacklist2.updateBlacklistForFailedTask("hostA", exec = "1", index = 0)
+ blacklist.updateBlacklistForSuccessfulTaskSet(2, 0, taskSetBlacklist2.execToFailures)
+ assert(blacklist.nodeBlacklist() === Set())
+ assertEquivalentToSet(blacklist.isNodeBlacklisted(_), Set())
+ assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set())
+ }
+
+ test("blacklist can handle lost executors") {
+ // The blacklist should still work if an executor is killed completely. We should still
+ // be able to blacklist the entire node.
+ val taskSetBlacklist0 = createTaskSetBlacklist(stageId = 0)
+ // Lets say that executor 1 dies completely. We get some task failures, but
+ // the taskset then finishes successfully (elsewhere).
+ (0 until 4).foreach { partition =>
+ taskSetBlacklist0.updateBlacklistForFailedTask("hostA", exec = "1", index = partition)
+ }
+ blacklist.handleRemovedExecutor("1")
+ blacklist.updateBlacklistForSuccessfulTaskSet(
+ stageId = 0,
+ stageAttemptId = 0,
+ taskSetBlacklist0.execToFailures)
+ assert(blacklist.isExecutorBlacklisted("1"))
+ clock.advance(blacklist.BLACKLIST_TIMEOUT_MILLIS / 2)
+
+ // Now another executor gets spun up on that host, but it also dies.
+ val taskSetBlacklist1 = createTaskSetBlacklist(stageId = 1)
+ (0 until 4).foreach { partition =>
+ taskSetBlacklist1.updateBlacklistForFailedTask("hostA", exec = "2", index = partition)
+ }
+ blacklist.handleRemovedExecutor("2")
+ blacklist.updateBlacklistForSuccessfulTaskSet(
+ stageId = 1,
+ stageAttemptId = 0,
+ taskSetBlacklist1.execToFailures)
+ // We've now had two bad executors on the hostA, so we should blacklist the entire node.
+ assert(blacklist.isExecutorBlacklisted("1"))
+ assert(blacklist.isExecutorBlacklisted("2"))
+ assert(blacklist.isNodeBlacklisted("hostA"))
+
+ // Advance the clock so that executor 1 should no longer be explicitly blacklisted, but
+ // everything else should still be blacklisted.
+ clock.advance(blacklist.BLACKLIST_TIMEOUT_MILLIS / 2 + 1)
+ blacklist.applyBlacklistTimeout()
+ assert(!blacklist.isExecutorBlacklisted("1"))
+ assert(blacklist.isExecutorBlacklisted("2"))
+ assert(blacklist.isNodeBlacklisted("hostA"))
+ // make sure we don't leak memory
+ assert(!blacklist.executorIdToBlacklistStatus.contains("1"))
+ assert(!blacklist.nodeToBlacklistedExecs("hostA").contains("1"))
+ // Advance the timeout again so now hostA should be removed from the blacklist.
+ clock.advance(blacklist.BLACKLIST_TIMEOUT_MILLIS / 2)
+ blacklist.applyBlacklistTimeout()
+ assert(!blacklist.nodeIdToBlacklistExpiryTime.contains("hostA"))
+ }
+
+ test("task failures expire with time") {
+ // Verifies that 2 failures within the timeout period cause an executor to be blacklisted, but
+ // if task failures are spaced out by more than the timeout period, the first failure is timed
+ // out, and the executor isn't blacklisted.
+ var stageId = 0
+ def failOneTaskInTaskSet(exec: String): Unit = {
+ val taskSetBlacklist = createTaskSetBlacklist(stageId = stageId)
+ taskSetBlacklist.updateBlacklistForFailedTask("host-" + exec, exec, 0)
+ blacklist.updateBlacklistForSuccessfulTaskSet(stageId, 0, taskSetBlacklist.execToFailures)
+ stageId += 1
+ }
+ failOneTaskInTaskSet(exec = "1")
+ // We have one sporadic failure on exec 2, but that's it. Later checks ensure that we never
+ // blacklist executor 2 despite this one failure.
+ failOneTaskInTaskSet(exec = "2")
+ assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set())
+ assert(blacklist.nextExpiryTime === Long.MaxValue)
+
+ // We advance the clock past the expiry time.
+ clock.advance(blacklist.BLACKLIST_TIMEOUT_MILLIS + 1)
+ val t0 = clock.getTimeMillis()
+ blacklist.applyBlacklistTimeout()
+ assert(blacklist.nextExpiryTime === Long.MaxValue)
+ failOneTaskInTaskSet(exec = "1")
+
+ // Because the 2nd failure on executor 1 happened past the expiry time, nothing should have been
+ // blacklisted.
+ assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set())
+
+ // Now we add one more failure, within the timeout, and it should be counted.
+ clock.setTime(t0 + blacklist.BLACKLIST_TIMEOUT_MILLIS - 1)
+ val t1 = clock.getTimeMillis()
+ failOneTaskInTaskSet(exec = "1")
+ blacklist.applyBlacklistTimeout()
+ assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set("1"))
+ assert(blacklist.nextExpiryTime === t1 + blacklist.BLACKLIST_TIMEOUT_MILLIS)
+
+ // Add failures on executor 3, make sure it gets put on the blacklist.
+ clock.setTime(t1 + blacklist.BLACKLIST_TIMEOUT_MILLIS - 1)
+ val t2 = clock.getTimeMillis()
+ failOneTaskInTaskSet(exec = "3")
+ failOneTaskInTaskSet(exec = "3")
+ blacklist.applyBlacklistTimeout()
+ assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set("1", "3"))
+ assert(blacklist.nextExpiryTime === t1 + blacklist.BLACKLIST_TIMEOUT_MILLIS)
+
+ // Now we go past the timeout for executor 1, so it should be dropped from the blacklist.
+ clock.setTime(t1 + blacklist.BLACKLIST_TIMEOUT_MILLIS + 1)
+ blacklist.applyBlacklistTimeout()
+ assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set("3"))
+ assert(blacklist.nextExpiryTime === t2 + blacklist.BLACKLIST_TIMEOUT_MILLIS)
+
+ // Make sure that we update correctly when we go from having blacklisted executors to
+ // just having tasks with timeouts.
+ clock.setTime(t2 + blacklist.BLACKLIST_TIMEOUT_MILLIS - 1)
+ failOneTaskInTaskSet(exec = "4")
+ blacklist.applyBlacklistTimeout()
+ assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set("3"))
+ assert(blacklist.nextExpiryTime === t2 + blacklist.BLACKLIST_TIMEOUT_MILLIS)
+
+ clock.setTime(t2 + blacklist.BLACKLIST_TIMEOUT_MILLIS + 1)
+ blacklist.applyBlacklistTimeout()
+ assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set())
+ // we've got one task failure still, but we don't bother setting nextExpiryTime to it, to
+ // avoid wasting time checking for expiry of individual task failures.
+ assert(blacklist.nextExpiryTime === Long.MaxValue)
+ }
+
+ test("task failure timeout works as expected for long-running tasksets") {
+ // This ensures that we don't trigger spurious blacklisting for long tasksets, when the taskset
+ // finishes long after the task failures. We create two tasksets, each with one failure.
+ // Individually they shouldn't cause any blacklisting since there is only one failure.
+ // Furthermore, we space the failures out so far that even when both tasksets have completed,
+ // we still don't trigger any blacklisting.
+ val taskSetBlacklist1 = createTaskSetBlacklist(stageId = 1)
+ val taskSetBlacklist2 = createTaskSetBlacklist(stageId = 2)
+ // Taskset1 has one failure immediately
+ taskSetBlacklist1.updateBlacklistForFailedTask("host-1", "1", 0)
+ // Then we have a *long* delay, much longer than the timeout, before any other failures or
+ // taskset completion
+ clock.advance(blacklist.BLACKLIST_TIMEOUT_MILLIS * 5)
+ // After the long delay, we have one failure on taskset 2, on the same executor
+ taskSetBlacklist2.updateBlacklistForFailedTask("host-1", "1", 0)
+ // Finally, we complete both tasksets. Its important here to complete taskset2 *first*. We
+ // want to make sure that when taskset 1 finishes, even though we've now got two task failures,
+ // we realize that the task failure we just added was well before the timeout.
+ clock.advance(1)
+ blacklist.updateBlacklistForSuccessfulTaskSet(stageId = 2, 0, taskSetBlacklist2.execToFailures)
+ clock.advance(1)
+ blacklist.updateBlacklistForSuccessfulTaskSet(stageId = 1, 0, taskSetBlacklist1.execToFailures)
+
+ // Make sure nothing was blacklisted
+ assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set())
+ }
+
+ test("only blacklist nodes for the application when enough executors have failed on that " +
+ "specific host") {
+ // we blacklist executors on two different hosts -- make sure that doesn't lead to any
+ // node blacklisting
+ val taskSetBlacklist0 = createTaskSetBlacklist(stageId = 0)
+ taskSetBlacklist0.updateBlacklistForFailedTask("hostA", exec = "1", index = 0)
+ taskSetBlacklist0.updateBlacklistForFailedTask("hostA", exec = "1", index = 1)
+ blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist0.execToFailures)
+ assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set("1"))
+ assertEquivalentToSet(blacklist.isNodeBlacklisted(_), Set())
+
+ val taskSetBlacklist1 = createTaskSetBlacklist(stageId = 1)
+ taskSetBlacklist1.updateBlacklistForFailedTask("hostB", exec = "2", index = 0)
+ taskSetBlacklist1.updateBlacklistForFailedTask("hostB", exec = "2", index = 1)
+ blacklist.updateBlacklistForSuccessfulTaskSet(1, 0, taskSetBlacklist1.execToFailures)
+ assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set("1", "2"))
+ assertEquivalentToSet(blacklist.isNodeBlacklisted(_), Set())
+
+ // Finally, blacklist another executor on the same node as the original blacklisted executor,
+ // and make sure this time we *do* blacklist the node.
+ val taskSetBlacklist2 = createTaskSetBlacklist(stageId = 0)
+ taskSetBlacklist2.updateBlacklistForFailedTask("hostA", exec = "3", index = 0)
+ taskSetBlacklist2.updateBlacklistForFailedTask("hostA", exec = "3", index = 1)
+ blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist2.execToFailures)
+ assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set("1", "2", "3"))
+ assertEquivalentToSet(blacklist.isNodeBlacklisted(_), Set("hostA"))
+ }
test("blacklist still respects legacy configs") {
val conf = new SparkConf().setMaster("local")
@@ -68,6 +414,8 @@ class BlacklistTrackerSuite extends SparkFunSuite {
config.MAX_TASK_ATTEMPTS_PER_NODE,
config.MAX_FAILURES_PER_EXEC_STAGE,
config.MAX_FAILED_EXEC_PER_NODE_STAGE,
+ config.MAX_FAILURES_PER_EXEC,
+ config.MAX_FAILED_EXEC_PER_NODE,
config.BLACKLIST_TIMEOUT_CONF
).foreach { config =>
conf.set(config.key, "0")
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index a0b6268331..304dc9d47e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -21,14 +21,15 @@ import java.nio.ByteBuffer
import scala.collection.mutable.HashMap
-import org.mockito.Matchers.{anyInt, anyString, eq => meq}
-import org.mockito.Mockito.{atLeast, atMost, never, spy, verify, when}
+import org.mockito.Matchers.{anyInt, anyObject, anyString, eq => meq}
+import org.mockito.Mockito.{atLeast, atMost, never, spy, times, verify, when}
import org.scalatest.BeforeAndAfterEach
import org.scalatest.mock.MockitoSugar
import org.apache.spark._
import org.apache.spark.internal.config
import org.apache.spark.internal.Logging
+import org.apache.spark.storage.BlockManagerId
class FakeSchedulerBackend extends SchedulerBackend {
def start() {}
@@ -44,6 +45,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
var failedTaskSetReason: String = null
var failedTaskSet = false
+ var blacklist: BlacklistTracker = null
var taskScheduler: TaskSchedulerImpl = null
var dagScheduler: DAGScheduler = null
@@ -82,11 +84,12 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
}
def setupSchedulerWithMockTaskSetBlacklist(): TaskSchedulerImpl = {
+ blacklist = mock[BlacklistTracker]
val conf = new SparkConf().setMaster("local").setAppName("TaskSchedulerImplSuite")
conf.set(config.BLACKLIST_ENABLED, true)
sc = new SparkContext(conf)
taskScheduler =
- new TaskSchedulerImpl(sc, sc.conf.getInt("spark.task.maxFailures", 4)) {
+ new TaskSchedulerImpl(sc, sc.conf.getInt("spark.task.maxFailures", 4), Some(blacklist)) {
override def createTaskSetManager(taskSet: TaskSet, maxFailures: Int): TaskSetManager = {
val tsm = super.createTaskSetManager(taskSet, maxFailures)
// we need to create a spied tsm just so we can set the TaskSetBlacklist
@@ -408,6 +411,95 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
}
assert(tsm.isZombie)
}
+
+ // the tasksSets complete, so the tracker should be notified of the successful ones
+ verify(blacklist, times(1)).updateBlacklistForSuccessfulTaskSet(
+ stageId = 0,
+ stageAttemptId = 0,
+ failuresByExec = stageToMockTaskSetBlacklist(0).execToFailures)
+ verify(blacklist, times(1)).updateBlacklistForSuccessfulTaskSet(
+ stageId = 1,
+ stageAttemptId = 0,
+ failuresByExec = stageToMockTaskSetBlacklist(1).execToFailures)
+ // but we shouldn't update for the failed taskset
+ verify(blacklist, never).updateBlacklistForSuccessfulTaskSet(
+ stageId = meq(2),
+ stageAttemptId = anyInt(),
+ failuresByExec = anyObject())
+ }
+
+ test("scheduled tasks obey node and executor blacklists") {
+ taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
+ (0 to 2).foreach { stageId =>
+ val taskSet = FakeTask.createTaskSet(numTasks = 2, stageId = stageId, stageAttemptId = 0)
+ taskScheduler.submitTasks(taskSet)
+ }
+
+ val offers = IndexedSeq(
+ new WorkerOffer("executor0", "host0", 1),
+ new WorkerOffer("executor1", "host1", 1),
+ new WorkerOffer("executor2", "host1", 1),
+ new WorkerOffer("executor3", "host2", 10),
+ new WorkerOffer("executor4", "host3", 1)
+ )
+
+ // setup our mock blacklist:
+ // host1, executor0 & executor3 are completely blacklisted
+ // This covers everything *except* one core on executor4 / host3, so that everything is still
+ // schedulable.
+ when(blacklist.isNodeBlacklisted("host1")).thenReturn(true)
+ when(blacklist.isExecutorBlacklisted("executor0")).thenReturn(true)
+ when(blacklist.isExecutorBlacklisted("executor3")).thenReturn(true)
+
+ val stageToTsm = (0 to 2).map { stageId =>
+ val tsm = taskScheduler.taskSetManagerForAttempt(stageId, 0).get
+ stageId -> tsm
+ }.toMap
+
+ val firstTaskAttempts = taskScheduler.resourceOffers(offers).flatten
+ firstTaskAttempts.foreach { task => logInfo(s"scheduled $task on ${task.executorId}") }
+ assert(firstTaskAttempts.size === 1)
+ assert(firstTaskAttempts.head.executorId === "executor4")
+ ('0' until '2').foreach { hostNum =>
+ verify(blacklist, atLeast(1)).isNodeBlacklisted("host" + hostNum)
+ }
+ }
+
+ test("abort stage when all executors are blacklisted") {
+ taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
+ val taskSet = FakeTask.createTaskSet(numTasks = 10, stageAttemptId = 0)
+ taskScheduler.submitTasks(taskSet)
+ val tsm = stageToMockTaskSetManager(0)
+
+ // first just submit some offers so the scheduler knows about all the executors
+ taskScheduler.resourceOffers(IndexedSeq(
+ WorkerOffer("executor0", "host0", 2),
+ WorkerOffer("executor1", "host0", 2),
+ WorkerOffer("executor2", "host0", 2),
+ WorkerOffer("executor3", "host1", 2)
+ ))
+
+ // now say our blacklist updates to blacklist a bunch of resources, but *not* everything
+ when(blacklist.isNodeBlacklisted("host1")).thenReturn(true)
+ when(blacklist.isExecutorBlacklisted("executor0")).thenReturn(true)
+
+ // make an offer on the blacklisted resources. We won't schedule anything, but also won't
+ // abort yet, since we know of other resources that work
+ assert(taskScheduler.resourceOffers(IndexedSeq(
+ WorkerOffer("executor0", "host0", 2),
+ WorkerOffer("executor3", "host1", 2)
+ )).flatten.size === 0)
+ assert(!tsm.isZombie)
+
+ // now update the blacklist so that everything really is blacklisted
+ when(blacklist.isExecutorBlacklisted("executor1")).thenReturn(true)
+ when(blacklist.isExecutorBlacklisted("executor2")).thenReturn(true)
+ assert(taskScheduler.resourceOffers(IndexedSeq(
+ WorkerOffer("executor0", "host0", 2),
+ WorkerOffer("executor3", "host1", 2)
+ )).flatten.size === 0)
+ assert(tsm.isZombie)
+ verify(tsm).abort(anyString(), anyObject())
}
/**
@@ -650,6 +742,17 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
assert(taskScheduler.getExecutorsAliveOnHost("host1") === Some(Set("executor1", "executor3")))
}
+ test("scheduler checks for executors that can be expired from blacklist") {
+ taskScheduler = setupScheduler()
+
+ taskScheduler.submitTasks(FakeTask.createTaskSet(1, 0))
+ taskScheduler.resourceOffers(IndexedSeq(
+ new WorkerOffer("executor0", "host0", 1)
+ )).flatten
+
+ verify(blacklist).applyBlacklistTimeout()
+ }
+
test("if an executor is lost then the state for its running tasks is cleaned up (SPARK-18553)") {
sc = new SparkContext("local", "TaskSchedulerImplSuite")
val taskScheduler = new TaskSchedulerImpl(sc)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetBlacklistSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetBlacklistSuite.scala
index 8c902af568..6b52c10b2c 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetBlacklistSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetBlacklistSuite.scala
@@ -85,9 +85,9 @@ class TaskSetBlacklistSuite extends SparkFunSuite {
Seq("exec1", "exec2").foreach { exec =>
assert(
- execToFailures(exec).taskToFailureCount === Map(
- 0 -> 1,
- 1 -> 1
+ execToFailures(exec).taskToFailureCountAndFailureTime === Map(
+ 0 -> (1, 0),
+ 1 -> (1, 0)
)
)
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index abc8fff30e..2f5b029a96 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -183,7 +183,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
val taskSet = FakeTask.createTaskSet(1)
val clock = new ManualClock
- val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
val accumUpdates = taskSet.tasks.head.metrics.internalAccums
// Offer a host with NO_PREF as the constraint,
@@ -236,7 +236,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execC", "host2"))
val taskSet = FakeTask.createTaskSet(1, Seq(TaskLocation("host1", "execB")))
val clock = new ManualClock
- val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
// An executor that is not NODE_LOCAL should be rejected.
assert(manager.resourceOffer("execC", "host2", ANY) === None)
@@ -257,7 +257,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
Seq() // Last task has no locality prefs
)
val clock = new ManualClock
- val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
// First offer host1, exec1: first task should be chosen
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) == None)
@@ -286,7 +286,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
Seq() // Last task has no locality prefs
)
val clock = new ManualClock
- val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
// First offer host1, exec1: first task should be chosen
assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL).get.index === 0)
assert(manager.resourceOffer("exec3", "host2", PROCESS_LOCAL).get.index === 1)
@@ -306,7 +306,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
Seq(TaskLocation("host2"))
)
val clock = new ManualClock
- val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
// First offer host1: first task should be chosen
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
@@ -344,7 +344,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
Seq(TaskLocation("host3"))
)
val clock = new ManualClock
- val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
// First offer host1: first task should be chosen
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
@@ -376,7 +376,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
val taskSet = FakeTask.createTaskSet(1)
val clock = new ManualClock
- val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
@@ -393,7 +393,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
val taskSet = FakeTask.createTaskSet(1)
val clock = new ManualClock
- val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
// Fail the task MAX_TASK_FAILURES times, and check that the task set is aborted
// after the last failure.
@@ -426,7 +426,10 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
// affinity to exec1 on host1 - which we will fail.
val taskSet = FakeTask.createTaskSet(1, Seq(TaskLocation("host1", "exec1")))
val clock = new ManualClock
- val manager = new TaskSetManager(sched, taskSet, 4, clock)
+ // We don't directly use the application blacklist, but its presence triggers blacklisting
+ // within the taskset.
+ val blacklistTrackerOpt = Some(new BlacklistTracker(conf, clock))
+ val manager = new TaskSetManager(sched, taskSet, 4, blacklistTrackerOpt, clock)
{
val offerResult = manager.resourceOffer("exec1", "host1", PROCESS_LOCAL)
@@ -515,7 +518,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
Seq(TaskLocation("host2", "execC")),
Seq())
val clock = new ManualClock
- val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
// Only ANY is valid
assert(manager.myLocalityLevels.sameElements(Array(NO_PREF, ANY)))
// Add a new executor
@@ -546,7 +549,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
Seq(TaskLocation("host1", "execB")),
Seq(TaskLocation("host2", "execC")),
Seq())
- val manager = new TaskSetManager(sched, taskSet, 1, new ManualClock)
+ val manager = new TaskSetManager(sched, taskSet, 1, clock = new ManualClock)
sched.addExecutor("execA", "host1")
manager.executorAdded()
sched.addExecutor("execC", "host2")
@@ -579,7 +582,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
Seq(TaskLocation("host1", "execA")),
Seq(TaskLocation("host1", "execA")))
val clock = new ManualClock
- val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY)))
// Set allowed locality to ANY
@@ -670,7 +673,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
Seq(),
Seq(TaskLocation("host3", "execC")))
val clock = new ManualClock
- val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL).get.index === 0)
assert(manager.resourceOffer("execA", "host1", NODE_LOCAL) == None)
@@ -698,7 +701,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
Seq(),
Seq(TaskLocation("host3")))
val clock = new ManualClock
- val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
// node-local tasks are scheduled without delay
assert(manager.resourceOffer("execA", "host1", NODE_LOCAL).get.index === 0)
@@ -720,7 +723,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
Seq(ExecutorCacheTaskLocation("host1", "execA")),
Seq(ExecutorCacheTaskLocation("host2", "execB")))
val clock = new ManualClock
- val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
// process-local tasks are scheduled first
assert(manager.resourceOffer("execA", "host1", NODE_LOCAL).get.index === 2)
@@ -740,7 +743,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
Seq(ExecutorCacheTaskLocation("host1", "execA")),
Seq(ExecutorCacheTaskLocation("host2", "execB")))
val clock = new ManualClock
- val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
// process-local tasks are scheduled first
assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL).get.index === 1)
@@ -760,7 +763,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
Seq(TaskLocation("host1", "execA")),
Seq(TaskLocation("host2", "execB.1")))
val clock = new ManualClock
- val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
// Only ANY is valid
assert(manager.myLocalityLevels.sameElements(Array(ANY)))
// Add a new executor
@@ -794,7 +797,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
Seq(TaskLocation("host2")),
Seq(TaskLocation("hdfs_cache_host3")))
val clock = new ManualClock
- val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, ANY)))
sched.removeExecutor("execA")
manager.executorAdded()
@@ -822,7 +825,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
// Set the speculation multiplier to be 0 so speculative tasks are launched immediately
sc.conf.set("spark.speculation.multiplier", "0.0")
val clock = new ManualClock()
- val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
task.metrics.internalAccums
}
@@ -876,7 +879,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
sc.conf.set("spark.speculation.multiplier", "0.0")
sc.conf.set("spark.speculation.quantile", "0.6")
val clock = new ManualClock()
- val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
task.metrics.internalAccums
}
@@ -980,17 +983,17 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
sc = new SparkContext("local", "test")
sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
val taskSet = FakeTask.createTaskSet(numTasks = 1, stageId = 0, stageAttemptId = 0)
- val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, new ManualClock)
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = new ManualClock)
assert(manager.name === "TaskSet_0.0")
// Make sure a task set with the same stage ID but different attempt ID has a unique name
val taskSet2 = FakeTask.createTaskSet(numTasks = 1, stageId = 0, stageAttemptId = 1)
- val manager2 = new TaskSetManager(sched, taskSet2, MAX_TASK_FAILURES, new ManualClock)
+ val manager2 = new TaskSetManager(sched, taskSet2, MAX_TASK_FAILURES, clock = new ManualClock)
assert(manager2.name === "TaskSet_0.1")
// Make sure a task set with the same attempt ID but different stage ID also has a unique name
val taskSet3 = FakeTask.createTaskSet(numTasks = 1, stageId = 1, stageAttemptId = 1)
- val manager3 = new TaskSetManager(sched, taskSet3, MAX_TASK_FAILURES, new ManualClock)
+ val manager3 = new TaskSetManager(sched, taskSet3, MAX_TASK_FAILURES, clock = new ManualClock)
assert(manager3.name === "TaskSet_1.1")
}