aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala
diff options
context:
space:
mode:
authorImran Rashid <irashid@cloudera.com>2017-02-06 22:37:37 -0800
committerKay Ousterhout <kayousterhout@gmail.com>2017-02-06 22:37:37 -0800
commitd9043092caf71d5fa6be18ae8c51a0158bc2218e (patch)
tree7ac6ca6874253c6495316a4898a8966872838d12 /core/src/main/scala
parent7a0a630e0f699017c7d0214923cd4aa0227e62ff (diff)
downloadspark-d9043092caf71d5fa6be18ae8c51a0158bc2218e.tar.gz
spark-d9043092caf71d5fa6be18ae8c51a0158bc2218e.tar.bz2
spark-d9043092caf71d5fa6be18ae8c51a0158bc2218e.zip
[SPARK-18967][SCHEDULER] compute locality levels even if delay = 0
## What changes were proposed in this pull request? Before this change, with delay scheduling off, spark would effectively ignore locality preferences for bulk scheduling. With this change, locality preferences are used when multiple offers are made simultaneously. ## How was this patch tested? Test case added which fails without this change. All unit tests run via jenkins. Author: Imran Rashid <irashid@cloudera.com> Closes #16376 from squito/locality_without_delay.
Diffstat (limited to 'core/src/main/scala')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala13
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala13
2 files changed, 19 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 72ed55af41..8ce2ca32ed 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -54,7 +54,7 @@ import org.apache.spark.util.{AccumulatorV2, ThreadUtils, Utils}
private[spark] class TaskSchedulerImpl private[scheduler](
val sc: SparkContext,
val maxTaskFailures: Int,
- blacklistTrackerOpt: Option[BlacklistTracker],
+ private[scheduler] val blacklistTrackerOpt: Option[BlacklistTracker],
isLocal: Boolean = false)
extends TaskScheduler with Logging
{
@@ -337,8 +337,7 @@ private[spark] class TaskSchedulerImpl private[scheduler](
}
}.getOrElse(offers)
- // Randomly shuffle offers to avoid always placing tasks on the same set of workers.
- val shuffledOffers = Random.shuffle(filteredOffers)
+ val shuffledOffers = shuffleOffers(filteredOffers)
// Build a list of tasks to assign to each worker.
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
val availableCpus = shuffledOffers.map(o => o.cores).toArray
@@ -375,6 +374,14 @@ private[spark] class TaskSchedulerImpl private[scheduler](
return tasks
}
+ /**
+ * Shuffle offers around to avoid always placing tasks on the same workers. Exposed to allow
+ * overriding in tests, so it can be deterministic.
+ */
+ protected def shuffleOffers(offers: IndexedSeq[WorkerOffer]): IndexedSeq[WorkerOffer] = {
+ Random.shuffle(offers)
+ }
+
def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
var failedExecutor: Option[String] = None
var reason: Option[ExecutorLossReason] = None
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 88251435a5..3b25513bea 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -163,7 +163,12 @@ private[spark] class TaskSetManager(
addPendingTask(i)
}
- // Figure out which locality levels we have in our TaskSet, so we can do delay scheduling
+ /**
+ * Track the set of locality levels which are valid given the tasks locality preferences and
+ * the set of currently available executors. This is updated as executors are added and removed.
+ * This allows a performance optimization, of skipping levels that aren't relevant (eg., skip
+ * PROCESS_LOCAL if no tasks could be run PROCESS_LOCAL for the current set of executors).
+ */
var myLocalityLevels = computeValidLocalityLevels()
var localityWaits = myLocalityLevels.map(getLocalityWait) // Time to wait at each level
@@ -961,18 +966,18 @@ private[spark] class TaskSetManager(
private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = {
import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY}
val levels = new ArrayBuffer[TaskLocality.TaskLocality]
- if (!pendingTasksForExecutor.isEmpty && getLocalityWait(PROCESS_LOCAL) != 0 &&
+ if (!pendingTasksForExecutor.isEmpty &&
pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) {
levels += PROCESS_LOCAL
}
- if (!pendingTasksForHost.isEmpty && getLocalityWait(NODE_LOCAL) != 0 &&
+ if (!pendingTasksForHost.isEmpty &&
pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) {
levels += NODE_LOCAL
}
if (!pendingTasksWithNoPrefs.isEmpty) {
levels += NO_PREF
}
- if (!pendingTasksForRack.isEmpty && getLocalityWait(RACK_LOCAL) != 0 &&
+ if (!pendingTasksForRack.isEmpty &&
pendingTasksForRack.keySet.exists(sched.hasHostAliveOnRack(_))) {
levels += RACK_LOCAL
}