Diffstat (limited to 'core/src/main/scala/org')
 core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 15
 core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala    | 10
 2 files changed, 22 insertions(+), 3 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 4b6d6da5a6..be3673c48e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -88,6 +88,8 @@ private[spark] class TaskSchedulerImpl(
// in turn is used to decide when we can attain data locality on a given host
private val executorsByHost = new HashMap[String, HashSet[String]]
+ protected val hostsByRack = new HashMap[String, HashSet[String]]
+
private val executorIdToHost = new HashMap[String, String]
// Listener object to pass upcalls into
@@ -223,6 +225,9 @@ private[spark] class TaskSchedulerImpl(
executorAdded(o.executorId, o.host)
newExecAvail = true
}
+ for (rack <- getRackForHost(o.host)) {
+   hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
+ }
}
// Randomly shuffle offers to avoid always placing tasks on the same set of workers.
@@ -418,6 +423,12 @@ private[spark] class TaskSchedulerImpl(
execs -= executorId
if (execs.isEmpty) {
executorsByHost -= host
+ for (rack <- getRackForHost(host); hosts <- hostsByRack.get(rack)) {
+   hosts -= host
+   if (hosts.isEmpty) {
+     hostsByRack -= rack
+   }
+ }
}
executorIdToHost -= executorId
rootPool.executorLost(executorId, host)
@@ -435,6 +446,10 @@ private[spark] class TaskSchedulerImpl(
executorsByHost.contains(host)
}
+ def hasHostAliveOnRack(rack: String): Boolean = synchronized {
+   hostsByRack.contains(rack)
+ }
+
def isExecutorAlive(execId: String): Boolean = synchronized {
activeExecutorIds.contains(execId)
}
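Taken together, the TaskSchedulerImpl changes above maintain a reverse index from rack name to the set of hosts on that rack that still have alive executors: entries are added when a resource offer names a host, pruned when the last executor on a host goes away, and queried through hasHostAliveOnRack. The standalone sketch below illustrates just that bookkeeping pattern; it is not the Spark class itself, and rackFor is a made-up stand-in for TaskSchedulerImpl.getRackForHost.

import scala.collection.mutable.{HashMap, HashSet}

object RackIndexSketch {
  // rack -> hosts on that rack that still have at least one alive executor
  private val hostsByRack = new HashMap[String, HashSet[String]]

  // Hypothetical topology lookup; Spark resolves this via getRackForHost.
  private def rackFor(host: String): Option[String] =
    Some("rack-" + host.hashCode.abs % 2)

  // Mirrors the resourceOffers path: register the host under its rack.
  def hostAdded(host: String): Unit =
    for (rack <- rackFor(host)) {
      hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += host
    }

  // Mirrors the executor-removal path: drop the host, and the rack once it is empty.
  def hostRemoved(host: String): Unit =
    for (rack <- rackFor(host); hosts <- hostsByRack.get(rack)) {
      hosts -= host
      if (hosts.isEmpty) {
        hostsByRack -= rack
      }
    }

  // Mirrors hasHostAliveOnRack: a rack is alive iff it still has an entry.
  def hasHostAliveOnRack(rack: String): Boolean = hostsByRack.contains(rack)

  def main(args: Array[String]): Unit = {
    hostAdded("host-1")
    rackFor("host-1").foreach(r => println(hasHostAliveOnRack(r))) // true
    hostRemoved("host-1")
    rackFor("host-1").foreach(r => println(hasHostAliveOnRack(r))) // false
  }
}

Dropping a rack's entry as soon as its host set empties is what keeps the query a plain contains call, the same reason the existing code removes empty entries from executorsByHost.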
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 059cc9085a..3bdc71d93b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -191,7 +191,9 @@ private[spark] class TaskSetManager(
addTo(pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer))
for (rack <- sched.getRackForHost(loc.host)) {
addTo(pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer))
- hadAliveLocations = true
+ if (sched.hasHostAliveOnRack(rack)) {
+   hadAliveLocations = true
+ }
}
}
@@ -748,7 +750,8 @@ private[spark] class TaskSetManager(
pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) {
levels += NODE_LOCAL
}
- if (!pendingTasksForRack.isEmpty && getLocalityWait(RACK_LOCAL) != 0) {
+ if (!pendingTasksForRack.isEmpty && getLocalityWait(RACK_LOCAL) != 0 &&
+   pendingTasksForRack.keySet.exists(sched.hasHostAliveOnRack(_))) {
levels += RACK_LOCAL
}
levels += ANY
@@ -761,7 +764,8 @@ private[spark] class TaskSetManager(
def newLocAvail(index: Int): Boolean = {
for (loc <- tasks(index).preferredLocations) {
if (sched.hasExecutorsAliveOnHost(loc.host) ||
- sched.getRackForHost(loc.host).isDefined) {
+ (sched.getRackForHost(loc.host).isDefined &&
+   sched.hasHostAliveOnRack(sched.getRackForHost(loc.host).get))) {
return true
}
}
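On the TaskSetManager side the patch only consults the new query: a rack preference now counts toward hadAliveLocations, the RACK_LOCAL locality level, and newLocAvail only when some host on that rack is actually alive, instead of whenever a rack can merely be resolved for the host. A simplified sketch of the locality-level decision follows, ignoring the locality-wait timeouts and the pending-task lists of the real class; SchedulerView and the level names are illustrative stand-ins, not Spark's TaskLocality machinery.

import scala.collection.mutable.{ArrayBuffer, HashMap}

object LocalityLevelSketch {
  sealed trait Level
  case object NodeLocal extends Level
  case object RackLocal extends Level
  case object AnyLevel extends Level

  // Stand-ins for the two scheduler queries the patch relies on.
  trait SchedulerView {
    def hasExecutorsAliveOnHost(host: String): Boolean
    def hasHostAliveOnRack(rack: String): Boolean
  }

  // Which locality levels are worth waiting for, given the pending tasks per host/rack.
  def validLevels(
      pendingTasksForHost: HashMap[String, ArrayBuffer[Int]],
      pendingTasksForRack: HashMap[String, ArrayBuffer[Int]],
      sched: SchedulerView): Seq[Level] = {
    val levels = new ArrayBuffer[Level]
    if (pendingTasksForHost.nonEmpty &&
        pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost)) {
      levels += NodeLocal
    }
    // The patched condition: a rack preference only matters if one of its hosts is alive.
    if (pendingTasksForRack.nonEmpty &&
        pendingTasksForRack.keySet.exists(sched.hasHostAliveOnRack)) {
      levels += RackLocal
    }
    levels += AnyLevel
    levels.toSeq
  }
}

Without the rack-alive check, a task set whose preferred hosts are all gone, but whose racks are still resolvable by the topology lookup, would keep RACK_LOCAL in its level list and spend the rack-local locality wait even though no alive host could satisfy it.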