author    Imran Rashid <irashid@cloudera.com>        2017-02-06 22:37:37 -0800
committer Kay Ousterhout <kayousterhout@gmail.com>   2017-02-06 22:37:37 -0800
commit    d9043092caf71d5fa6be18ae8c51a0158bc2218e (patch)
tree      7ac6ca6874253c6495316a4898a8966872838d12 /core/src/main
parent    7a0a630e0f699017c7d0214923cd4aa0227e62ff (diff)
[SPARK-18967][SCHEDULER] compute locality levels even if delay = 0
## What changes were proposed in this pull request?
Before this change, with delay scheduling off, Spark would effectively
ignore locality preferences for bulk scheduling. With this change,
locality preferences are used when multiple offers are made
simultaneously.
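For background, delay scheduling is effectively turned off by setting the locality wait to zero. A minimal sketch of such a configuration follows; `spark.locality.wait` is the standard Spark setting, while the master string and app name are purely illustrative:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// "Delay = 0" in the title refers to the locality wait: with a zero wait the
// scheduler never holds a task back hoping for a better-located slot. Before
// this patch, a zero wait also meant locality preferences were ignored when
// many resource offers arrived in a single scheduling round.
val conf = new SparkConf()
  .setMaster("local-cluster[4, 1, 1024]")  // illustrative master
  .setAppName("locality-without-delay")    // illustrative app name
  .set("spark.locality.wait", "0")         // turn delay scheduling off
val sc = new SparkContext(conf)
```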
## How was this patch tested?
A test case was added which fails without this change. All unit tests were run via Jenkins.
Author: Imran Rashid <irashid@cloudera.com>
Closes #16376 from squito/locality_without_delay.
Diffstat (limited to 'core/src/main')
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 13
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala | 13
2 files changed, 19 insertions(+), 7 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 72ed55af41..8ce2ca32ed 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -54,7 +54,7 @@ import org.apache.spark.util.{AccumulatorV2, ThreadUtils, Utils}
 private[spark] class TaskSchedulerImpl private[scheduler](
     val sc: SparkContext,
     val maxTaskFailures: Int,
-    blacklistTrackerOpt: Option[BlacklistTracker],
+    private[scheduler] val blacklistTrackerOpt: Option[BlacklistTracker],
     isLocal: Boolean = false)
   extends TaskScheduler with Logging
@@ -337,8 +337,7 @@ private[spark] class TaskSchedulerImpl private[scheduler](
       }
     }.getOrElse(offers)
 
-    // Randomly shuffle offers to avoid always placing tasks on the same set of workers.
-    val shuffledOffers = Random.shuffle(filteredOffers)
+    val shuffledOffers = shuffleOffers(filteredOffers)
     // Build a list of tasks to assign to each worker.
     val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
     val availableCpus = shuffledOffers.map(o => o.cores).toArray
@@ -375,6 +374,14 @@ private[spark] class TaskSchedulerImpl private[scheduler](
     return tasks
   }
 
+  /**
+   * Shuffle offers around to avoid always placing tasks on the same workers. Exposed to allow
+   * overriding in tests, so it can be deterministic.
+   */
+  protected def shuffleOffers(offers: IndexedSeq[WorkerOffer]): IndexedSeq[WorkerOffer] = {
+    Random.shuffle(offers)
+  }
+
   def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
     var failedExecutor: Option[String] = None
     var reason: Option[ExecutorLossReason] = None
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 88251435a5..3b25513bea 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -163,7 +163,12 @@ private[spark] class TaskSetManager(
     addPendingTask(i)
   }
 
-  // Figure out which locality levels we have in our TaskSet, so we can do delay scheduling
+  /**
+   * Track the set of locality levels which are valid given the tasks locality preferences and
+   * the set of currently available executors. This is updated as executors are added and removed.
+   * This allows a performance optimization, of skipping levels that aren't relevant (eg., skip
+   * PROCESS_LOCAL if no tasks could be run PROCESS_LOCAL for the current set of executors).
+   */
   var myLocalityLevels = computeValidLocalityLevels()
   var localityWaits = myLocalityLevels.map(getLocalityWait) // Time to wait at each level
@@ -961,18 +966,18 @@ private[spark] class TaskSetManager(
   private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = {
     import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY}
     val levels = new ArrayBuffer[TaskLocality.TaskLocality]
-    if (!pendingTasksForExecutor.isEmpty && getLocalityWait(PROCESS_LOCAL) != 0 &&
+    if (!pendingTasksForExecutor.isEmpty &&
         pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) {
       levels += PROCESS_LOCAL
     }
-    if (!pendingTasksForHost.isEmpty && getLocalityWait(NODE_LOCAL) != 0 &&
+    if (!pendingTasksForHost.isEmpty &&
         pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) {
       levels += NODE_LOCAL
     }
     if (!pendingTasksWithNoPrefs.isEmpty) {
       levels += NO_PREF
     }
-    if (!pendingTasksForRack.isEmpty && getLocalityWait(RACK_LOCAL) != 0 &&
+    if (!pendingTasksForRack.isEmpty &&
         pendingTasksForRack.keySet.exists(sched.hasHostAliveOnRack(_))) {
       levels += RACK_LOCAL
     }
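For illustration, the new `shuffleOffers` hook can be overridden so a test sees a deterministic offer order. The sketch below assumes Spark 2.1-era APIs; the `DeterministicTaskScheduler` class is hypothetical and is not the helper used in the test actually added by this PR:

```scala
package org.apache.spark.scheduler

import org.apache.spark.SparkContext

// Hypothetical test-only scheduler: leaves resource offers in the order they
// were passed in, so a test can assert exactly which worker each task lands on.
private[spark] class DeterministicTaskScheduler(sc: SparkContext)
  extends TaskSchedulerImpl(sc) {

  // Replace the random shuffle with the identity, keeping placement deterministic.
  override protected def shuffleOffers(offers: IndexedSeq[WorkerOffer]): IndexedSeq[WorkerOffer] = {
    offers
  }
}
```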