author     Imran Rashid <irashid@cloudera.com>    2016-11-28 13:47:09 -0600
committer  Imran Rashid <irashid@cloudera.com>    2016-11-28 13:47:09 -0600
commit     8b1609bebe489b2ef78db4be6e9836687089fe3d (patch)
tree       0167378cf99adcd445398dbd0b7eccfc558b6f24
parent     ad67993b73490a24e7012df23810dab1712e7689 (diff)
[SPARK-18117][CORE] Add test for TaskSetBlacklist
## What changes were proposed in this pull request?

This adds tests to verify the interaction between TaskSetBlacklist and TaskSchedulerImpl. TaskSetBlacklist was introduced by SPARK-17675, but that change neglected to add these tests. This change does not fix any bugs -- it is just for increasing test coverage.

## How was this patch tested?

Jenkins

Author: Imran Rashid <irashid@cloudera.com>

Closes #15644 from squito/taskset_blacklist_test_update.
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala         |   2
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala | 254
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala    |  45
3 files changed, 292 insertions(+), 9 deletions(-)
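For orientation before the full diff: the new tests hinge on replacing a real TaskSetManager's TaskSetBlacklist with a Mockito mock, so the suites can both control blacklist answers and verify exactly which checks the scheduler makes. A minimal sketch of that pattern, condensed from the diff below (illustrative, not the verbatim committed code):

    import org.mockito.Matchers.anyString
    import org.mockito.Mockito.{atLeast, spy, verify, when}

    // Spy on a real TaskSetManager so that only the blacklist helper is stubbed out.
    val tsmSpy = spy(tsm)                          // tsm: a real TaskSetManager
    val taskSetBlacklist = mock[TaskSetBlacklist]  // MockitoSugar mock
    when(tsmSpy.taskSetBlacklistHelperOpt).thenReturn(Some(taskSetBlacklist))

    // Control what the blacklist reports ...
    when(taskSetBlacklist.isNodeBlacklistedForTaskSet("host1")).thenReturn(true)

    // ... and, after taskScheduler.resourceOffers(offers), verify it was consulted.
    verify(taskSetBlacklist, atLeast(1)).isNodeBlacklistedForTaskSet(anyString())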
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index b766e4148e..f2a432cad3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -84,7 +84,7 @@ private[spark] class TaskSetManager(
var totalResultSize = 0L
var calculatedTasks = 0
- private val taskSetBlacklistHelperOpt: Option[TaskSetBlacklist] = {
+ private[scheduler] val taskSetBlacklistHelperOpt: Option[TaskSetBlacklist] = {
if (BlacklistTracker.isBlacklistEnabled(conf)) {
Some(new TaskSetBlacklist(conf, stageId, clock))
} else {
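The only production change is widening taskSetBlacklistHelperOpt to private[scheduler], so that test suites in the same package can stub it on a spy. The helper is also only populated when blacklisting is enabled, so both suites below turn the feature on explicitly before constructing the scheduler. Roughly (a condensed sketch of what the diff below does; see the actual hunks for details):

    import org.apache.spark.SparkConf
    import org.apache.spark.internal.config

    val conf = new SparkConf().setMaster("local").setAppName("TaskSchedulerImplSuite")
    conf.set(config.BLACKLIST_ENABLED, true)   // otherwise taskSetBlacklistHelperOpt stays None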
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index f5f1947661..5dc7708530 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -17,7 +17,12 @@
package org.apache.spark.scheduler
+import scala.collection.mutable.HashMap
+
+import org.mockito.Matchers.{anyInt, anyString, eq => meq}
+import org.mockito.Mockito.{atLeast, atMost, never, spy, verify, when}
import org.scalatest.BeforeAndAfterEach
+import org.scalatest.mock.MockitoSugar
import org.apache.spark._
import org.apache.spark.internal.config
@@ -31,7 +36,7 @@ class FakeSchedulerBackend extends SchedulerBackend {
}
class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with BeforeAndAfterEach
- with Logging {
+ with Logging with MockitoSugar {
var failedTaskSetException: Option[Throwable] = None
var failedTaskSetReason: String = null
@@ -40,11 +45,16 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
var taskScheduler: TaskSchedulerImpl = null
var dagScheduler: DAGScheduler = null
+ val stageToMockTaskSetBlacklist = new HashMap[Int, TaskSetBlacklist]()
+ val stageToMockTaskSetManager = new HashMap[Int, TaskSetManager]()
+
override def beforeEach(): Unit = {
super.beforeEach()
failedTaskSet = false
failedTaskSetException = None
failedTaskSetReason = null
+ stageToMockTaskSetBlacklist.clear()
+ stageToMockTaskSetManager.clear()
}
override def afterEach(): Unit = {
@@ -66,6 +76,30 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
}
sc = new SparkContext(conf)
taskScheduler = new TaskSchedulerImpl(sc)
+ setupHelper()
+ }
+
+ def setupSchedulerWithMockTaskSetBlacklist(): TaskSchedulerImpl = {
+ val conf = new SparkConf().setMaster("local").setAppName("TaskSchedulerImplSuite")
+ conf.set(config.BLACKLIST_ENABLED, true)
+ sc = new SparkContext(conf)
+ taskScheduler =
+ new TaskSchedulerImpl(sc, sc.conf.getInt("spark.task.maxFailures", 4)) {
+ override def createTaskSetManager(taskSet: TaskSet, maxFailures: Int): TaskSetManager = {
+ val tsm = super.createTaskSetManager(taskSet, maxFailures)
+ // we need to create a spied tsm just so we can set the TaskSetBlacklist
+ val tsmSpy = spy(tsm)
+ val taskSetBlacklist = mock[TaskSetBlacklist]
+ when(tsmSpy.taskSetBlacklistHelperOpt).thenReturn(Some(taskSetBlacklist))
+ stageToMockTaskSetManager(taskSet.stageId) = tsmSpy
+ stageToMockTaskSetBlacklist(taskSet.stageId) = taskSetBlacklist
+ tsmSpy
+ }
+ }
+ setupHelper()
+ }
+
+ def setupHelper(): TaskSchedulerImpl = {
taskScheduler.initialize(new FakeSchedulerBackend)
// Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
dagScheduler = new DAGScheduler(sc, taskScheduler) {
@@ -282,6 +316,211 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
assert(!failedTaskSet)
}
+ test("scheduled tasks obey task and stage blacklists") {
+ taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
+ (0 to 2).foreach {stageId =>
+ val taskSet = FakeTask.createTaskSet(numTasks = 2, stageId = stageId, stageAttemptId = 0)
+ taskScheduler.submitTasks(taskSet)
+ }
+
+ // Setup our mock blacklist:
+ // * stage 0 is blacklisted on node "host1"
+ // * stage 1 is blacklisted on executor "executor3"
+ // * stage 0, partition 0 is blacklisted on executor "executor0"
+ // (mocked methods default to returning false, i.e. no blacklisting)
+ when(stageToMockTaskSetBlacklist(0).isNodeBlacklistedForTaskSet("host1")).thenReturn(true)
+ when(stageToMockTaskSetBlacklist(1).isExecutorBlacklistedForTaskSet("executor3"))
+ .thenReturn(true)
+ when(stageToMockTaskSetBlacklist(0).isExecutorBlacklistedForTask("executor0", 0))
+ .thenReturn(true)
+
+ val offers = IndexedSeq(
+ new WorkerOffer("executor0", "host0", 1),
+ new WorkerOffer("executor1", "host1", 1),
+ new WorkerOffer("executor2", "host1", 1),
+ new WorkerOffer("executor3", "host2", 10)
+ )
+ val firstTaskAttempts = taskScheduler.resourceOffers(offers).flatten
+ // We should schedule all tasks.
+ assert(firstTaskAttempts.size === 6)
+ // Whenever we schedule a task, we must consult the node and executor blacklist. (The test
+ // doesn't check exactly what checks are made because the offers get shuffled.)
+ (0 to 2).foreach { stageId =>
+ verify(stageToMockTaskSetBlacklist(stageId), atLeast(1))
+ .isNodeBlacklistedForTaskSet(anyString())
+ verify(stageToMockTaskSetBlacklist(stageId), atLeast(1))
+ .isExecutorBlacklistedForTaskSet(anyString())
+ }
+
+ def tasksForStage(stageId: Int): Seq[TaskDescription] = {
+ firstTaskAttempts.filter{_.name.contains(s"stage $stageId")}
+ }
+ tasksForStage(0).foreach { task =>
+ // executors 1 & 2 blacklisted for node
+ // executor 0 blacklisted just for partition 0
+ if (task.index == 0) {
+ assert(task.executorId === "executor3")
+ } else {
+ assert(Set("executor0", "executor3").contains(task.executorId))
+ }
+ }
+ tasksForStage(1).foreach { task =>
+ // executor 3 blacklisted
+ assert("executor3" != task.executorId)
+ }
+ // no restrictions on stage 2
+
+ // Finally, just make sure that we can still complete tasks as usual with blacklisting
+ // in effect. Finish each of the tasksets -- tasksets 0 & 1 complete successfully, and
+ // taskset 2 fails.
+ (0 to 2).foreach { stageId =>
+ val tasks = tasksForStage(stageId)
+ val tsm = taskScheduler.taskSetManagerForAttempt(stageId, 0).get
+ val valueSer = SparkEnv.get.serializer.newInstance()
+ if (stageId == 2) {
+ // Just need to make one task fail 4 times.
+ var task = tasks(0)
+ val taskIndex = task.index
+ (0 until 4).foreach { attempt =>
+ assert(task.attemptNumber === attempt)
+ tsm.handleFailedTask(task.taskId, TaskState.FAILED, TaskResultLost)
+ val nextAttempts =
+ taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("executor4", "host4", 1))).flatten
+ if (attempt < 3) {
+ assert(nextAttempts.size === 1)
+ task = nextAttempts(0)
+ assert(task.index === taskIndex)
+ } else {
+ assert(nextAttempts.size === 0)
+ }
+ }
+ // End the other task of the taskset; it doesn't matter whether it succeeds or fails.
+ val otherTask = tasks(1)
+ val result = new DirectTaskResult[Int](valueSer.serialize(otherTask.taskId), Seq())
+ tsm.handleSuccessfulTask(otherTask.taskId, result)
+ } else {
+ tasks.foreach { task =>
+ val result = new DirectTaskResult[Int](valueSer.serialize(task.taskId), Seq())
+ tsm.handleSuccessfulTask(task.taskId, result)
+ }
+ }
+ assert(tsm.isZombie)
+ }
+ }
+
+ /**
+ * Helper for performance tests. Takes the explicitly blacklisted nodes and executors; verifies
+ * that the blacklists are used efficiently, so that scheduling is not O(numPendingTasks).
+ * Creates one offer on each of executor1-3; executor1 & 2 are on host1, executor3 is on host2.
+ * The blacklisted nodes and executors passed in should be drawn from those hosts and executors.
+ */
+ private def testBlacklistPerformance(
+ testName: String,
+ nodeBlacklist: Seq[String],
+ execBlacklist: Seq[String]): Unit = {
+ // Because scheduling involves shuffling the order of offers around, we run this test a few
+ // times to cover more possibilities. There are only 3 offers, which means 6 permutations,
+ // so 10 iterations is pretty good.
+ (0 until 10).foreach { testItr =>
+ test(s"$testName: iteration $testItr") {
+ // When an executor or node is blacklisted, we want to make sure that we don't try
+ // scheduling each pending task, one by one, to discover they are all blacklisted. This is
+ // important for performance -- if we did check each task one-by-one, then responding to a
+ // resource offer (which is usually O(1)-ish) would become O(numPendingTasks), which would
+ // slow down scheduler throughput and slow down scheduling even on healthy executors.
+ // Here, we check a proxy for the runtime -- we make sure the scheduling is short-circuited
+ // at the node or executor blacklist, so we never check the per-task blacklist. We also
+ // make sure we don't check the node & executor blacklist for the entire taskset
+ // O(numPendingTasks) times.
+
+ taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
+ // we schedule 500 tasks so we can clearly distinguish anything that is O(numPendingTasks)
+ val taskSet = FakeTask.createTaskSet(numTasks = 500, stageId = 0, stageAttemptId = 0)
+ taskScheduler.submitTasks(taskSet)
+
+ val offers = IndexedSeq(
+ new WorkerOffer("executor1", "host1", 1),
+ new WorkerOffer("executor2", "host1", 1),
+ new WorkerOffer("executor3", "host2", 1)
+ )
+ // We should check the node & exec blacklists, but only O(numOffers), not O(numPendingTasks)
+ // times. In the worst case, after shuffling, we offer our blacklisted resource first, and
+ // then offer other resources which do get used. The taskset blacklist is consulted
+ // repeatedly as we offer resources to the taskset -- each iteration either schedules
+ // something, or it terminates that locality level, so the maximum number of checks is
+ // numCores + numLocalityLevels
+ val numCoresOnAllOffers = offers.map(_.cores).sum
+ val numLocalityLevels = TaskLocality.values.size
+ val maxBlacklistChecks = numCoresOnAllOffers + numLocalityLevels
+
+ // Setup the blacklist
+ nodeBlacklist.foreach { node =>
+ when(stageToMockTaskSetBlacklist(0).isNodeBlacklistedForTaskSet(node)).thenReturn(true)
+ }
+ execBlacklist.foreach { exec =>
+ when(stageToMockTaskSetBlacklist(0).isExecutorBlacklistedForTaskSet(exec))
+ .thenReturn(true)
+ }
+
+ // Figure out which nodes have any effective blacklisting on them. This means all nodes
+ // that are explicitly blacklisted, plus those that have *any* executors blacklisted.
+ val nodesForBlacklistedExecutors = offers.filter { offer =>
+ execBlacklist.contains(offer.executorId)
+ }.map(_.host).toSet.toSeq
+ val nodesWithAnyBlacklisting = (nodeBlacklist ++ nodesForBlacklistedExecutors).toSet
+ // Similarly, figure out which executors have any blacklisting. This means all executors
+ // that are explicitly blacklisted, plus all executors on nodes that are blacklisted.
+ val execsForBlacklistedNodes = offers.filter { offer =>
+ nodeBlacklist.contains(offer.host)
+ }.map(_.executorId).toSeq
+ val executorsWithAnyBlacklisting = (execBlacklist ++ execsForBlacklistedNodes).toSet
+
+ // Schedule a taskset, and make sure our test setup is correct -- we are able to schedule
+ // a task on all executors that aren't blacklisted (whether that executor is explicitly
+ // blacklisted, or implicitly blacklisted via the node blacklist).
+ val firstTaskAttempts = taskScheduler.resourceOffers(offers).flatten
+ assert(firstTaskAttempts.size === offers.size - executorsWithAnyBlacklisting.size)
+
+ // Now check that we haven't made too many calls to any of the blacklist methods.
+ // We should be checking our node blacklist, but it should be within the bound we defined
+ // above.
+ verify(stageToMockTaskSetBlacklist(0), atMost(maxBlacklistChecks))
+ .isNodeBlacklistedForTaskSet(anyString())
+ // We shouldn't ever consult the per-task blacklist for the nodes that have been blacklisted
+ // for the entire taskset, since the taskset level blacklisting should prevent scheduling
+ // from ever looking at specific tasks.
+ nodesWithAnyBlacklisting.foreach { node =>
+ verify(stageToMockTaskSetBlacklist(0), never)
+ .isNodeBlacklistedForTask(meq(node), anyInt())
+ }
+ executorsWithAnyBlacklisting.foreach { exec =>
+ // We should be checking our executor blacklist, but it should be within the bound defined
+ // above. It's possible that this will be significantly fewer calls, maybe even 0, if
+ // there is also a node blacklist which takes effect first. But this assert is all we
+ // need to avoid an O(numPendingTasks) slowdown.
+ verify(stageToMockTaskSetBlacklist(0), atMost(maxBlacklistChecks))
+ .isExecutorBlacklistedForTaskSet(exec)
+ // We shouldn't ever consult the per-task blacklist for executors that have been
+ // blacklisted for the entire taskset, since the taskset level blacklisting should prevent
+ // scheduling from ever looking at specific tasks.
+ verify(stageToMockTaskSetBlacklist(0), never)
+ .isExecutorBlacklistedForTask(meq(exec), anyInt())
+ }
+ }
+ }
+ }
+
+ testBlacklistPerformance(
+ testName = "Blacklisted node for entire task set prevents per-task blacklist checks",
+ nodeBlacklist = Seq("host1"),
+ execBlacklist = Seq())
+
+ testBlacklistPerformance(
+ testName = "Blacklisted executor for entire task set prevents per-task blacklist checks",
+ nodeBlacklist = Seq(),
+ execBlacklist = Seq("executor3")
+ )
+
test("abort stage if executor loss results in unschedulability from previously failed tasks") {
// Make sure we can detect when a taskset becomes unschedulable from a blacklisting. This
// test explores a particular corner case -- you may have one task fail, but still be
@@ -301,27 +540,27 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
)).flatten
assert(Set("executor0", "executor1") === firstTaskAttempts.map(_.executorId).toSet)
- // fail one of the tasks, but leave the other running
+ // Fail one of the tasks, but leave the other running.
val failedTask = firstTaskAttempts.find(_.executorId == "executor0").get
taskScheduler.handleFailedTask(tsm, failedTask.taskId, TaskState.FAILED, TaskResultLost)
- // at this point, our failed task could run on the other executor, so don't give up the task
+ // At this point, our failed task could run on the other executor, so don't give up the task
// set yet.
assert(!failedTaskSet)
// Now we fail our second executor. The other task can still run on executor1, so make an offer
- // on that executor, and make sure that the other task (not the failed one) is assigned there
+ // on that executor, and make sure that the other task (not the failed one) is assigned there.
taskScheduler.executorLost("executor1", SlaveLost("oops"))
val nextTaskAttempts =
taskScheduler.resourceOffers(IndexedSeq(new WorkerOffer("executor0", "host0", 1))).flatten
// Note: It's OK if some future change makes this already realize the taskset has become
- // unschedulable at this point (though in the current implementation, we're sure it will not)
+ // unschedulable at this point (though in the current implementation, we're sure it will not).
assert(nextTaskAttempts.size === 1)
assert(nextTaskAttempts.head.executorId === "executor0")
assert(nextTaskAttempts.head.attemptNumber === 1)
assert(nextTaskAttempts.head.index != failedTask.index)
- // now we should definitely realize that our task set is unschedulable, because the only
- // task left can't be scheduled on any executors due to the blacklist
+ // Now we should definitely realize that our task set is unschedulable, because the only
+ // task left can't be scheduled on any executors due to the blacklist.
taskScheduler.resourceOffers(IndexedSeq(new WorkerOffer("executor0", "host0", 1)))
sc.listenerBus.waitUntilEmpty(100000)
assert(tsm.isZombie)
@@ -408,4 +647,5 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
assert(thirdTaskDescs.size === 0)
assert(taskScheduler.getExecutorsAliveOnHost("host1") === Some(Set("executor1", "executor3")))
}
+
}
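A quick sanity check of the bound the performance tests above rely on (assuming TaskLocality still enumerates its usual five levels: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY):

    // With three single-core offers and five locality levels:
    val numCoresOnAllOffers = 3                                       // executor1-3, 1 core each
    val numLocalityLevels   = 5                                       // TaskLocality.values.size
    val maxBlacklistChecks  = numCoresOnAllOffers + numLocalityLevels // = 8

    // atMost(8) taskset-level blacklist checks versus 500 pending tasks: any regression that
    // re-introduced a per-task scan would blow far past this bound.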
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 1b1a764cef..abc8fff30e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -22,11 +22,13 @@ import java.util.Random
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
-import org.mockito.Mockito.{mock, verify}
+import org.mockito.Matchers.{anyInt, anyString}
+import org.mockito.Mockito.{mock, never, spy, verify, when}
import org.apache.spark._
import org.apache.spark.internal.config
import org.apache.spark.internal.Logging
+import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.{AccumulatorV2, ManualClock}
class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler)
@@ -992,6 +994,47 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
assert(manager3.name === "TaskSet_1.1")
}
+ test("don't update blacklist for shuffle-fetch failures, preemption, denied commits, " +
+ "or killed tasks") {
+ // Setup a taskset, and fail some tasks for a fetch failure, preemption, denied commit,
+ // and killed task.
+ val conf = new SparkConf().
+ set(config.BLACKLIST_ENABLED, true)
+ sc = new SparkContext("local", "test", conf)
+ sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+ val taskSet = FakeTask.createTaskSet(4)
+ val tsm = new TaskSetManager(sched, taskSet, 4)
+ // we need a spy so we can attach our mock blacklist
+ val tsmSpy = spy(tsm)
+ val blacklist = mock(classOf[TaskSetBlacklist])
+ when(tsmSpy.taskSetBlacklistHelperOpt).thenReturn(Some(blacklist))
+
+ // make some offers to our taskset, to get tasks we will fail
+ val taskDescs = Seq(
+ "exec1" -> "host1",
+ "exec2" -> "host1"
+ ).flatMap { case (exec, host) =>
+ // offer each executor twice (simulating 2 cores per executor)
+ (0 until 2).flatMap{ _ => tsmSpy.resourceOffer(exec, host, TaskLocality.ANY)}
+ }
+ assert(taskDescs.size === 4)
+
+ // now fail those tasks
+ tsmSpy.handleFailedTask(taskDescs(0).taskId, TaskState.FAILED,
+ FetchFailed(BlockManagerId(taskDescs(0).executorId, "host1", 12345), 0, 0, 0, "ignored"))
+ tsmSpy.handleFailedTask(taskDescs(1).taskId, TaskState.FAILED,
+ ExecutorLostFailure(taskDescs(1).executorId, exitCausedByApp = false, reason = None))
+ tsmSpy.handleFailedTask(taskDescs(2).taskId, TaskState.FAILED,
+ TaskCommitDenied(0, 2, 0))
+ tsmSpy.handleFailedTask(taskDescs(3).taskId, TaskState.KILLED,
+ TaskKilled)
+
+ // Make sure that the blacklist ignored all of the task failures above, since they aren't
+ // the fault of the executor where the task was running.
+ verify(blacklist, never())
+ .updateBlacklistForFailedTask(anyString(), anyString(), anyInt())
+ }
+
private def createTaskResult(
id: Int,
accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty): DirectTaskResult[Int] = {