aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala')
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala108
1 files changed, 79 insertions, 29 deletions
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index a9d9c5e44c..cf4483f144 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -32,28 +32,28 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
val TASK_REVIVAL_INTERVAL = System.getProperty("spark.tasks.revive.interval", "0").toLong
/*
- This property controls how aggressive we should be to modulate waiting for host local task scheduling.
- To elaborate, currently there is a time limit (3 sec def) to ensure that spark attempts to wait for host locality of tasks before
+ This property controls how aggressive we should be to modulate waiting for node local task scheduling.
+ To elaborate, currently there is a time limit (3 sec def) to ensure that spark attempts to wait for node locality of tasks before
scheduling on other nodes. We have modified this in yarn branch such that offers to task set happen in prioritized order :
- host-local, rack-local and then others
- But once all available host local (and no pref) tasks are scheduled, instead of waiting for 3 sec before
+ node-local, rack-local and then others
+ But once all available node local (and no pref) tasks are scheduled, instead of waiting for 3 sec before
scheduling to other nodes (which degrades performance for time sensitive tasks and on larger clusters), we can
modulate that : to also allow rack local nodes or any node. The default is still set to HOST - so that previous behavior is
maintained. This is to allow tuning the tension between pulling rdd data off node and scheduling computation asap.
TODO: rename property ? The value is one of
- - HOST_LOCAL (default, no change w.r.t current behavior),
+ - NODE_LOCAL (default, no change w.r.t current behavior),
- RACK_LOCAL and
- ANY
Note that this property makes more sense when used in conjugation with spark.tasks.revive.interval > 0 : else it is not very effective.
Additional Note: For non trivial clusters, there is a 4x - 5x reduction in running time (in some of our experiments) based on whether
- it is left at default HOST_LOCAL, RACK_LOCAL (if cluster is configured to be rack aware) or ANY.
+ it is left at default NODE_LOCAL, RACK_LOCAL (if cluster is configured to be rack aware) or ANY.
If cluster is rack aware, then setting it to RACK_LOCAL gives best tradeoff and a 3x - 4x performance improvement while minimizing IO impact.
Also, it brings down the variance in running time drastically.
*/
- val TASK_SCHEDULING_AGGRESSION = TaskLocality.parse(System.getProperty("spark.tasks.schedule.aggression", "HOST_LOCAL"))
+ val TASK_SCHEDULING_AGGRESSION = TaskLocality.parse(System.getProperty("spark.tasks.schedule.aggression", "NODE_LOCAL"))
val activeTaskSets = new HashMap[String, TaskSetManager]
var activeTaskSetsQueue = new ArrayBuffer[TaskSetManager]
@@ -73,15 +73,15 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
val activeExecutorIds = new HashSet[String]
// TODO: We might want to remove this and merge it with execId datastructures - but later.
- // Which hosts in the cluster are alive (contains hostPort's) - used for hyper local and local task locality.
+ // Which hosts in the cluster are alive (contains hostPort's) - used for process local and node local task locality.
private val hostPortsAlive = new HashSet[String]
private val hostToAliveHostPorts = new HashMap[String, HashSet[String]]
// The set of executors we have on each host; this is used to compute hostsAlive, which
// in turn is used to decide when we can attain data locality on a given host
- val executorsByHostPort = new HashMap[String, HashSet[String]]
+ private val executorsByHostPort = new HashMap[String, HashSet[String]]
- val executorIdToHostPort = new HashMap[String, String]
+ private val executorIdToHostPort = new HashMap[String, String]
// JAR server, if any JARs were added by the user to the SparkContext
var jarServer: HttpServer = null
@@ -102,6 +102,14 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
def initialize(context: SchedulerBackend) {
backend = context
+ // resolve executorId to hostPort mapping.
+ def executorToHostPort(executorId: String, defaultHostPort: String): String = {
+ executorIdToHostPort.getOrElse(executorId, defaultHostPort)
+ }
+
+ // Unfortunately, this means that SparkEnv is indirectly referencing ClusterScheduler
+ // Will that be a design violation ?
+ SparkEnv.get.executorIdToHostPort = Some(executorToHostPort)
}
def newTaskId(): Long = nextTaskId.getAndIncrement()
@@ -209,14 +217,31 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
}
// Build a list of tasks to assign to each slave
val tasks = offers.map(o => new ArrayBuffer[TaskDescription](o.cores))
+ // merge availableCpus into nodeToAvailableCpus block ?
val availableCpus = offers.map(o => o.cores).toArray
+ val nodeToAvailableCpus = {
+ val map = new HashMap[String, Int]()
+ for (offer <- offers) {
+ val hostPort = offer.hostPort
+ val cores = offer.cores
+ // DEBUG code
+ Utils.checkHostPort(hostPort)
+
+ val host = Utils.parseHostPort(hostPort)._1
+
+ map.put(host, map.getOrElse(host, 0) + cores)
+ }
+
+ map
+ }
var launchedTask = false
for (manager <- activeTaskSetsQueue.sortBy(m => (m.taskSet.priority, m.taskSet.stageId))) {
- // Split offers based on host local, rack local and off-rack tasks.
- val hostLocalOffers = new HashMap[String, ArrayBuffer[Int]]()
+ // Split offers based on node local, rack local and off-rack tasks.
+ val processLocalOffers = new HashMap[String, ArrayBuffer[Int]]()
+ val nodeLocalOffers = new HashMap[String, ArrayBuffer[Int]]()
val rackLocalOffers = new HashMap[String, ArrayBuffer[Int]]()
val otherOffers = new HashMap[String, ArrayBuffer[Int]]()
@@ -224,21 +249,30 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
val hostPort = offers(i).hostPort
// DEBUG code
Utils.checkHostPort(hostPort)
+
+ val numProcessLocalTasks = math.max(0, math.min(manager.numPendingTasksForHostPort(hostPort), availableCpus(i)))
+ if (numProcessLocalTasks > 0){
+ val list = processLocalOffers.getOrElseUpdate(hostPort, new ArrayBuffer[Int])
+ for (j <- 0 until numProcessLocalTasks) list += i
+ }
+
val host = Utils.parseHostPort(hostPort)._1
- val numHostLocalTasks = math.max(0, math.min(manager.numPendingTasksForHost(hostPort), availableCpus(i)))
- if (numHostLocalTasks > 0){
- val list = hostLocalOffers.getOrElseUpdate(host, new ArrayBuffer[Int])
- for (j <- 0 until numHostLocalTasks) list += i
+ val numNodeLocalTasks = math.max(0,
+ // Remove process local tasks (which are also host local btw !) from this
+ math.min(manager.numPendingTasksForHost(hostPort) - numProcessLocalTasks, nodeToAvailableCpus(host)))
+ if (numNodeLocalTasks > 0){
+ val list = nodeLocalOffers.getOrElseUpdate(host, new ArrayBuffer[Int])
+ for (j <- 0 until numNodeLocalTasks) list += i
}
val numRackLocalTasks = math.max(0,
- // Remove host local tasks (which are also rack local btw !) from this
- math.min(manager.numRackLocalPendingTasksForHost(hostPort) - numHostLocalTasks, availableCpus(i)))
+ // Remove node local tasks (which are also rack local btw !) from this
+ math.min(manager.numRackLocalPendingTasksForHost(hostPort) - numProcessLocalTasks - numNodeLocalTasks, nodeToAvailableCpus(host)))
if (numRackLocalTasks > 0){
val list = rackLocalOffers.getOrElseUpdate(host, new ArrayBuffer[Int])
for (j <- 0 until numRackLocalTasks) list += i
}
- if (numHostLocalTasks <= 0 && numRackLocalTasks <= 0){
+ if (numNodeLocalTasks <= 0 && numRackLocalTasks <= 0){
// add to others list - spread even this across cluster.
val list = otherOffers.getOrElseUpdate(host, new ArrayBuffer[Int])
list += i
@@ -246,12 +280,19 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
}
val offersPriorityList = new ArrayBuffer[Int](
- hostLocalOffers.size + rackLocalOffers.size + otherOffers.size)
- // First host local, then rack, then others
- val numHostLocalOffers = {
- val hostLocalPriorityList = ClusterScheduler.prioritizeContainers(hostLocalOffers)
- offersPriorityList ++= hostLocalPriorityList
- hostLocalPriorityList.size
+ processLocalOffers.size + nodeLocalOffers.size + rackLocalOffers.size + otherOffers.size)
+
+ // First process local, then host local, then rack, then others
+
+ // numNodeLocalOffers contains count of both process local and host offers.
+ val numNodeLocalOffers = {
+ val processLocalPriorityList = ClusterScheduler.prioritizeContainers(processLocalOffers)
+ offersPriorityList ++= processLocalPriorityList
+
+ val nodeLocalPriorityList = ClusterScheduler.prioritizeContainers(nodeLocalOffers)
+ offersPriorityList ++= nodeLocalPriorityList
+
+ processLocalPriorityList.size + nodeLocalPriorityList.size
}
val numRackLocalOffers = {
val rackLocalPriorityList = ClusterScheduler.prioritizeContainers(rackLocalOffers)
@@ -262,8 +303,8 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
var lastLoop = false
val lastLoopIndex = TASK_SCHEDULING_AGGRESSION match {
- case TaskLocality.HOST_LOCAL => numHostLocalOffers
- case TaskLocality.RACK_LOCAL => numRackLocalOffers + numHostLocalOffers
+ case TaskLocality.NODE_LOCAL => numNodeLocalOffers
+ case TaskLocality.RACK_LOCAL => numRackLocalOffers + numNodeLocalOffers
case TaskLocality.ANY => offersPriorityList.size
}
@@ -302,8 +343,8 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
// prevent more looping
launchedTask = false
} else if (!lastLoop && !launchedTask) {
- // Do this only if TASK_SCHEDULING_AGGRESSION != HOST_LOCAL
- if (TASK_SCHEDULING_AGGRESSION != TaskLocality.HOST_LOCAL) {
+ // Do this only if TASK_SCHEDULING_AGGRESSION != NODE_LOCAL
+ if (TASK_SCHEDULING_AGGRESSION != TaskLocality.NODE_LOCAL) {
// fudge launchedTask to ensure we loop once more
launchedTask = true
// dont loop anymore
@@ -477,6 +518,8 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
}
def getExecutorsAliveOnHost(host: String): Option[Set[String]] = {
+ Utils.checkHost(host)
+
val retval = hostToAliveHostPorts.get(host)
if (retval.isDefined) {
return Some(retval.get.toSet)
@@ -485,6 +528,13 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
None
}
+ def isExecutorAliveOnHostPort(hostPort: String): Boolean = {
+ // Even if hostPort is a host, it does not matter - it is just a specific check.
+ // But we do have to ensure that only hostPort get into hostPortsAlive !
+ // So no check against Utils.checkHostPort
+ hostPortsAlive.contains(hostPort)
+ }
+
// By default, rack is unknown
def getRackForHost(value: String): Option[String] = None