diff options
Diffstat (limited to 'core/src/test')
-rw-r--r-- | core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala | 34 |
1 files changed, 34 insertions, 0 deletions
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 6f1fd25764..59a618956a 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -77,6 +77,10 @@ class FakeTaskScheduler(sc: SparkContext, liveExecutors: (String, String)* /* ex override def isExecutorAlive(execId: String): Boolean = executors.contains(execId) override def hasExecutorsAliveOnHost(host: String): Boolean = executors.values.exists(_ == host) + + def addExecutor(execId: String, host: String) { + executors.put(execId, host) + } } class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { @@ -400,6 +404,36 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { assert(sched.taskSetsFailed.contains(taskSet.id)) } + test("new executors get added") { + sc = new SparkContext("local", "test") + val sched = new FakeTaskScheduler(sc) + val taskSet = FakeTask.createTaskSet(4, + Seq(TaskLocation("host1", "execA")), + Seq(TaskLocation("host1", "execB")), + Seq(TaskLocation("host2", "execC")), + Seq()) + val clock = new FakeClock + val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock) + // All tasks added to no-pref list since no preferred location is available + assert(manager.pendingTasksWithNoPrefs.size === 4) + // Only ANY is valid + assert(manager.myLocalityLevels.sameElements(Array(ANY))) + // Add a new executor + sched.addExecutor("execD", "host1") + manager.executorAdded() + // Task 0 and 1 should be removed from no-pref list + assert(manager.pendingTasksWithNoPrefs.size === 2) + // Valid locality should contain NODE_LOCAL and ANY + assert(manager.myLocalityLevels.sameElements(Array(NODE_LOCAL, ANY))) + // Add another executor + sched.addExecutor("execC", "host2") + manager.executorAdded() + // No-pref list now only contains task 3 + assert(manager.pendingTasksWithNoPrefs.size === 1) + // Valid locality should contain PROCESS_LOCAL, NODE_LOCAL and ANY + assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, ANY))) + } + def createTaskResult(id: Int): DirectTaskResult[Int] = { val valueSer = SparkEnv.get.serializer.newInstance() new DirectTaskResult[Int](valueSer.serialize(id), mutable.Map.empty, new TaskMetrics) |