aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorRui Li <rui.li@intel.com>2014-07-16 22:53:37 +0530
committermridulm <mridulm80@apache.org>2014-07-16 22:53:37 +0530
commit33e64ecacbc44567f9cba2644a30a118653ea5fa (patch)
tree1d790127895be0f28b03c6e243bfa5dcf2418329 /core
parentefc452a16322e8b20b3c4fe1d6847315f928cd2d (diff)
downloadspark-33e64ecacbc44567f9cba2644a30a118653ea5fa.tar.gz
spark-33e64ecacbc44567f9cba2644a30a118653ea5fa.tar.bz2
spark-33e64ecacbc44567f9cba2644a30a118653ea5fa.zip
SPARK-2277: make TaskScheduler track hosts on rack
Hi mateiz, I've created [SPARK-2277](https://issues.apache.org/jira/browse/SPARK-2277) to make TaskScheduler track hosts on each rack. Please help to review, thanks. Author: Rui Li <rui.li@intel.com> Closes #1212 from lirui-intel/trackHostOnRack and squashes the following commits: 2b4bd0f [Rui Li] SPARK-2277: refine UT fbde838 [Rui Li] SPARK-2277: add UT 7bbe658 [Rui Li] SPARK-2277: rename the method 5e4ef62 [Rui Li] SPARK-2277: remove unnecessary import 79ac750 [Rui Li] SPARK-2277: make TaskScheduler track hosts on rack
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala15
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala10
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala63
3 files changed, 83 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 4b6d6da5a6..be3673c48e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -88,6 +88,8 @@ private[spark] class TaskSchedulerImpl(
// in turn is used to decide when we can attain data locality on a given host
private val executorsByHost = new HashMap[String, HashSet[String]]
+ protected val hostsByRack = new HashMap[String, HashSet[String]]
+
private val executorIdToHost = new HashMap[String, String]
// Listener object to pass upcalls into
@@ -223,6 +225,9 @@ private[spark] class TaskSchedulerImpl(
executorAdded(o.executorId, o.host)
newExecAvail = true
}
+ for (rack <- getRackForHost(o.host)) {
+ hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
+ }
}
// Randomly shuffle offers to avoid always placing tasks on the same set of workers.
@@ -418,6 +423,12 @@ private[spark] class TaskSchedulerImpl(
execs -= executorId
if (execs.isEmpty) {
executorsByHost -= host
+ for (rack <- getRackForHost(host); hosts <- hostsByRack.get(rack)) {
+ hosts -= host
+ if (hosts.isEmpty) {
+ hostsByRack -= rack
+ }
+ }
}
executorIdToHost -= executorId
rootPool.executorLost(executorId, host)
@@ -435,6 +446,10 @@ private[spark] class TaskSchedulerImpl(
executorsByHost.contains(host)
}
+ def hasHostAliveOnRack(rack: String): Boolean = synchronized {
+ hostsByRack.contains(rack)
+ }
+
def isExecutorAlive(execId: String): Boolean = synchronized {
activeExecutorIds.contains(execId)
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 059cc9085a..3bdc71d93b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -191,7 +191,9 @@ private[spark] class TaskSetManager(
addTo(pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer))
for (rack <- sched.getRackForHost(loc.host)) {
addTo(pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer))
- hadAliveLocations = true
+ if(sched.hasHostAliveOnRack(rack)){
+ hadAliveLocations = true
+ }
}
}
@@ -748,7 +750,8 @@ private[spark] class TaskSetManager(
pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) {
levels += NODE_LOCAL
}
- if (!pendingTasksForRack.isEmpty && getLocalityWait(RACK_LOCAL) != 0) {
+ if (!pendingTasksForRack.isEmpty && getLocalityWait(RACK_LOCAL) != 0 &&
+ pendingTasksForRack.keySet.exists(sched.hasHostAliveOnRack(_))) {
levels += RACK_LOCAL
}
levels += ANY
@@ -761,7 +764,8 @@ private[spark] class TaskSetManager(
def newLocAvail(index: Int): Boolean = {
for (loc <- tasks(index).preferredLocations) {
if (sched.hasExecutorsAliveOnHost(loc.host) ||
- sched.getRackForHost(loc.host).isDefined) {
+ (sched.getRackForHost(loc.host).isDefined &&
+ sched.hasHostAliveOnRack(sched.getRackForHost(loc.host).get))) {
return true
}
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 9ff2a48700..86b443b18f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -54,6 +54,23 @@ class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler)
}
}
+// Get the rack for a given host
+object FakeRackUtil {
+ private val hostToRack = new mutable.HashMap[String, String]()
+
+ def cleanUp() {
+ hostToRack.clear()
+ }
+
+ def assignHostToRack(host: String, rack: String) {
+ hostToRack(host) = rack
+ }
+
+ def getRackForHost(host: String) = {
+ hostToRack.get(host)
+ }
+}
+
/**
* A mock TaskSchedulerImpl implementation that just remembers information about tasks started and
* feedback received from the TaskSetManagers. Note that it's important to initialize this with
@@ -69,6 +86,9 @@ class FakeTaskScheduler(sc: SparkContext, liveExecutors: (String, String)* /* ex
val taskSetsFailed = new ArrayBuffer[String]
val executors = new mutable.HashMap[String, String] ++ liveExecutors
+ for ((execId, host) <- liveExecutors; rack <- getRackForHost(host)) {
+ hostsByRack.getOrElseUpdate(rack, new mutable.HashSet[String]()) += host
+ }
dagScheduler = new FakeDAGScheduler(sc, this)
@@ -82,7 +102,12 @@ class FakeTaskScheduler(sc: SparkContext, liveExecutors: (String, String)* /* ex
def addExecutor(execId: String, host: String) {
executors.put(execId, host)
+ for (rack <- getRackForHost(host)) {
+ hostsByRack.getOrElseUpdate(rack, new mutable.HashSet[String]()) += host
+ }
}
+
+ override def getRackForHost(value: String): Option[String] = FakeRackUtil.getRackForHost(value)
}
/**
@@ -419,6 +444,9 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
}
test("new executors get added") {
+ // Assign host2 to rack2
+ FakeRackUtil.cleanUp()
+ FakeRackUtil.assignHostToRack("host2", "rack2")
sc = new SparkContext("local", "test")
val sched = new FakeTaskScheduler(sc)
val taskSet = FakeTask.createTaskSet(4,
@@ -444,8 +472,39 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
manager.executorAdded()
// No-pref list now only contains task 3
assert(manager.pendingTasksWithNoPrefs.size === 1)
- // Valid locality should contain PROCESS_LOCAL, NODE_LOCAL and ANY
- assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, ANY)))
+ // Valid locality should contain PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL and ANY
+ assert(manager.myLocalityLevels.sameElements(
+ Array(PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY)))
+ }
+
+ test("test RACK_LOCAL tasks") {
+ FakeRackUtil.cleanUp()
+ // Assign host1 to rack1
+ FakeRackUtil.assignHostToRack("host1", "rack1")
+ // Assign host2 to rack1
+ FakeRackUtil.assignHostToRack("host2", "rack1")
+ // Assign host3 to rack2
+ FakeRackUtil.assignHostToRack("host3", "rack2")
+ sc = new SparkContext("local", "test")
+ val sched = new FakeTaskScheduler(sc,
+ ("execA", "host1"), ("execB", "host2"), ("execC", "host3"))
+ val taskSet = FakeTask.createTaskSet(2,
+ Seq(TaskLocation("host1", "execA")),
+ Seq(TaskLocation("host1", "execA")))
+ val clock = new FakeClock
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+
+ assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY)))
+ // Set allowed locality to ANY
+ clock.advance(LOCALITY_WAIT * 3)
+ // Offer host3
+ // No task is scheduled if we restrict locality to RACK_LOCAL
+ assert(manager.resourceOffer("execC", "host3", RACK_LOCAL) === None)
+ // Task 0 can be scheduled with ANY
+ assert(manager.resourceOffer("execC", "host3", ANY).get.index === 0)
+ // Offer host2
+ // Task 1 can be scheduled with RACK_LOCAL
+ assert(manager.resourceOffer("execB", "host2", RACK_LOCAL).get.index === 1)
}
test("do not emit warning when serialized task is small") {