aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorImran Rashid <irashid@cloudera.com>2016-06-27 16:28:59 -0500
committerImran Rashid <irashid@cloudera.com>2016-06-27 16:28:59 -0500
commit282158914d89b35a3f85388cb20bd62215f4f589 (patch)
tree69d068e1c479551f3c45034f7953fa1489b2e5d9 /core
parent1aa191e58e905f470f73663fc1c35f36e05e929a (diff)
downloadspark-282158914d89b35a3f85388cb20bd62215f4f589.tar.gz
spark-282158914d89b35a3f85388cb20bd62215f4f589.tar.bz2
spark-282158914d89b35a3f85388cb20bd62215f4f589.zip
[SPARK-16136][CORE] Fix flaky TaskSetManagerSuite
## What changes were proposed in this pull request? TaskSetManagerSuite "Kill other task attempts when one attempt belonging to the same task succeeds" was flaky. When checking whether a task is speculatable, at least one millisecond must pass since the task was submitted. Use a manual clock to avoid the problem. I noticed these tests were leaving lots of threads lying around as well (which prevented me from running the test repeatedly), so I fixed that too. ## How was this patch tested? Ran the test 1k times on my laptop, passed every time (it failed about 20% of the time before this). Author: Imran Rashid <irashid@cloudera.com> Closes #13848 from squito/fix_flaky_taskmanagersuite.
Diffstat (limited to 'core')
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala68
1 file changed, 43 insertions, 25 deletions
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 1d7c8f4a61..cfbabd8fb5 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -157,14 +157,27 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
val LOCALITY_WAIT_MS = conf.getTimeAsMs("spark.locality.wait", "3s")
val MAX_TASK_FAILURES = 4
- override def beforeEach() {
+ var sched: FakeTaskScheduler = null
+
+ override def beforeEach(): Unit = {
super.beforeEach()
FakeRackUtil.cleanUp()
+ sched = null
+ }
+
+ override def afterEach(): Unit = {
+ super.afterEach()
+ if (sched != null) {
+ sched.dagScheduler.stop()
+ sched.stop()
+ sched = null
+ }
}
+
test("TaskSet with no preferences") {
sc = new SparkContext("local", "test")
- val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
+ sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
val taskSet = FakeTask.createTaskSet(1)
val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
@@ -183,7 +196,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
test("multiple offers with no preferences") {
sc = new SparkContext("local", "test")
- val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
+ sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
val taskSet = FakeTask.createTaskSet(3)
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
@@ -217,7 +230,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
test("skip unsatisfiable locality levels") {
sc = new SparkContext("local", "test")
- val sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execC", "host2"))
+ sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execC", "host2"))
val taskSet = FakeTask.createTaskSet(1, Seq(TaskLocation("host1", "execB")))
val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
@@ -233,7 +246,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
test("basic delay scheduling") {
sc = new SparkContext("local", "test")
- val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+ sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
val taskSet = FakeTask.createTaskSet(4,
Seq(TaskLocation("host1", "exec1")),
Seq(TaskLocation("host2", "exec2")),
@@ -263,7 +276,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
test("we do not need to delay scheduling when we only have noPref tasks in the queue") {
sc = new SparkContext("local", "test")
- val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec3", "host2"))
+ sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec3", "host2"))
val taskSet = FakeTask.createTaskSet(3,
Seq(TaskLocation("host1", "exec1")),
Seq(TaskLocation("host2", "exec3")),
@@ -280,7 +293,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
test("delay scheduling with fallback") {
sc = new SparkContext("local", "test")
- val sched = new FakeTaskScheduler(sc,
+ sched = new FakeTaskScheduler(sc,
("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3"))
val taskSet = FakeTask.createTaskSet(5,
Seq(TaskLocation("host1")),
@@ -320,7 +333,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
test("delay scheduling with failed hosts") {
sc = new SparkContext("local", "test")
- val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"),
+ sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"),
("exec3", "host3"))
val taskSet = FakeTask.createTaskSet(3,
Seq(TaskLocation("host1")),
@@ -357,7 +370,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
test("task result lost") {
sc = new SparkContext("local", "test")
- val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
+ sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
val taskSet = FakeTask.createTaskSet(1)
val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
@@ -374,7 +387,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
test("repeated failures lead to task set abortion") {
sc = new SparkContext("local", "test")
- val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
+ sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
val taskSet = FakeTask.createTaskSet(1)
val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
@@ -404,7 +417,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
sc = new SparkContext("local", "test", conf)
// two executors on same host, one on different.
- val sched = new FakeTaskScheduler(sc, ("exec1", "host1"),
+ sched = new FakeTaskScheduler(sc, ("exec1", "host1"),
("exec1.1", "host1"), ("exec2", "host2"))
// affinity to exec1 on host1 - which we will fail.
val taskSet = FakeTask.createTaskSet(1, Seq(TaskLocation("host1", "exec1")))
@@ -486,7 +499,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
// Assign host2 to rack2
FakeRackUtil.assignHostToRack("host2", "rack2")
sc = new SparkContext("local", "test")
- val sched = new FakeTaskScheduler(sc)
+ sched = new FakeTaskScheduler(sc)
val taskSet = FakeTask.createTaskSet(4,
Seq(TaskLocation("host1", "execA")),
Seq(TaskLocation("host1", "execB")),
@@ -518,7 +531,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
test("Executors exit for reason unrelated to currently running tasks") {
sc = new SparkContext("local", "test")
- val sched = new FakeTaskScheduler(sc)
+ sched = new FakeTaskScheduler(sc)
val taskSet = FakeTask.createTaskSet(4,
Seq(TaskLocation("host1", "execA")),
Seq(TaskLocation("host1", "execB")),
@@ -551,7 +564,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
// Assign host3 to rack2
FakeRackUtil.assignHostToRack("host3", "rack2")
sc = new SparkContext("local", "test")
- val sched = new FakeTaskScheduler(sc,
+ sched = new FakeTaskScheduler(sc,
("execA", "host1"), ("execB", "host2"), ("execC", "host3"))
val taskSet = FakeTask.createTaskSet(2,
Seq(TaskLocation("host1", "execA")),
@@ -574,7 +587,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
test("do not emit warning when serialized task is small") {
sc = new SparkContext("local", "test")
- val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
+ sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
val taskSet = FakeTask.createTaskSet(1)
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
@@ -587,7 +600,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
test("emit warning when serialized task is large") {
sc = new SparkContext("local", "test")
- val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
+ sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
val taskSet = new TaskSet(Array(new LargeTask(0)), 0, 0, 0, null)
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
@@ -601,7 +614,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
test("Not serializable exception thrown if the task cannot be serialized") {
sc = new SparkContext("local", "test")
- val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
+ sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
val taskSet = new TaskSet(
Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)), 0, 0, 0, null)
@@ -640,7 +653,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
test("speculative and noPref task should be scheduled after node-local") {
sc = new SparkContext("local", "test")
- val sched = new FakeTaskScheduler(
+ sched = new FakeTaskScheduler(
sc, ("execA", "host1"), ("execB", "host2"), ("execC", "host3"))
val taskSet = FakeTask.createTaskSet(4,
Seq(TaskLocation("host1", "execA")),
@@ -668,7 +681,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
test("node-local tasks should be scheduled right away " +
"when there are only node-local and no-preference tasks") {
sc = new SparkContext("local", "test")
- val sched = new FakeTaskScheduler(
+ sched = new FakeTaskScheduler(
sc, ("execA", "host1"), ("execB", "host2"), ("execC", "host3"))
val taskSet = FakeTask.createTaskSet(4,
Seq(TaskLocation("host1")),
@@ -691,7 +704,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
test("SPARK-4939: node-local tasks should be scheduled right after process-local tasks finished")
{
sc = new SparkContext("local", "test")
- val sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2"))
+ sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2"))
val taskSet = FakeTask.createTaskSet(4,
Seq(TaskLocation("host1")),
Seq(TaskLocation("host2")),
@@ -712,7 +725,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
test("SPARK-4939: no-pref tasks should be scheduled after process-local tasks finished") {
sc = new SparkContext("local", "test")
- val sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2"))
+ sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2"))
val taskSet = FakeTask.createTaskSet(3,
Seq(),
Seq(ExecutorCacheTaskLocation("host1", "execA")),
@@ -733,7 +746,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
test("Ensure TaskSetManager is usable after addition of levels") {
// Regression test for SPARK-2931
sc = new SparkContext("local", "test")
- val sched = new FakeTaskScheduler(sc)
+ sched = new FakeTaskScheduler(sc)
val taskSet = FakeTask.createTaskSet(2,
Seq(TaskLocation("host1", "execA")),
Seq(TaskLocation("host2", "execB.1")))
@@ -765,7 +778,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
test("Test that locations with HDFSCacheTaskLocation are treated as PROCESS_LOCAL.") {
// Regression test for SPARK-2931
sc = new SparkContext("local", "test")
- val sched = new FakeTaskScheduler(sc,
+ sched = new FakeTaskScheduler(sc,
("execA", "host1"), ("execB", "host2"), ("execC", "host3"))
val taskSet = FakeTask.createTaskSet(3,
Seq(TaskLocation("host1")),
@@ -793,11 +806,12 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
test("Kill other task attempts when one attempt belonging to the same task succeeds") {
sc = new SparkContext("local", "test")
- val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+ sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
val taskSet = FakeTask.createTaskSet(4)
// Set the speculation multiplier to be 0 so speculative tasks are launched immediately
sc.conf.set("spark.speculation.multiplier", "0.0")
- val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
+ val clock = new ManualClock()
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
task.metrics.internalAccums
}
@@ -819,6 +833,10 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
assert(sched.endedTasks(id) === Success)
}
+ // checkSpeculatableTasks checks that the task runtime is greater than the threshold for
+ // speculating. Since we use a threshold of 0 for speculation, tasks need to be running for
+ // > 0ms, so advance the clock by 1ms here.
+ clock.advance(1)
assert(manager.checkSpeculatableTasks(0))
// Offer resource to start the speculative attempt for the running task
val taskOption5 = manager.resourceOffer("exec1", "host1", NO_PREF)