about summary refs log tree commit diff
path: root/core/src/test
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/test')
-rw-r--r-- core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala | 34
1 file changed, 34 insertions(+), 0 deletions(-)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 6f1fd25764..59a618956a 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -77,6 +77,10 @@ class FakeTaskScheduler(sc: SparkContext, liveExecutors: (String, String)* /* ex
override def isExecutorAlive(execId: String): Boolean = executors.contains(execId)
override def hasExecutorsAliveOnHost(host: String): Boolean = executors.values.exists(_ == host)
+
+ def addExecutor(execId: String, host: String) {
+ executors.put(execId, host)
+ }
}
class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
@@ -400,6 +404,36 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
assert(sched.taskSetsFailed.contains(taskSet.id))
}
+ test("new executors get added") {
+ sc = new SparkContext("local", "test")
+ val sched = new FakeTaskScheduler(sc)
+ val taskSet = FakeTask.createTaskSet(4,
+ Seq(TaskLocation("host1", "execA")),
+ Seq(TaskLocation("host1", "execB")),
+ Seq(TaskLocation("host2", "execC")),
+ Seq())
+ val clock = new FakeClock
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+ // All tasks added to no-pref list since no preferred location is available
+ assert(manager.pendingTasksWithNoPrefs.size === 4)
+ // Only ANY is valid
+ assert(manager.myLocalityLevels.sameElements(Array(ANY)))
+ // Add a new executor
+ sched.addExecutor("execD", "host1")
+ manager.executorAdded()
+ // Task 0 and 1 should be removed from no-pref list
+ assert(manager.pendingTasksWithNoPrefs.size === 2)
+ // Valid locality should contain NODE_LOCAL and ANY
+ assert(manager.myLocalityLevels.sameElements(Array(NODE_LOCAL, ANY)))
+ // Add another executor
+ sched.addExecutor("execC", "host2")
+ manager.executorAdded()
+ // No-pref list now only contains task 3
+ assert(manager.pendingTasksWithNoPrefs.size === 1)
+ // Valid locality should contain PROCESS_LOCAL, NODE_LOCAL and ANY
+ assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, ANY)))
+ }
+
def createTaskResult(id: Int): DirectTaskResult[Int] = {
val valueSer = SparkEnv.get.serializer.newInstance()
new DirectTaskResult[Int](valueSer.serialize(id), mutable.Map.empty, new TaskMetrics)