diff options
author | Davies Liu <davies@databricks.com> | 2015-02-04 14:22:07 -0800 |
---|---|---|
committer | Kay Ousterhout <kayousterhout@gmail.com> | 2015-02-04 14:22:07 -0800 |
commit | 0a89b156850fc5ba93160987927f249a7e633d51 (patch) | |
tree | b216e1c46fa5c8bd1e877ce8f23c4439dbc541e0 /core | |
parent | f0500f9fa378d81e4b4038a66a40eee15806b677 (diff) | |
download | spark-0a89b156850fc5ba93160987927f249a7e633d51.tar.gz spark-0a89b156850fc5ba93160987927f249a7e633d51.tar.bz2 spark-0a89b156850fc5ba93160987927f249a7e633d51.zip |
[SPARK-4939] move to next locality when no pending tasks
Currently, if there are different locality in a task set, the tasks with NODE_LOCAL only get scheduled after all the PROCESS_LOCAL tasks are scheduled and timeout with spark.locality.wait.process (3 seconds by default). In local mode, the LocalScheduler will never call resourceOffer() again once it failed to get a task with same locality, then all the NODE_LOCAL tasks will be never scheduled.
This bug could be reproduced by run example python/streaming/stateful_network_wordcount.py, it will hang after finished a batch with some data.
This patch will check whether there is task for current locality level, if not, it will change to next locality level without waiting for `spark.locality.wait.process` seconds. It works for all locality levels.
Because the list of pending tasks are updated lazily, the check can be false-positive, it means it will not move to next locality level even there is no valid pending tasks, it will wait for timeout.
Author: Davies Liu <davies@databricks.com>
Closes #3779 from davies/local_streaming and squashes the following commits:
2d25fb3 [Davies Liu] Update TaskSetManager.scala
1550668 [Davies Liu] add comment
1c37aac [Davies Liu] address comments
6b13824 [Davies Liu] address comments
906f456 [Davies Liu] Merge branch 'master' of github.com:apache/spark into local_streaming
414e79e [Davies Liu] fix bug, add logging
ff8eabb [Davies Liu] Merge branch 'master' into local_streaming
28d1b3c [Davies Liu] check tasks
9d0ceab [Davies Liu] Merge branch 'master' of github.com:apache/spark into local_streaming
37a2804 [Davies Liu] fix tests
49bda82 [Davies Liu] address comment
d8fb95a [Davies Liu] move to next locality level if no more tasks
2d6ae73 [Davies Liu] add comments
32d363f [Davies Liu] add regression test
7d8c5a5 [Davies Liu] jump to next locality if no pending tasks for executors
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala | 65 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala | 44 |
2 files changed, 101 insertions, 8 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 97c22fe724..55024ecd55 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -506,13 +506,64 @@ private[spark] class TaskSetManager( * Get the level we can launch tasks according to delay scheduling, based on current wait time. */ private def getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality = { - while (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex) && - currentLocalityIndex < myLocalityLevels.length - 1) - { - // Jump to the next locality level, and remove our waiting time for the current one since - // we don't want to count it again on the next one - lastLaunchTime += localityWaits(currentLocalityIndex) - currentLocalityIndex += 1 + // Remove the scheduled or finished tasks lazily + def tasksNeedToBeScheduledFrom(pendingTaskIds: ArrayBuffer[Int]): Boolean = { + var indexOffset = pendingTaskIds.size + while (indexOffset > 0) { + indexOffset -= 1 + val index = pendingTaskIds(indexOffset) + if (copiesRunning(index) == 0 && !successful(index)) { + return true + } else { + pendingTaskIds.remove(indexOffset) + } + } + false + } + // Walk through the list of tasks that can be scheduled at each location and returns true + // if there are any tasks that still need to be scheduled. Lazily cleans up tasks that have + // already been scheduled. + def moreTasksToRunIn(pendingTasks: HashMap[String, ArrayBuffer[Int]]): Boolean = { + val emptyKeys = new ArrayBuffer[String] + val hasTasks = pendingTasks.exists { + case (id: String, tasks: ArrayBuffer[Int]) => + if (tasksNeedToBeScheduledFrom(tasks)) { + true + } else { + emptyKeys += id + false + } + } + // The key could be executorId, host or rackId + emptyKeys.foreach(id => pendingTasks.remove(id)) + hasTasks + } + + while (currentLocalityIndex < myLocalityLevels.length - 1) { + val moreTasks = myLocalityLevels(currentLocalityIndex) match { + case TaskLocality.PROCESS_LOCAL => moreTasksToRunIn(pendingTasksForExecutor) + case TaskLocality.NODE_LOCAL => moreTasksToRunIn(pendingTasksForHost) + case TaskLocality.NO_PREF => pendingTasksWithNoPrefs.nonEmpty + case TaskLocality.RACK_LOCAL => moreTasksToRunIn(pendingTasksForRack) + } + if (!moreTasks) { + // This is a performance optimization: if there are no more tasks that can + // be scheduled at a particular locality level, there is no point in waiting + // for the locality wait timeout (SPARK-4939). + lastLaunchTime = curTime + logDebug(s"No tasks for locality level ${myLocalityLevels(currentLocalityIndex)}, " + + s"so moving to locality level ${myLocalityLevels(currentLocalityIndex + 1)}") + currentLocalityIndex += 1 + } else if (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex)) { + // Jump to the next locality level, and reset lastLaunchTime so that the next locality + // wait timer doesn't immediately expire + lastLaunchTime += localityWaits(currentLocalityIndex) + currentLocalityIndex += 1 + logDebug(s"Moving to ${myLocalityLevels(currentLocalityIndex)} after waiting for " + + s"${localityWaits(currentLocalityIndex)}ms") + } else { + return myLocalityLevels(currentLocalityIndex) + } } myLocalityLevels(currentLocalityIndex) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 84b9b78823..59580561cb 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -314,7 +314,8 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { test("delay scheduling with failed hosts") { sc = new SparkContext("local", "test") - val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) + val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"), + ("exec3", "host3")) val taskSet = FakeTask.createTaskSet(3, Seq(TaskLocation("host1")), Seq(TaskLocation("host2")), @@ -649,6 +650,47 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { assert(manager.resourceOffer("execA", "host3", NO_PREF).get.index === 2) } + test("SPARK-4939: node-local tasks should be scheduled right after process-local tasks finished") { + sc = new SparkContext("local", "test") + val sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2")) + val taskSet = FakeTask.createTaskSet(4, + Seq(TaskLocation("host1")), + Seq(TaskLocation("host2")), + Seq(ExecutorCacheTaskLocation("host1", "execA")), + Seq(ExecutorCacheTaskLocation("host2", "execB"))) + val clock = new FakeClock + val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock) + + // process-local tasks are scheduled first + assert(manager.resourceOffer("execA", "host1", NODE_LOCAL).get.index === 2) + assert(manager.resourceOffer("execB", "host2", NODE_LOCAL).get.index === 3) + // node-local tasks are scheduled without delay + assert(manager.resourceOffer("execA", "host1", NODE_LOCAL).get.index === 0) + assert(manager.resourceOffer("execB", "host2", NODE_LOCAL).get.index === 1) + assert(manager.resourceOffer("execA", "host1", NODE_LOCAL) == None) + assert(manager.resourceOffer("execB", "host2", NODE_LOCAL) == None) + } + + test("SPARK-4939: no-pref tasks should be scheduled after process-local tasks finished") { + sc = new SparkContext("local", "test") + val sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2")) + val taskSet = FakeTask.createTaskSet(3, + Seq(), + Seq(ExecutorCacheTaskLocation("host1", "execA")), + Seq(ExecutorCacheTaskLocation("host2", "execB"))) + val clock = new FakeClock + val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock) + + // process-local tasks are scheduled first + assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL).get.index === 1) + assert(manager.resourceOffer("execB", "host2", PROCESS_LOCAL).get.index === 2) + // no-pref tasks are scheduled without delay + assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL) == None) + assert(manager.resourceOffer("execA", "host1", NODE_LOCAL) == None) + assert(manager.resourceOffer("execA", "host1", NO_PREF).get.index === 0) + assert(manager.resourceOffer("execA", "host1", ANY) == None) + } + test("Ensure TaskSetManager is usable after addition of levels") { // Regression test for SPARK-2931 sc = new SparkContext("local", "test") |