diff options
author | Imran Rashid <irashid@cloudera.com> | 2016-06-27 16:28:59 -0500 |
---|---|---|
committer | Imran Rashid <irashid@cloudera.com> | 2016-06-27 16:28:59 -0500 |
commit | 282158914d89b35a3f85388cb20bd62215f4f589 (patch) | |
tree | 69d068e1c479551f3c45034f7953fa1489b2e5d9 /core/src/test/scala | |
parent | 1aa191e58e905f470f73663fc1c35f36e05e929a (diff) | |
download | spark-282158914d89b35a3f85388cb20bd62215f4f589.tar.gz spark-282158914d89b35a3f85388cb20bd62215f4f589.tar.bz2 spark-282158914d89b35a3f85388cb20bd62215f4f589.zip |
[SPARK-16136][CORE] Fix flaky TaskManagerSuite
## What changes were proposed in this pull request?
TaskManagerSuite "Kill other task attempts when one attempt belonging to the same task succeeds" was flaky. When checking whether a task is speculatable, at least one millisecond must pass since the task was submitted. Use a manual clock to avoid the problem.
I noticed these tests were leaving lots of threads lying around as well (which prevented me from running the test repeatedly), so I fixed that too.
## How was this patch tested?
Ran the test 1k times on my laptop, passed every time (it failed about 20% of the time before this).
Author: Imran Rashid <irashid@cloudera.com>
Closes #13848 from squito/fix_flaky_taskmanagersuite.
Diffstat (limited to 'core/src/test/scala')
-rw-r--r-- | core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala | 68 |
1 files changed, 43 insertions, 25 deletions
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 1d7c8f4a61..cfbabd8fb5 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -157,14 +157,27 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg val LOCALITY_WAIT_MS = conf.getTimeAsMs("spark.locality.wait", "3s") val MAX_TASK_FAILURES = 4 - override def beforeEach() { + var sched: FakeTaskScheduler = null + + override def beforeEach(): Unit = { super.beforeEach() FakeRackUtil.cleanUp() + sched = null + } + + override def afterEach(): Unit = { + super.afterEach() + if (sched != null) { + sched.dagScheduler.stop() + sched.stop() + sched = null + } } + test("TaskSet with no preferences") { sc = new SparkContext("local", "test") - val sched = new FakeTaskScheduler(sc, ("exec1", "host1")) + sched = new FakeTaskScheduler(sc, ("exec1", "host1")) val taskSet = FakeTask.createTaskSet(1) val clock = new ManualClock val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock) @@ -183,7 +196,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg test("multiple offers with no preferences") { sc = new SparkContext("local", "test") - val sched = new FakeTaskScheduler(sc, ("exec1", "host1")) + sched = new FakeTaskScheduler(sc, ("exec1", "host1")) val taskSet = FakeTask.createTaskSet(3) val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES) val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task => @@ -217,7 +230,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg test("skip unsatisfiable locality levels") { sc = new SparkContext("local", "test") - val sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execC", "host2")) + sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execC", "host2")) val taskSet = FakeTask.createTaskSet(1, Seq(TaskLocation("host1", "execB"))) val clock = new ManualClock val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock) @@ -233,7 +246,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg test("basic delay scheduling") { sc = new SparkContext("local", "test") - val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) + sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) val taskSet = FakeTask.createTaskSet(4, Seq(TaskLocation("host1", "exec1")), Seq(TaskLocation("host2", "exec2")), @@ -263,7 +276,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg test("we do not need to delay scheduling when we only have noPref tasks in the queue") { sc = new SparkContext("local", "test") - val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec3", "host2")) + sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec3", "host2")) val taskSet = FakeTask.createTaskSet(3, Seq(TaskLocation("host1", "exec1")), Seq(TaskLocation("host2", "exec3")), @@ -280,7 +293,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg test("delay scheduling with fallback") { sc = new SparkContext("local", "test") - val sched = new FakeTaskScheduler(sc, + sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3")) val taskSet = FakeTask.createTaskSet(5, Seq(TaskLocation("host1")), @@ -320,7 +333,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg test("delay scheduling with failed hosts") { sc = new SparkContext("local", "test") - val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"), + sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3")) val taskSet = FakeTask.createTaskSet(3, Seq(TaskLocation("host1")), @@ -357,7 +370,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg test("task result lost") { sc = new SparkContext("local", "test") - val sched = new FakeTaskScheduler(sc, ("exec1", "host1")) + sched = new FakeTaskScheduler(sc, ("exec1", "host1")) val taskSet = FakeTask.createTaskSet(1) val clock = new ManualClock val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock) @@ -374,7 +387,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg test("repeated failures lead to task set abortion") { sc = new SparkContext("local", "test") - val sched = new FakeTaskScheduler(sc, ("exec1", "host1")) + sched = new FakeTaskScheduler(sc, ("exec1", "host1")) val taskSet = FakeTask.createTaskSet(1) val clock = new ManualClock val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock) @@ -404,7 +417,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg sc = new SparkContext("local", "test", conf) // two executors on same host, one on different. - val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), + sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec1.1", "host1"), ("exec2", "host2")) // affinity to exec1 on host1 - which we will fail. val taskSet = FakeTask.createTaskSet(1, Seq(TaskLocation("host1", "exec1"))) @@ -486,7 +499,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg // Assign host2 to rack2 FakeRackUtil.assignHostToRack("host2", "rack2") sc = new SparkContext("local", "test") - val sched = new FakeTaskScheduler(sc) + sched = new FakeTaskScheduler(sc) val taskSet = FakeTask.createTaskSet(4, Seq(TaskLocation("host1", "execA")), Seq(TaskLocation("host1", "execB")), @@ -518,7 +531,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg test("Executors exit for reason unrelated to currently running tasks") { sc = new SparkContext("local", "test") - val sched = new FakeTaskScheduler(sc) + sched = new FakeTaskScheduler(sc) val taskSet = FakeTask.createTaskSet(4, Seq(TaskLocation("host1", "execA")), Seq(TaskLocation("host1", "execB")), @@ -551,7 +564,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg // Assign host3 to rack2 FakeRackUtil.assignHostToRack("host3", "rack2") sc = new SparkContext("local", "test") - val sched = new FakeTaskScheduler(sc, + sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2"), ("execC", "host3")) val taskSet = FakeTask.createTaskSet(2, Seq(TaskLocation("host1", "execA")), @@ -574,7 +587,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg test("do not emit warning when serialized task is small") { sc = new SparkContext("local", "test") - val sched = new FakeTaskScheduler(sc, ("exec1", "host1")) + sched = new FakeTaskScheduler(sc, ("exec1", "host1")) val taskSet = FakeTask.createTaskSet(1) val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES) @@ -587,7 +600,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg test("emit warning when serialized task is large") { sc = new SparkContext("local", "test") - val sched = new FakeTaskScheduler(sc, ("exec1", "host1")) + sched = new FakeTaskScheduler(sc, ("exec1", "host1")) val taskSet = new TaskSet(Array(new LargeTask(0)), 0, 0, 0, null) val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES) @@ -601,7 +614,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg test("Not serializable exception thrown if the task cannot be serialized") { sc = new SparkContext("local", "test") - val sched = new FakeTaskScheduler(sc, ("exec1", "host1")) + sched = new FakeTaskScheduler(sc, ("exec1", "host1")) val taskSet = new TaskSet( Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)), 0, 0, 0, null) @@ -640,7 +653,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg test("speculative and noPref task should be scheduled after node-local") { sc = new SparkContext("local", "test") - val sched = new FakeTaskScheduler( + sched = new FakeTaskScheduler( sc, ("execA", "host1"), ("execB", "host2"), ("execC", "host3")) val taskSet = FakeTask.createTaskSet(4, Seq(TaskLocation("host1", "execA")), @@ -668,7 +681,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg test("node-local tasks should be scheduled right away " + "when there are only node-local and no-preference tasks") { sc = new SparkContext("local", "test") - val sched = new FakeTaskScheduler( + sched = new FakeTaskScheduler( sc, ("execA", "host1"), ("execB", "host2"), ("execC", "host3")) val taskSet = FakeTask.createTaskSet(4, Seq(TaskLocation("host1")), @@ -691,7 +704,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg test("SPARK-4939: node-local tasks should be scheduled right after process-local tasks finished") { sc = new SparkContext("local", "test") - val sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2")) + sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2")) val taskSet = FakeTask.createTaskSet(4, Seq(TaskLocation("host1")), Seq(TaskLocation("host2")), @@ -712,7 +725,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg test("SPARK-4939: no-pref tasks should be scheduled after process-local tasks finished") { sc = new SparkContext("local", "test") - val sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2")) + sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2")) val taskSet = FakeTask.createTaskSet(3, Seq(), Seq(ExecutorCacheTaskLocation("host1", "execA")), @@ -733,7 +746,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg test("Ensure TaskSetManager is usable after addition of levels") { // Regression test for SPARK-2931 sc = new SparkContext("local", "test") - val sched = new FakeTaskScheduler(sc) + sched = new FakeTaskScheduler(sc) val taskSet = FakeTask.createTaskSet(2, Seq(TaskLocation("host1", "execA")), Seq(TaskLocation("host2", "execB.1"))) @@ -765,7 +778,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg test("Test that locations with HDFSCacheTaskLocation are treated as PROCESS_LOCAL.") { // Regression test for SPARK-2931 sc = new SparkContext("local", "test") - val sched = new FakeTaskScheduler(sc, + sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2"), ("execC", "host3")) val taskSet = FakeTask.createTaskSet(3, Seq(TaskLocation("host1")), @@ -793,11 +806,12 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg test("Kill other task attempts when one attempt belonging to the same task succeeds") { sc = new SparkContext("local", "test") - val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) + sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) val taskSet = FakeTask.createTaskSet(4) // Set the speculation multiplier to be 0 so speculative tasks are launched immediately sc.conf.set("spark.speculation.multiplier", "0.0") - val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES) + val clock = new ManualClock() + val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock) val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task => task.metrics.internalAccums } @@ -819,6 +833,10 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg assert(sched.endedTasks(id) === Success) } + // checkSpeculatableTasks checks that the task runtime is greater than the threshold for + // speculating. Since we use a threshold of 0 for speculation, tasks need to be running for + // > 0ms, so advance the clock by 1ms here. + clock.advance(1) assert(manager.checkSpeculatableTasks(0)) // Offer resource to start the speculative attempt for the running task val taskOption5 = manager.resourceOffer("exec1", "host1", NO_PREF) |