3 files changed, 105 insertions, 8 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 72ed55af41..8ce2ca32ed 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -54,7 +54,7 @@ import org.apache.spark.util.{AccumulatorV2, ThreadUtils, Utils}
 private[spark] class TaskSchedulerImpl private[scheduler](
     val sc: SparkContext,
     val maxTaskFailures: Int,
-    blacklistTrackerOpt: Option[BlacklistTracker],
+    private[scheduler] val blacklistTrackerOpt: Option[BlacklistTracker],
     isLocal: Boolean = false)
   extends TaskScheduler with Logging {
 
@@ -337,8 +337,7 @@ private[spark] class TaskSchedulerImpl private[scheduler](
       }
     }.getOrElse(offers)
 
-    // Randomly shuffle offers to avoid always placing tasks on the same set of workers.
-    val shuffledOffers = Random.shuffle(filteredOffers)
+    val shuffledOffers = shuffleOffers(filteredOffers)
     // Build a list of tasks to assign to each worker.
     val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
     val availableCpus = shuffledOffers.map(o => o.cores).toArray
@@ -375,6 +374,14 @@ private[spark] class TaskSchedulerImpl private[scheduler](
     return tasks
   }
 
+  /**
+   * Shuffle offers around to avoid always placing tasks on the same workers. Exposed to allow
+   * overriding in tests, so it can be deterministic.
+   */
+  protected def shuffleOffers(offers: IndexedSeq[WorkerOffer]): IndexedSeq[WorkerOffer] = {
+    Random.shuffle(offers)
+  }
+
   def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
     var failedExecutor: Option[String] = None
     var reason: Option[ExecutorLossReason] = None
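The shuffleOffers hook added above is a test seam: production behavior is unchanged (still a random shuffle), but a subclass can make offer ordering deterministic. Beyond the identity override used in the test suite further down, a suite could also substitute a reproducible seeded shuffle. This is a minimal sketch, assuming an existing SparkContext named sc and scaffolding like the tests below; it is illustrative, not code from this patch:

    // Hypothetical test-only scheduler: shuffling still happens, but with a fixed seed,
    // so failures reproduce run to run. new TaskSchedulerImpl(sc) is the same secondary
    // constructor the tests below use.
    val seededScheduler = new TaskSchedulerImpl(sc) {
      private val rng = new scala.util.Random(42L)
      override def shuffleOffers(offers: IndexedSeq[WorkerOffer]): IndexedSeq[WorkerOffer] = {
        rng.shuffle(offers)
      }
    }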
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 88251435a5..3b25513bea 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -163,7 +163,12 @@ private[spark] class TaskSetManager(
     addPendingTask(i)
   }
 
-  // Figure out which locality levels we have in our TaskSet, so we can do delay scheduling
+  /**
+   * Track the set of locality levels which are valid, given the tasks' locality preferences
+   * and the set of currently available executors. This is updated as executors are added and
+   * removed. It allows a performance optimization: skipping levels that aren't relevant
+   * (e.g., skip PROCESS_LOCAL if no tasks could be run PROCESS_LOCAL on the current executors).
+   */
   var myLocalityLevels = computeValidLocalityLevels()
   var localityWaits = myLocalityLevels.map(getLocalityWait) // Time to wait at each level
 
@@ -961,18 +966,18 @@ private[spark] class TaskSetManager(
   private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = {
     import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY}
     val levels = new ArrayBuffer[TaskLocality.TaskLocality]
-    if (!pendingTasksForExecutor.isEmpty && getLocalityWait(PROCESS_LOCAL) != 0 &&
+    if (!pendingTasksForExecutor.isEmpty &&
         pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) {
       levels += PROCESS_LOCAL
     }
-    if (!pendingTasksForHost.isEmpty && getLocalityWait(NODE_LOCAL) != 0 &&
+    if (!pendingTasksForHost.isEmpty &&
         pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) {
       levels += NODE_LOCAL
     }
     if (!pendingTasksWithNoPrefs.isEmpty) {
       levels += NO_PREF
     }
-    if (!pendingTasksForRack.isEmpty && getLocalityWait(RACK_LOCAL) != 0 &&
+    if (!pendingTasksForRack.isEmpty &&
         pendingTasksForRack.keySet.exists(sched.hasHostAliveOnRack(_))) {
       levels += RACK_LOCAL
     }
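The substantive change in computeValidLocalityLevels is the removal of the getLocalityWait(...) != 0 conjuncts: a locality level now counts as valid whenever some pending task could actually run at that level, even if the configured wait for that level is zero. Setting spark.locality.wait=0 therefore no longer erases locality preferences entirely; it only means the scheduler won't delay waiting for a local slot before falling through to less-local levels. A self-contained toy sketch of the before/after behavior (illustrative types and names, not Spark code):

    // Toy model: under the old rule a zero wait removed the level outright, so locality
    // was ignored; under the new rule the level survives and only the delay is disabled.
    object LocalityLevelSketch {
      sealed trait Level
      case object ProcessLocal extends Level
      case object AnyLevel extends Level

      // Old rule: valid only if some task wants the level AND we would wait for it.
      def validLevelsOld(hasLocalTasks: Boolean, waitMs: Long): Seq[Level] =
        (if (hasLocalTasks && waitMs != 0) Seq(ProcessLocal) else Seq.empty) :+ AnyLevel

      // New rule: valid whenever some pending task could run at the level.
      def validLevelsNew(hasLocalTasks: Boolean): Seq[Level] =
        (if (hasLocalTasks) Seq(ProcessLocal) else Seq.empty) :+ AnyLevel

      def main(args: Array[String]): Unit = {
        println(validLevelsOld(hasLocalTasks = true, waitMs = 0)) // List(AnyLevel)
        println(validLevelsNew(hasLocalTasks = true))             // List(ProcessLocal, AnyLevel)
      }
    }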
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 304dc9d47e..9ae0bcd9b8 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -29,7 +29,7 @@ import org.scalatest.mock.MockitoSugar
 import org.apache.spark._
 import org.apache.spark.internal.config
 import org.apache.spark.internal.Logging
-import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.util.ManualClock
 
 class FakeSchedulerBackend extends SchedulerBackend {
   def start() {}
@@ -819,4 +819,89 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(!taskScheduler.hasExecutorsAliveOnHost("host0"))
     assert(taskScheduler.getExecutorsAliveOnHost("host0").isEmpty)
   }
+
+  test("Locality should be used for bulk offers even with delay scheduling off") {
+    val conf = new SparkConf()
+      .set("spark.locality.wait", "0")
+    sc = new SparkContext("local", "TaskSchedulerImplSuite", conf)
+    // We create a manual clock just so we can be sure the clock doesn't advance in this test.
+    val clock = new ManualClock()
+
+    // We customize the task scheduler just to let us control the way offers are shuffled, so we
+    // can be sure we try both permutations, and to control the clock on the TaskSetManager.
+    val taskScheduler = new TaskSchedulerImpl(sc) {
+      override def shuffleOffers(offers: IndexedSeq[WorkerOffer]): IndexedSeq[WorkerOffer] = {
+        // Don't shuffle the offers around for this test. Instead, we'll just pass in all
+        // the permutations we care about directly.
+        offers
+      }
+      override def createTaskSetManager(taskSet: TaskSet, maxTaskFailures: Int): TaskSetManager = {
+        new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt, clock)
+      }
+    }
+    // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
+    new DAGScheduler(sc, taskScheduler) {
+      override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
+      override def executorAdded(execId: String, host: String) {}
+    }
+    taskScheduler.initialize(new FakeSchedulerBackend)
+
+    // Make two different offers -- one in the preferred location, one that is not.
+    val offers = IndexedSeq(
+      WorkerOffer("exec1", "host1", 1),
+      WorkerOffer("exec2", "host2", 1)
+    )
+    Seq(false, true).foreach { swapOrder =>
+      // Submit a taskset with locality preferences.
+      val taskSet = FakeTask.createTaskSet(
+        1, stageId = 1, stageAttemptId = 0, Seq(TaskLocation("host1", "exec1")))
+      taskScheduler.submitTasks(taskSet)
+      val shuffledOffers = if (swapOrder) offers.reverse else offers
+      // Regardless of the order of the offers (after the task scheduler shuffles them), we
+      // should always take advantage of the local offer.
+      val taskDescs = taskScheduler.resourceOffers(shuffledOffers).flatten
+      withClue(s"swapOrder = $swapOrder") {
+        assert(taskDescs.size === 1)
+        assert(taskDescs.head.executorId === "exec1")
+      }
+    }
+  }
+
+  test("With delay scheduling off, tasks can be run at any locality level immediately") {
+    val conf = new SparkConf()
+      .set("spark.locality.wait", "0")
+    sc = new SparkContext("local", "TaskSchedulerImplSuite", conf)
+
+    // We create a manual clock just so we can be sure the clock doesn't advance in this test.
+    val clock = new ManualClock()
+    val taskScheduler = new TaskSchedulerImpl(sc) {
+      override def createTaskSetManager(taskSet: TaskSet, maxTaskFailures: Int): TaskSetManager = {
+        new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt, clock)
+      }
+    }
+    // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
+    new DAGScheduler(sc, taskScheduler) {
+      override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
+      override def executorAdded(execId: String, host: String) {}
+    }
+    taskScheduler.initialize(new FakeSchedulerBackend)
+    // Make an offer on the preferred host so the scheduler knows it's alive. This is necessary
+    // so that the taskset knows that it *could* take advantage of locality.
+    taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec1", "host1", 1)))
+
+    // Submit a taskset with locality preferences.
+    val taskSet = FakeTask.createTaskSet(
+      1, stageId = 1, stageAttemptId = 0, Seq(TaskLocation("host1", "exec1")))
+    taskScheduler.submitTasks(taskSet)
+    val tsm = taskScheduler.taskSetManagerForAttempt(1, 0).get
+    // Make sure we've set up our test correctly, so that the taskset knows it *could* use
+    // local offers.
+    assert(tsm.myLocalityLevels.contains(TaskLocality.NODE_LOCAL))
+    // Make an offer at a non-preferred location. Since the delay is 0, we should still schedule
+    // immediately.
+    val taskDescs =
+      taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec2", "host2", 1))).flatten
+    assert(taskDescs.size === 1)
+    assert(taskDescs.head.executorId === "exec2")
+  }
 }
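Both new tests inject a ManualClock and deliberately never advance it, so any task launch the assertions observe happens immediately rather than after a locality wait expires. For reference, a short sketch of how the clock behaves; note that ManualClock is private[spark], so code like this must live under the org.apache.spark package tree:

    import org.apache.spark.util.ManualClock

    // The clock only moves when explicitly told to; the tests above rely on it staying put,
    // which guarantees no locality wait can expire mid-test.
    val clock = new ManualClock()
    val t0 = clock.getTimeMillis()
    clock.advance(1000L)
    assert(clock.getTimeMillis() == t0 + 1000L)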