diff options
Diffstat (limited to 'core/src/main')
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 13 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala | 13 |
2 files changed, 19 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 72ed55af41..8ce2ca32ed 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -54,7 +54,7 @@ import org.apache.spark.util.{AccumulatorV2, ThreadUtils, Utils} private[spark] class TaskSchedulerImpl private[scheduler]( val sc: SparkContext, val maxTaskFailures: Int, - blacklistTrackerOpt: Option[BlacklistTracker], + private[scheduler] val blacklistTrackerOpt: Option[BlacklistTracker], isLocal: Boolean = false) extends TaskScheduler with Logging { @@ -337,8 +337,7 @@ private[spark] class TaskSchedulerImpl private[scheduler]( } }.getOrElse(offers) - // Randomly shuffle offers to avoid always placing tasks on the same set of workers. - val shuffledOffers = Random.shuffle(filteredOffers) + val shuffledOffers = shuffleOffers(filteredOffers) // Build a list of tasks to assign to each worker. val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores)) val availableCpus = shuffledOffers.map(o => o.cores).toArray @@ -375,6 +374,14 @@ private[spark] class TaskSchedulerImpl private[scheduler]( return tasks } + /** + * Shuffle offers around to avoid always placing tasks on the same workers. Exposed to allow + * overriding in tests, so it can be deterministic. + */ + protected def shuffleOffers(offers: IndexedSeq[WorkerOffer]): IndexedSeq[WorkerOffer] = { + Random.shuffle(offers) + } + def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) { var failedExecutor: Option[String] = None var reason: Option[ExecutorLossReason] = None diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 88251435a5..3b25513bea 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -163,7 +163,12 @@ private[spark] class TaskSetManager( addPendingTask(i) } - // Figure out which locality levels we have in our TaskSet, so we can do delay scheduling + /** + * Track the set of locality levels which are valid given the tasks locality preferences and + * the set of currently available executors. This is updated as executors are added and removed. + * This allows a performance optimization, of skipping levels that aren't relevant (eg., skip + * PROCESS_LOCAL if no tasks could be run PROCESS_LOCAL for the current set of executors). + */ var myLocalityLevels = computeValidLocalityLevels() var localityWaits = myLocalityLevels.map(getLocalityWait) // Time to wait at each level @@ -961,18 +966,18 @@ private[spark] class TaskSetManager( private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = { import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY} val levels = new ArrayBuffer[TaskLocality.TaskLocality] - if (!pendingTasksForExecutor.isEmpty && getLocalityWait(PROCESS_LOCAL) != 0 && + if (!pendingTasksForExecutor.isEmpty && pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) { levels += PROCESS_LOCAL } - if (!pendingTasksForHost.isEmpty && getLocalityWait(NODE_LOCAL) != 0 && + if (!pendingTasksForHost.isEmpty && pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) { levels += NODE_LOCAL } if (!pendingTasksWithNoPrefs.isEmpty) { levels += NO_PREF } - if (!pendingTasksForRack.isEmpty && getLocalityWait(RACK_LOCAL) != 0 && + if (!pendingTasksForRack.isEmpty && pendingTasksForRack.keySet.exists(sched.hasHostAliveOnRack(_))) { levels += RACK_LOCAL } |