author     CodingCat <zhunansjtu@gmail.com>      2014-08-05 23:02:58 -0700
committer  Matei Zaharia <matei@databricks.com>  2014-08-05 23:02:58 -0700
commit     63bdb1f41b4895e3a9444f7938094438a94d3007 (patch)
tree       75aaa960879e75c0d51f55deffdefbe1217c41ce /core/src/test
parent     5a826c00c3255a2d9e5eb17d6b1abf83f7c1a08d (diff)
SPARK-2294: fix locality inversion bug in TaskManager
copied from the original JIRA (https://issues.apache.org/jira/browse/SPARK-2294):

If an executor E is free, a task may be speculatively assigned to E when there are other tasks in the job that have not been launched (at all) yet. Similarly, a task without any locality preferences may be assigned to E when there was another NODE_LOCAL task that could have been scheduled.

This happens because TaskSchedulerImpl calls TaskSetManager.resourceOffer (which in turn calls TaskSetManager.findTask) with increasing locality levels, beginning with PROCESS_LOCAL, followed by NODE_LOCAL, and so on until the highest currently allowed level. Now, suppose NODE_LOCAL is the highest currently allowed locality level. The first time findTask is called, it will be called with max level PROCESS_LOCAL; if it cannot find any PROCESS_LOCAL tasks, it will try to schedule tasks with no locality preferences or speculative tasks. As a result, speculative tasks or tasks with no preferences may be scheduled instead of NODE_LOCAL tasks.

----

I added an additional parameter, maxLocality, to resourceOffer and findTask, indicating when tasks without locality preferences should be considered.

Author: CodingCat <zhunansjtu@gmail.com>

Closes #1313 from CodingCat/SPARK-2294 and squashes the following commits:

bf3f13b [CodingCat] rollback some forgotten changes
89f9bc0 [CodingCat] address matei's comments
18cae02 [CodingCat] add test case for node-local tasks
2ba6195 [CodingCat] fix failed test cases
87dd09e [CodingCat] fix style
9b9432f [CodingCat] remove hasNodeLocalOnlyTasks
fdd1573 [CodingCat] fix failed test cases
941a4fd [CodingCat] see my shocked face..........
f600085 [CodingCat] remove hasNodeLocalOnlyTasks checking
0b8a46b [CodingCat] test whether hasNodeLocalOnlyTasks affect the results
73ceda8 [CodingCat] style fix
b3a430b [CodingCat] remove fine granularity tracking for node-local only tasks
f9a2ad8 [CodingCat] simplify the logic in TaskSchedulerImpl
c8c1de4 [CodingCat] simplify the patch
be652ed [CodingCat] avoid unnecessary delay when we only have nopref tasks
dee9e22 [CodingCat] fix locality inversion bug in TaskManager by moving nopref branch
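For readers skimming the patch, the scenario above can be reproduced with a small standalone model. The sketch below is not Spark code: Locality, Task, SimpleTaskSet and offer are hypothetical stand-ins that only illustrate the invariant the new maxLocality parameter is meant to enforce, namely that a task with no locality preference is handed out only once the allowed level has relaxed to NO_PREF, so it can no longer jump ahead of a runnable NODE_LOCAL task.

// A minimal, hypothetical model (not Spark's API) of the maxLocality sweep described above.
object LocalityOrderingSketch {

  // Locality levels ordered from most to least specific, mirroring the names above.
  object Locality extends Enumeration {
    val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value
  }
  import Locality._

  // A toy task: either pinned to one preferred host or carrying no preference at all.
  final case class Task(id: Int, preferredHost: Option[String])

  final class SimpleTaskSet(tasks: Seq[Task]) {
    private val pending = scala.collection.mutable.Queue(tasks: _*)

    // Offer one slot on `host`, allowing at most `maxLocality`. NO_PREF tasks are only
    // handed out once maxLocality has relaxed to NO_PREF, so they can never be chosen
    // ahead of a runnable node-local task -- the invariant this patch restores.
    def offer(host: String, maxLocality: Locality.Value): Option[Task] = {
      if (maxLocality >= NODE_LOCAL) {
        val local = pending.dequeueFirst(_.preferredHost.contains(host))
        if (local.isDefined) return local
      }
      if (maxLocality >= NO_PREF) {
        val noPref = pending.dequeueFirst(_.preferredHost.isEmpty)
        if (noPref.isDefined) return noPref
      }
      if (maxLocality >= ANY) pending.dequeueFirst(_ => true) else None
    }
  }

  def main(args: Array[String]): Unit = {
    val taskSet = new SimpleTaskSet(Seq(
      Task(0, Some("host1")),   // node-local on host1
      Task(1, None)))           // no locality preference

    // The scheduler sweeps levels from most to least specific, as in the description.
    for (level <- Seq(PROCESS_LOCAL, NODE_LOCAL, NO_PREF, ANY)) {
      taskSet.offer("host1", level).foreach(t => println(s"$level -> task ${t.id}"))
    }
    // Prints "NODE_LOCAL -> task 0" before "NO_PREF -> task 1": the node-local task
    // wins even though the no-pref task was also runnable when the sweep started.
  }
}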
Diffstat (limited to 'core/src/test')
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala | 205
1 file changed, 138 insertions(+), 67 deletions(-)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index c52368b551..ffd23380a8 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -85,14 +85,31 @@ class FakeTaskScheduler(sc: SparkContext, liveExecutors: (String, String)* /* ex
val finishedManagers = new ArrayBuffer[TaskSetManager]
val taskSetsFailed = new ArrayBuffer[String]
- val executors = new mutable.HashMap[String, String] ++ liveExecutors
+ val executors = new mutable.HashMap[String, String]
+ for ((execId, host) <- liveExecutors) {
+ addExecutor(execId, host)
+ }
+
for ((execId, host) <- liveExecutors; rack <- getRackForHost(host)) {
hostsByRack.getOrElseUpdate(rack, new mutable.HashSet[String]()) += host
}
dagScheduler = new FakeDAGScheduler(sc, this)
- def removeExecutor(execId: String): Unit = executors -= execId
+ def removeExecutor(execId: String) {
+ executors -= execId
+ val host = executorIdToHost.get(execId)
+ assert(host != None)
+ val hostId = host.get
+ val executorsOnHost = executorsByHost(hostId)
+ executorsOnHost -= execId
+ for (rack <- getRackForHost(hostId); hosts <- hostsByRack.get(rack)) {
+ hosts -= hostId
+ if (hosts.isEmpty) {
+ hostsByRack -= rack
+ }
+ }
+ }
override def taskSetFinished(manager: TaskSetManager): Unit = finishedManagers += manager
@@ -100,8 +117,15 @@ class FakeTaskScheduler(sc: SparkContext, liveExecutors: (String, String)* /* ex
override def hasExecutorsAliveOnHost(host: String): Boolean = executors.values.exists(_ == host)
+ override def hasHostAliveOnRack(rack: String): Boolean = {
+ hostsByRack.get(rack) != None
+ }
+
def addExecutor(execId: String, host: String) {
executors.put(execId, host)
+ val executorsOnHost = executorsByHost.getOrElseUpdate(host, new mutable.HashSet[String])
+ executorsOnHost += execId
+ executorIdToHost += execId -> host
for (rack <- getRackForHost(host)) {
hostsByRack.getOrElseUpdate(rack, new mutable.HashSet[String]()) += host
}
@@ -123,7 +147,7 @@ class LargeTask(stageId: Int) extends Task[Array[Byte]](stageId, 0) {
}
class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
- import TaskLocality.{ANY, PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL}
+ import TaskLocality.{ANY, PROCESS_LOCAL, NO_PREF, NODE_LOCAL, RACK_LOCAL}
private val conf = new SparkConf
@@ -134,18 +158,13 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
sc = new SparkContext("local", "test")
val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
val taskSet = FakeTask.createTaskSet(1)
- val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
+ val clock = new FakeClock
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
- // Offer a host with process-local as the constraint; this should work because the TaskSet
- // above won't have any locality preferences
- val taskOption = manager.resourceOffer("exec1", "host1", TaskLocality.PROCESS_LOCAL)
+ // Offer a host with NO_PREF as the constraint; we should get the noPref task
+ // immediately, since that's the only task we have
+ var taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)
assert(taskOption.isDefined)
- val task = taskOption.get
- assert(task.executorId === "exec1")
- assert(sched.startedTasks.contains(0))
-
- // Re-offer the host -- now we should get no more tasks
- assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) === None)
// Tell it the task has finished
manager.handleSuccessfulTask(0, createTaskResult(0))
@@ -161,7 +180,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
// First three offers should all find tasks
for (i <- 0 until 3) {
- val taskOption = manager.resourceOffer("exec1", "host1", PROCESS_LOCAL)
+ var taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)
assert(taskOption.isDefined)
val task = taskOption.get
assert(task.executorId === "exec1")
@@ -169,7 +188,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
assert(sched.startedTasks.toSet === Set(0, 1, 2))
// Re-offer the host -- now we should get no more tasks
- assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) === None)
+ assert(manager.resourceOffer("exec1", "host1", NO_PREF) === None)
// Finish the first two tasks
manager.handleSuccessfulTask(0, createTaskResult(0))
@@ -211,37 +230,40 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
)
val clock = new FakeClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
-
// First offer host1, exec1: first task should be chosen
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
-
- // Offer host1, exec1 again: the last task, which has no prefs, should be chosen
- assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 3)
-
- // Offer host1, exec1 again, at PROCESS_LOCAL level: nothing should get chosen
- assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) === None)
+ assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) == None)
clock.advance(LOCALITY_WAIT)
-
- // Offer host1, exec1 again, at PROCESS_LOCAL level: nothing should get chosen
- assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) === None)
-
- // Offer host1, exec1 again, at NODE_LOCAL level: we should choose task 2
+ // Offer host1, exec1 again, at NODE_LOCAL level: the node-local task (task 2) should
+ // get chosen before the noPref task
assert(manager.resourceOffer("exec1", "host1", NODE_LOCAL).get.index == 2)
- // Offer host1, exec1 again, at NODE_LOCAL level: nothing should get chosen
- assert(manager.resourceOffer("exec1", "host1", NODE_LOCAL) === None)
-
- // Offer host1, exec1 again, at ANY level: nothing should get chosen
- assert(manager.resourceOffer("exec1", "host1", ANY) === None)
+ // Offer host2, exec2, at NODE_LOCAL level: we should choose task 1
+ assert(manager.resourceOffer("exec2", "host2", NODE_LOCAL).get.index == 1)
+ // Offer host2, exec2 again, at NODE_LOCAL level: no node-local task is left;
+ // the noPref task is only offered once the allowed level relaxes to NO_PREF
+ assert(manager.resourceOffer("exec2", "host2", NODE_LOCAL) == None)
clock.advance(LOCALITY_WAIT)
+ assert(manager.resourceOffer("exec2", "host2", NO_PREF).get.index == 3)
+ }
- // Offer host1, exec1 again, at ANY level: task 1 should get chosen
- assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 1)
-
- // Offer host1, exec1 again, at ANY level: nothing should be chosen as we've launched all tasks
- assert(manager.resourceOffer("exec1", "host1", ANY) === None)
+ test("we do not need to delay scheduling when we only have noPref tasks in the queue") {
+ sc = new SparkContext("local", "test")
+ val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec3", "host2"))
+ val taskSet = FakeTask.createTaskSet(3,
+ Seq(TaskLocation("host1", "exec1")),
+ Seq(TaskLocation("host2", "exec3")),
+ Seq() // Last task has no locality prefs
+ )
+ val clock = new FakeClock
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+ // First offer host1, exec1: first task should be chosen
+ assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL).get.index === 0)
+ assert(manager.resourceOffer("exec3", "host2", PROCESS_LOCAL).get.index === 1)
+ assert(manager.resourceOffer("exec3", "host2", NODE_LOCAL) == None)
+ assert(manager.resourceOffer("exec3", "host2", NO_PREF).get.index === 2)
}
test("delay scheduling with fallback") {
@@ -298,20 +320,24 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
// First offer host1: first task should be chosen
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
- // Offer host1 again: third task should be chosen immediately because host3 is not up
- assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 2)
-
- // After this, nothing should get chosen
+ // After this, nothing should get chosen, because tasks whose preferred locations
+ // are unavailable are no longer kept in pendingTasksWithNoPrefs
assert(manager.resourceOffer("exec1", "host1", ANY) === None)
// Now mark host2 as dead
sched.removeExecutor("exec2")
manager.executorLost("exec2", "host2")
- // Task 1 should immediately be launched on host1 because its original host is gone
+ // Nothing should be chosen yet, because the locality wait has not elapsed
+ assert(manager.resourceOffer("exec1", "host1", ANY) === None)
+
+ clock.advance(LOCALITY_WAIT * 2)
+
+ // Tasks 1 and 2 should now be scheduled as non-local tasks
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 1)
+ assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 2)
- // Now that all tasks have launched, nothing new should be launched anywhere else
+ // All tasks have been launched; nothing else should be scheduled
assert(manager.resourceOffer("exec1", "host1", ANY) === None)
assert(manager.resourceOffer("exec2", "host2", ANY) === None)
}
@@ -373,7 +399,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
val manager = new TaskSetManager(sched, taskSet, 4, clock)
{
- val offerResult = manager.resourceOffer("exec1", "host1", TaskLocality.PROCESS_LOCAL)
+ val offerResult = manager.resourceOffer("exec1", "host1", PROCESS_LOCAL)
assert(offerResult.isDefined, "Expect resource offer to return a task")
assert(offerResult.get.index === 0)
@@ -384,15 +410,15 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
assert(!sched.taskSetsFailed.contains(taskSet.id))
// Ensure scheduling on exec1 fails after failure 1 due to blacklist
- assert(manager.resourceOffer("exec1", "host1", TaskLocality.PROCESS_LOCAL).isEmpty)
- assert(manager.resourceOffer("exec1", "host1", TaskLocality.NODE_LOCAL).isEmpty)
- assert(manager.resourceOffer("exec1", "host1", TaskLocality.RACK_LOCAL).isEmpty)
- assert(manager.resourceOffer("exec1", "host1", TaskLocality.ANY).isEmpty)
+ assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL).isEmpty)
+ assert(manager.resourceOffer("exec1", "host1", NODE_LOCAL).isEmpty)
+ assert(manager.resourceOffer("exec1", "host1", RACK_LOCAL).isEmpty)
+ assert(manager.resourceOffer("exec1", "host1", ANY).isEmpty)
}
// Run the task on exec1.1 - should work, and then fail it on exec1.1
{
- val offerResult = manager.resourceOffer("exec1.1", "host1", TaskLocality.NODE_LOCAL)
+ val offerResult = manager.resourceOffer("exec1.1", "host1", NODE_LOCAL)
assert(offerResult.isDefined,
"Expect resource offer to return a task for exec1.1, offerResult = " + offerResult)
@@ -404,12 +430,12 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
assert(!sched.taskSetsFailed.contains(taskSet.id))
// Ensure scheduling on exec1.1 fails after failure 2 due to blacklist
- assert(manager.resourceOffer("exec1.1", "host1", TaskLocality.NODE_LOCAL).isEmpty)
+ assert(manager.resourceOffer("exec1.1", "host1", NODE_LOCAL).isEmpty)
}
// Run the task on exec2 - should work, and then fail it on exec2
{
- val offerResult = manager.resourceOffer("exec2", "host2", TaskLocality.ANY)
+ val offerResult = manager.resourceOffer("exec2", "host2", ANY)
assert(offerResult.isDefined, "Expect resource offer to return a task")
assert(offerResult.get.index === 0)
@@ -420,20 +446,20 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
assert(!sched.taskSetsFailed.contains(taskSet.id))
// Ensure scheduling on exec2 fails after failure 3 due to blacklist
- assert(manager.resourceOffer("exec2", "host2", TaskLocality.ANY).isEmpty)
+ assert(manager.resourceOffer("exec2", "host2", ANY).isEmpty)
}
// After reschedule delay, scheduling on exec1 should be possible.
clock.advance(rescheduleDelay)
{
- val offerResult = manager.resourceOffer("exec1", "host1", TaskLocality.PROCESS_LOCAL)
+ val offerResult = manager.resourceOffer("exec1", "host1", PROCESS_LOCAL)
assert(offerResult.isDefined, "Expect resource offer to return a task")
assert(offerResult.get.index === 0)
assert(offerResult.get.executorId === "exec1")
- assert(manager.resourceOffer("exec1", "host1", TaskLocality.PROCESS_LOCAL).isEmpty)
+ assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL).isEmpty)
// Cause exec1 to fail : failure 4
manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, TaskResultLost)
@@ -443,7 +469,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
assert(sched.taskSetsFailed.contains(taskSet.id))
}
- test("new executors get added") {
+ test("new executors get added and lost") {
// Assign host2 to rack2
FakeRackUtil.cleanUp()
FakeRackUtil.assignHostToRack("host2", "rack2")
@@ -456,26 +482,25 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
Seq())
val clock = new FakeClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
- // All tasks added to no-pref list since no preferred location is available
- assert(manager.pendingTasksWithNoPrefs.size === 4)
// Only ANY is valid
- assert(manager.myLocalityLevels.sameElements(Array(ANY)))
+ assert(manager.myLocalityLevels.sameElements(Array(NO_PREF, ANY)))
// Add a new executor
sched.addExecutor("execD", "host1")
manager.executorAdded()
- // Task 0 and 1 should be removed from no-pref list
- assert(manager.pendingTasksWithNoPrefs.size === 2)
// Valid locality should contain NODE_LOCAL and ANY
- assert(manager.myLocalityLevels.sameElements(Array(NODE_LOCAL, ANY)))
+ assert(manager.myLocalityLevels.sameElements(Array(NODE_LOCAL, NO_PREF, ANY)))
// Add another executor
sched.addExecutor("execC", "host2")
manager.executorAdded()
- // No-pref list now only contains task 3
- assert(manager.pendingTasksWithNoPrefs.size === 1)
// Valid locality should contain PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL and ANY
- assert(manager.myLocalityLevels.sameElements(
- Array(PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY)))
- FakeRackUtil.cleanUp()
+ assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY)))
+ // check that the valid locality levels are recomputed when executors are lost
+ sched.removeExecutor("execC")
+ manager.executorLost("execC", "host2")
+ assert(manager.myLocalityLevels.sameElements(Array(NODE_LOCAL, NO_PREF, ANY)))
+ sched.removeExecutor("execD")
+ manager.executorLost("execD", "host1")
+ assert(manager.myLocalityLevels.sameElements(Array(NO_PREF, ANY)))
}
test("test RACK_LOCAL tasks") {
@@ -506,7 +531,6 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
// Offer host2
// Task 1 can be scheduled with RACK_LOCAL
assert(manager.resourceOffer("execB", "host2", RACK_LOCAL).get.index === 1)
- FakeRackUtil.cleanUp()
}
test("do not emit warning when serialized task is small") {
@@ -536,6 +560,53 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
assert(manager.emittedTaskSizeWarning)
}
+ test("speculative and noPref task should be scheduled after node-local") {
+ sc = new SparkContext("local", "test")
+ val sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2"), ("execC", "host3"))
+ val taskSet = FakeTask.createTaskSet(4,
+ Seq(TaskLocation("host1", "execA")),
+ Seq(TaskLocation("host2"), TaskLocation("host1")),
+ Seq(),
+ Seq(TaskLocation("host3", "execC")))
+ val clock = new FakeClock
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+
+ assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL).get.index === 0)
+ assert(manager.resourceOffer("execA", "host1", NODE_LOCAL) == None)
+ assert(manager.resourceOffer("execA", "host1", NO_PREF).get.index == 1)
+
+ manager.speculatableTasks += 1
+ clock.advance(LOCALITY_WAIT)
+ // schedule the noPref task
+ assert(manager.resourceOffer("execA", "host1", NO_PREF).get.index === 2)
+ // schedule the speculative task
+ assert(manager.resourceOffer("execB", "host2", NO_PREF).get.index === 1)
+ clock.advance(LOCALITY_WAIT * 3)
+ // schedule the remaining non-local task
+ assert(manager.resourceOffer("execB", "host2", ANY).get.index === 3)
+ }
+
+ test("node-local tasks should be scheduled right away when there are only node-local and no-preference tasks") {
+ sc = new SparkContext("local", "test")
+ val sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2"), ("execC", "host3"))
+ val taskSet = FakeTask.createTaskSet(4,
+ Seq(TaskLocation("host1")),
+ Seq(TaskLocation("host2")),
+ Seq(),
+ Seq(TaskLocation("host3")))
+ val clock = new FakeClock
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+
+ // node-local tasks are scheduled without delay
+ assert(manager.resourceOffer("execA", "host1", NODE_LOCAL).get.index === 0)
+ assert(manager.resourceOffer("execA", "host2", NODE_LOCAL).get.index === 1)
+ assert(manager.resourceOffer("execA", "host3", NODE_LOCAL).get.index === 3)
+ assert(manager.resourceOffer("execA", "host3", NODE_LOCAL) === None)
+
+ // schedule the no-preference task after the node-local ones
+ assert(manager.resourceOffer("execA", "host3", NO_PREF).get.index === 2)
+ }
+
def createTaskResult(id: Int): DirectTaskResult[Int] = {
val valueSer = SparkEnv.get.serializer.newInstance()
new DirectTaskResult[Int](valueSer.serialize(id), mutable.Map.empty, new TaskMetrics)