about summary refs log tree commit diff
path: root/core
diff options
context:
space:
mode:
authorKay Ousterhout <kayousterhout@gmail.com>2014-06-09 13:13:53 -0700
committerKay Ousterhout <kayousterhout@gmail.com>2014-06-09 13:13:53 -0700
commit6cf335d79a2f69ecd9a139dd0a03acff60585be4 (patch)
treeb9fa6b466d66c6616ee866cbdf69a12387b69479 /core
parent0cf600280167a94faec75736223256e8f2e48085 (diff)
downloadspark-6cf335d79a2f69ecd9a139dd0a03acff60585be4.tar.gz
spark-6cf335d79a2f69ecd9a139dd0a03acff60585be4.tar.bz2
spark-6cf335d79a2f69ecd9a139dd0a03acff60585be4.zip
Added a TaskSetManager unit test.
This test ensures that when there are no alive executors that satisfy a particular locality level, the TaskSetManager doesn't ever use that as the maximum allowed locality level (this optimization ensures that a job doesn't wait extra time in an attempt to satisfy a scheduling locality level that is impossible). @mateiz and @lirui-intel this unit test illustrates an issue with #892 (it fails with that patch). Author: Kay Ousterhout <kayousterhout@gmail.com> Closes #1024 from kayousterhout/scheduler_unit_test and squashes the following commits: de6a08f [Kay Ousterhout] Added a TaskSetManager unit test.
Diffstat (limited to 'core')
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala16
1 files changed, 16 insertions, 0 deletions
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index c92b6dc96c..6f1fd25764 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -141,6 +141,22 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
assert(sched.finishedManagers.contains(manager))
}
+ test("skip unsatisfiable locality levels") {
+ sc = new SparkContext("local", "test")
+ val sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execC", "host2"))
+ val taskSet = FakeTask.createTaskSet(1, Seq(TaskLocation("host1", "execB")))
+ val clock = new FakeClock
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+
+ // An executor that is not NODE_LOCAL should be rejected.
+ assert(manager.resourceOffer("execC", "host2", ANY) === None)
+
+ // Because there are no alive PROCESS_LOCAL executors, the base locality level should be
+ // NODE_LOCAL. So, we should schedule the task on this offered NODE_LOCAL executor before
+ // any of the locality wait timers expire.
+ assert(manager.resourceOffer("execA", "host1", ANY).get.index === 0)
+ }
+
test("basic delay scheduling") {
sc = new SparkContext("local", "test")
val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))