Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala      65
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala  44
2 files changed, 101 insertions, 8 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 97c22fe724..55024ecd55 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -506,13 +506,64 @@ private[spark] class TaskSetManager(
   * Get the level we can launch tasks according to delay scheduling, based on current wait time.
   */
  private def getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality = {
-    while (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex) &&
-        currentLocalityIndex < myLocalityLevels.length - 1)
-    {
-      // Jump to the next locality level, and remove our waiting time for the current one since
-      // we don't want to count it again on the next one
-      lastLaunchTime += localityWaits(currentLocalityIndex)
-      currentLocalityIndex += 1
+    // Remove the scheduled or finished tasks lazily
+    def tasksNeedToBeScheduledFrom(pendingTaskIds: ArrayBuffer[Int]): Boolean = {
+      var indexOffset = pendingTaskIds.size
+      while (indexOffset > 0) {
+        indexOffset -= 1
+        val index = pendingTaskIds(indexOffset)
+        if (copiesRunning(index) == 0 && !successful(index)) {
+          return true
+        } else {
+          pendingTaskIds.remove(indexOffset)
+        }
+      }
+      false
+    }
+    // Walk through the list of tasks that can be scheduled at each location and return true
+    // if there are any tasks that still need to be scheduled. Lazily clean up tasks that have
+    // already been scheduled.
+    def moreTasksToRunIn(pendingTasks: HashMap[String, ArrayBuffer[Int]]): Boolean = {
+      val emptyKeys = new ArrayBuffer[String]
+      val hasTasks = pendingTasks.exists {
+        case (id: String, tasks: ArrayBuffer[Int]) =>
+          if (tasksNeedToBeScheduledFrom(tasks)) {
+            true
+          } else {
+            emptyKeys += id
+            false
+          }
+      }
+      // The key could be executorId, host or rackId
+      emptyKeys.foreach(id => pendingTasks.remove(id))
+      hasTasks
+    }
+
+    while (currentLocalityIndex < myLocalityLevels.length - 1) {
+      val moreTasks = myLocalityLevels(currentLocalityIndex) match {
+        case TaskLocality.PROCESS_LOCAL => moreTasksToRunIn(pendingTasksForExecutor)
+        case TaskLocality.NODE_LOCAL => moreTasksToRunIn(pendingTasksForHost)
+        case TaskLocality.NO_PREF => pendingTasksWithNoPrefs.nonEmpty
+        case TaskLocality.RACK_LOCAL => moreTasksToRunIn(pendingTasksForRack)
+      }
+      if (!moreTasks) {
+        // This is a performance optimization: if there are no more tasks that can
+        // be scheduled at a particular locality level, there is no point in waiting
+        // for the locality wait timeout (SPARK-4939).
+        lastLaunchTime = curTime
+        logDebug(s"No tasks for locality level ${myLocalityLevels(currentLocalityIndex)}, " +
+          s"so moving to locality level ${myLocalityLevels(currentLocalityIndex + 1)}")
+        currentLocalityIndex += 1
+      } else if (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex)) {
+        // Jump to the next locality level, and reset lastLaunchTime so that the next locality
+        // wait timer doesn't immediately expire
+        lastLaunchTime += localityWaits(currentLocalityIndex)
+        currentLocalityIndex += 1
+        logDebug(s"Moving to ${myLocalityLevels(currentLocalityIndex)} after waiting for " +
+          s"${localityWaits(currentLocalityIndex)}ms")
+      } else {
+        return myLocalityLevels(currentLocalityIndex)
+      }
    }
    myLocalityLevels(currentLocalityIndex)
  }
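
The rewritten getAllowedLocalityLevel above combines two ideas: pending-task lists are pruned lazily, only when the scheduler asks whether a locality level still has work, and a level with nothing left to run is skipped immediately instead of waiting out its delay-scheduling timeout. Below is a minimal, self-contained sketch of the lazy-pruning part; the names (LazyPendingTasksSketch, copiesRunning, successful, pendingForHost) are hypothetical stand-ins, not Spark's actual TaskSetManager state.

import scala.collection.mutable.{ArrayBuffer, HashMap}

// Hypothetical, simplified model of the lazy pruning above; not Spark's actual classes.
object LazyPendingTasksSketch {
  // Stand-ins for TaskSetManager's per-task bookkeeping.
  val copiesRunning = Array(0, 1, 0, 0)                 // task 1 is currently running
  val successful    = Array(true, false, false, false)  // task 0 already finished

  // Returns true if any task in the list still needs scheduling; as a side effect,
  // drops entries for tasks that are already running or finished (scanning from the
  // end so removals don't shift the indices still to be visited).
  def tasksNeedToBeScheduledFrom(pending: ArrayBuffer[Int]): Boolean = {
    var i = pending.size
    while (i > 0) {
      i -= 1
      val task = pending(i)
      if (copiesRunning(task) == 0 && !successful(task)) {
        return true
      } else {
        pending.remove(i)
      }
    }
    false
  }

  def main(args: Array[String]): Unit = {
    val pendingForHost = HashMap(
      "host1" -> ArrayBuffer(0, 1),   // only finished/running tasks, will be pruned away
      "host2" -> ArrayBuffer(2, 3))   // still has runnable work
    println(tasksNeedToBeScheduledFrom(pendingForHost("host1")))  // false
    println(tasksNeedToBeScheduledFrom(pendingForHost("host2")))  // true
  }
}

The level-skipping part is the !moreTasks branch in the patch itself: lastLaunchTime is set to curTime and currentLocalityIndex advances without consuming the locality wait.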
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 84b9b78823..59580561cb 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -314,7 +314,8 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {

  test("delay scheduling with failed hosts") {
    sc = new SparkContext("local", "test")
-    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"),
+      ("exec3", "host3"))
    val taskSet = FakeTask.createTaskSet(3,
      Seq(TaskLocation("host1")),
      Seq(TaskLocation("host2")),
@@ -649,6 +650,47 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
    assert(manager.resourceOffer("execA", "host3", NO_PREF).get.index === 2)
  }

+  test("SPARK-4939: node-local tasks should be scheduled right after process-local tasks finished") {
+    sc = new SparkContext("local", "test")
+    val sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2"))
+    val taskSet = FakeTask.createTaskSet(4,
+      Seq(TaskLocation("host1")),
+      Seq(TaskLocation("host2")),
+      Seq(ExecutorCacheTaskLocation("host1", "execA")),
+      Seq(ExecutorCacheTaskLocation("host2", "execB")))
+    val clock = new FakeClock
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+
+    // process-local tasks are scheduled first
+    assert(manager.resourceOffer("execA", "host1", NODE_LOCAL).get.index === 2)
+    assert(manager.resourceOffer("execB", "host2", NODE_LOCAL).get.index === 3)
+    // node-local tasks are scheduled without delay
+    assert(manager.resourceOffer("execA", "host1", NODE_LOCAL).get.index === 0)
+    assert(manager.resourceOffer("execB", "host2", NODE_LOCAL).get.index === 1)
+    assert(manager.resourceOffer("execA", "host1", NODE_LOCAL) == None)
+    assert(manager.resourceOffer("execB", "host2", NODE_LOCAL) == None)
+  }
+
+  test("SPARK-4939: no-pref tasks should be scheduled after process-local tasks finished") {
+    sc = new SparkContext("local", "test")
+    val sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2"))
+    val taskSet = FakeTask.createTaskSet(3,
+      Seq(),
+      Seq(ExecutorCacheTaskLocation("host1", "execA")),
+      Seq(ExecutorCacheTaskLocation("host2", "execB")))
+    val clock = new FakeClock
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+
+    // process-local tasks are scheduled first
+    assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL).get.index === 1)
+    assert(manager.resourceOffer("execB", "host2", PROCESS_LOCAL).get.index === 2)
+    // no-pref tasks are scheduled without delay
+    assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL) == None)
+    assert(manager.resourceOffer("execA", "host1", NODE_LOCAL) == None)
+    assert(manager.resourceOffer("execA", "host1", NO_PREF).get.index === 0)
+    assert(manager.resourceOffer("execA", "host1", ANY) == None)
+  }
+
  test("Ensure TaskSetManager is usable after addition of levels") {
    // Regression test for SPARK-2931
    sc = new SparkContext("local", "test")
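
For context on what these tests avoid waiting for: the timeouts consulted by getAllowedLocalityLevel come from Spark's spark.locality.wait setting and its per-level variants. The snippet below is only an illustrative sketch of configuring them; the object name, app name, and wait values are arbitrary choices, not part of this patch.

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative configuration sketch: getAllowedLocalityLevel compares elapsed time
// against these per-level waits. With the SPARK-4939 change, a level with no pending
// tasks is skipped immediately, so the waits below are only paid when that level
// actually has work left.
object LocalityWaitExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("locality-wait-example")
      .setMaster("local[2]")
      .set("spark.locality.wait", "3000")       // base wait per locality level, in ms
      .set("spark.locality.wait.node", "1000")  // override for NODE_LOCAL only
    val sc = new SparkContext(conf)
    try {
      // Any job will do; locality only matters once tasks have preferred locations.
      println(sc.parallelize(1 to 100, 4).sum())
    } finally {
      sc.stop()
    }
  }
}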