about | summary | refs | log | tree | commit | diff
path: root/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala')
-rw-r--r-- core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala 87
1 files changed, 87 insertions, 0 deletions
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 33cc7588b9..73153d23c4 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -298,6 +298,93 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
}
}
+ test("executors should be blacklisted after task failure, in spite of locality preferences") {
+ val rescheduleDelay = 300L
+ val conf = new SparkConf().
+ set("spark.scheduler.executorTaskBlacklistTime", rescheduleDelay.toString).
+ // don't wait to jump locality levels in this test
+ set("spark.locality.wait", "0")
+
+ sc = new SparkContext("local", "test", conf)
+ // Two executors on the same host (host1), one on a different host (host2).
+ val sched = new FakeTaskScheduler(sc, ("exec1", "host1"),
+ ("exec1.1", "host1"), ("exec2", "host2"))
+ // Preferred location is exec1 on host1 - the executor we will fail the task on first.
+ val taskSet = FakeTask.createTaskSet(1, Seq(TaskLocation("host1", "exec1")))
+ val clock = new FakeClock
+ val manager = new TaskSetManager(sched, taskSet, 4, clock)
+ // Run the task on exec1 (its preferred executor) - should work, and then fail it on exec1
+ {
+ val offerResult = manager.resourceOffer("exec1", "host1", 1, TaskLocality.PROCESS_LOCAL)
+ assert(offerResult.isDefined, "Expect resource offer to return a task")
+
+ assert(offerResult.get.index === 0)
+ assert(offerResult.get.executorId === "exec1")
+
+ // Report the attempt on exec1 as failed (TaskResultLost) : failure 1
+ manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, TaskResultLost)
+ assert(!sched.taskSetsFailed.contains(taskSet.id))
+
+ // Ensure scheduling on exec1 fails at every locality level after failure 1 due to blacklist
+ assert(manager.resourceOffer("exec1", "host1", 1, TaskLocality.PROCESS_LOCAL).isEmpty)
+ assert(manager.resourceOffer("exec1", "host1", 1, TaskLocality.NODE_LOCAL).isEmpty)
+ assert(manager.resourceOffer("exec1", "host1", 1, TaskLocality.RACK_LOCAL).isEmpty)
+ assert(manager.resourceOffer("exec1", "host1", 1, TaskLocality.ANY).isEmpty)
+ }
+
+ // Run the task on exec1.1 (same host as exec1) - should work, and then fail it on exec1.1
+ {
+ val offerResult = manager.resourceOffer("exec1.1", "host1", 1, TaskLocality.NODE_LOCAL)
+ assert(offerResult.isDefined,
+ "Expect resource offer to return a task for exec1.1, offerResult = " + offerResult)
+
+ assert(offerResult.get.index === 0)
+ assert(offerResult.get.executorId === "exec1.1")
+
+ // Report the attempt on exec1.1 as failed (TaskResultLost) : failure 2
+ manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, TaskResultLost)
+ assert(!sched.taskSetsFailed.contains(taskSet.id))
+
+ // Ensure scheduling on exec1.1 fails after failure 2 due to blacklist
+ assert(manager.resourceOffer("exec1.1", "host1", 1, TaskLocality.NODE_LOCAL).isEmpty)
+ }
+
+ // Run the task on exec2 (on host2) - should work, and then fail it on exec2
+ {
+ val offerResult = manager.resourceOffer("exec2", "host2", 1, TaskLocality.ANY)
+ assert(offerResult.isDefined, "Expect resource offer to return a task")
+
+ assert(offerResult.get.index === 0)
+ assert(offerResult.get.executorId === "exec2")
+
+ // Report the attempt on exec2 as failed (TaskResultLost) : failure 3
+ manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, TaskResultLost)
+ assert(!sched.taskSetsFailed.contains(taskSet.id))
+
+ // Ensure scheduling on exec2 fails after failure 3 due to blacklist
+ assert(manager.resourceOffer("exec2", "host2", 1, TaskLocality.ANY).isEmpty)
+ }
+
+ // After advancing the fake clock by the reschedule delay, scheduling on exec1 should be possible.
+ clock.advance(rescheduleDelay)
+
+ {
+ val offerResult = manager.resourceOffer("exec1", "host1", 1, TaskLocality.PROCESS_LOCAL)
+ assert(offerResult.isDefined, "Expect resource offer to return a task")
+
+ assert(offerResult.get.index === 0)
+ assert(offerResult.get.executorId === "exec1")
+ // NOTE(review): presumably empty because the only task is already running - confirm
+ assert(manager.resourceOffer("exec1", "host1", 1, TaskLocality.PROCESS_LOCAL).isEmpty)
+
+ // Report the second attempt on exec1 as failed (TaskResultLost) : failure 4
+ manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, TaskResultLost)
+ }
+ // The same task has now failed 4 times (exec1, exec1.1, exec2, exec1 again), so the task
+ // set's id should be in taskSetsFailed (4 is presumably the limit given to TaskSetManager).
+ assert(sched.taskSetsFailed.contains(taskSet.id))
+ }
+
def createTaskResult(id: Int): DirectTaskResult[Int] = {
val valueSer = SparkEnv.get.serializer.newInstance()
new DirectTaskResult[Int](valueSer.serialize(id), mutable.Map.empty, new TaskMetrics)