aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala22
1 files changed, 13 insertions, 9 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index dc05e764c3..7d905538c6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -278,9 +278,6 @@ private[spark] class TaskSchedulerImpl(
}
}
}
- if (!launchedTask) {
- taskSet.abortIfCompletelyBlacklisted(executorIdToHost.keys)
- }
return launchedTask
}
@@ -326,12 +323,19 @@ private[spark] class TaskSchedulerImpl(
// Take each TaskSet in our scheduling order, and then offer it each node in increasing order
// of locality levels so that it gets a chance to launch local tasks on all of them.
// NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
- var launchedTask = false
- for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
- do {
- launchedTask = resourceOfferSingleTaskSet(
- taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
- } while (launchedTask)
+ for (taskSet <- sortedTaskSets) {
+ var launchedAnyTask = false
+ var launchedTaskAtCurrentMaxLocality = false
+ for (currentMaxLocality <- taskSet.myLocalityLevels) {
+ do {
+ launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
+ taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
+ launchedAnyTask |= launchedTaskAtCurrentMaxLocality
+ } while (launchedTaskAtCurrentMaxLocality)
+ }
+ if (!launchedAnyTask) {
+ taskSet.abortIfCompletelyBlacklisted(executorIdToHost.keys)
+ }
}
if (tasks.size > 0) {