Diffstat (limited to 'core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala')
-rw-r--r-- | core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala | 182 |
1 file changed, 120 insertions, 62 deletions
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index 27e713e2c4..c69f3bdb7f 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -13,17 +13,21 @@ import spark.scheduler._
 import spark.TaskState.TaskState
 import java.nio.ByteBuffer

-private[spark] object TaskLocality extends Enumeration("HOST_LOCAL", "RACK_LOCAL", "ANY") with Logging {
+private[spark] object TaskLocality extends Enumeration("PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY") with Logging {

-  val HOST_LOCAL, RACK_LOCAL, ANY = Value
+  // process local is expected to be used ONLY within tasksetmanager for now.
+  val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value

   type TaskLocality = Value

   def isAllowed(constraint: TaskLocality, condition: TaskLocality): Boolean = {
+    // Must not be the constraint.
+    assert (constraint != TaskLocality.PROCESS_LOCAL)
+
     constraint match {
-      case TaskLocality.HOST_LOCAL => condition == TaskLocality.HOST_LOCAL
-      case TaskLocality.RACK_LOCAL => condition == TaskLocality.HOST_LOCAL || condition == TaskLocality.RACK_LOCAL
+      case TaskLocality.NODE_LOCAL => condition == TaskLocality.NODE_LOCAL
+      case TaskLocality.RACK_LOCAL => condition == TaskLocality.NODE_LOCAL || condition == TaskLocality.RACK_LOCAL
       // For anything else, allow
       case _ => true
     }
@@ -32,12 +36,16 @@ private[spark] object TaskLocality extends Enumeration("HOST_LOCAL", "RACK_LOCAL
   def parse(str: String): TaskLocality = {
     // better way to do this ?
     try {
-      TaskLocality.withName(str)
+      val retval = TaskLocality.withName(str)
+      // Must not specify PROCESS_LOCAL !
+      assert (retval != TaskLocality.PROCESS_LOCAL)
+
+      retval
     } catch {
       case nEx: NoSuchElementException => {
-        logWarning("Invalid task locality specified '" + str + "', defaulting to HOST_LOCAL");
+        logWarning("Invalid task locality specified '" + str + "', defaulting to NODE_LOCAL");
         // default to preserve earlier behavior
-        HOST_LOCAL
+        NODE_LOCAL
       }
     }
   }
@@ -76,7 +84,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
   // Last time when we launched a preferred task (for delay scheduling)
   var lastPreferredLaunchTime = System.currentTimeMillis

-  // List of pending tasks for each node (hyper local to container). These collections are actually
+  // List of pending tasks for each node (process local to container). These collections are actually
   // treated as stacks, in which new tasks are added to the end of the
   // ArrayBuffer and removed from the end. This makes it faster to detect
   // tasks that repeatedly fail because whenever a task failed, it is put
@@ -133,35 +141,55 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
     addPendingTask(i)
   }

-  private def findPreferredLocations(_taskPreferredLocations: Seq[String], scheduler: ClusterScheduler, rackLocal: Boolean = false): ArrayBuffer[String] = {
-    // DEBUG code
-    _taskPreferredLocations.foreach(h => Utils.checkHost(h, "taskPreferredLocation " + _taskPreferredLocations))
-
-    val taskPreferredLocations = if (! rackLocal) _taskPreferredLocations else {
-      // Expand set to include all 'seen' rack local hosts.
-      // This works since container allocation/management happens within master - so any rack locality information is updated in msater.
-      // Best case effort, and maybe sort of kludge for now ... rework it later ?
-      val hosts = new HashSet[String]
-      _taskPreferredLocations.foreach(h => {
-        val rackOpt = scheduler.getRackForHost(h)
-        if (rackOpt.isDefined) {
-          val hostsOpt = scheduler.getCachedHostsForRack(rackOpt.get)
-          if (hostsOpt.isDefined) {
-            hosts ++= hostsOpt.get
+  // Note that it follows the hierarchy.
+  // if we search for NODE_LOCAL, the output will include PROCESS_LOCAL and
+  // if we search for RACK_LOCAL, it will include PROCESS_LOCAL & NODE_LOCAL
+  private def findPreferredLocations(_taskPreferredLocations: Seq[String], scheduler: ClusterScheduler,
+                                     taskLocality: TaskLocality.TaskLocality): HashSet[String] = {
+
+    if (TaskLocality.PROCESS_LOCAL == taskLocality) {
+      // straight forward comparison ! Special case it.
+      val retval = new HashSet[String]()
+      scheduler.synchronized {
+        for (location <- _taskPreferredLocations) {
+          if (scheduler.isExecutorAliveOnHostPort(location)) {
+            retval += location
           }
         }
+      }

-      // Ensure that irrespective of what scheduler says, host is always added !
-      hosts += h
-    })
-
-    hosts
+      return retval
     }

-    val retval = new ArrayBuffer[String]
+    val taskPreferredLocations =
+      if (TaskLocality.NODE_LOCAL == taskLocality) {
+        _taskPreferredLocations
+      } else {
+        assert (TaskLocality.RACK_LOCAL == taskLocality)
+        // Expand set to include all 'seen' rack local hosts.
+        // This works since container allocation/management happens within master - so any rack locality information is updated in msater.
+        // Best case effort, and maybe sort of kludge for now ... rework it later ?
+        val hosts = new HashSet[String]
+        _taskPreferredLocations.foreach(h => {
+          val rackOpt = scheduler.getRackForHost(h)
+          if (rackOpt.isDefined) {
+            val hostsOpt = scheduler.getCachedHostsForRack(rackOpt.get)
+            if (hostsOpt.isDefined) {
+              hosts ++= hostsOpt.get
+            }
+          }
+
+          // Ensure that irrespective of what scheduler says, host is always added !
+          hosts += h
+        })
+
+        hosts
+      }
+
+    val retval = new HashSet[String]
     scheduler.synchronized {
       for (prefLocation <- taskPreferredLocations) {
-        val aliveLocationsOpt = scheduler.getExecutorsAliveOnHost(prefLocation)
+        val aliveLocationsOpt = scheduler.getExecutorsAliveOnHost(Utils.parseHostPort(prefLocation)._1)
         if (aliveLocationsOpt.isDefined) {
           retval ++= aliveLocationsOpt.get
         }
@@ -175,29 +203,37 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
   private def addPendingTask(index: Int) {
     // We can infer hostLocalLocations from rackLocalLocations by joining it against tasks(index).preferredLocations (with appropriate
     // hostPort <-> host conversion). But not doing it for simplicity sake. If this becomes a performance issue, modify it.
-    val hostLocalLocations = findPreferredLocations(tasks(index).preferredLocations, sched)
-    val rackLocalLocations = findPreferredLocations(tasks(index).preferredLocations, sched, true)
+    val processLocalLocations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.PROCESS_LOCAL)
+    val hostLocalLocations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.NODE_LOCAL)
+    val rackLocalLocations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.RACK_LOCAL)

     if (rackLocalLocations.size == 0) {
       // Current impl ensures this.
+      assert (processLocalLocations.size == 0)
       assert (hostLocalLocations.size == 0)
       pendingTasksWithNoPrefs += index
     } else {

-      // host locality
-      for (hostPort <- hostLocalLocations) {
+      // process local locality
+      for (hostPort <- processLocalLocations) {
         // DEBUG Code
         Utils.checkHostPort(hostPort)

         val hostPortList = pendingTasksForHostPort.getOrElseUpdate(hostPort, ArrayBuffer())
         hostPortList += index
+      }
+
+      // host locality (includes process local)
+      for (hostPort <- hostLocalLocations) {
+        // DEBUG Code
+        Utils.checkHostPort(hostPort)

         val host = Utils.parseHostPort(hostPort)._1
         val hostList = pendingTasksForHost.getOrElseUpdate(host, ArrayBuffer())
         hostList += index
       }

-      // rack locality
+      // rack locality (includes process local and host local)
       for (rackLocalHostPort <- rackLocalLocations) {
         // DEBUG Code
         Utils.checkHostPort(rackLocalHostPort)
@@ -211,7 +247,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
     allPendingTasks += index
   }

-  // Return the pending tasks list for a given host port (hyper local), or an empty list if
+  // Return the pending tasks list for a given host port (process local), or an empty list if
   // there is no map entry for that host
   private def getPendingTasksForHostPort(hostPort: String): ArrayBuffer[Int] = {
     // DEBUG Code
@@ -233,6 +269,11 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
     pendingRackLocalTasksForHost.getOrElse(host, ArrayBuffer())
   }

+  // Number of pending tasks for a given host Port (which would be process local)
+  def numPendingTasksForHostPort(hostPort: String): Int = {
+    getPendingTasksForHostPort(hostPort).count( index => copiesRunning(index) == 0 && !finished(index) )
+  }
+
   // Number of pending tasks for a given host (which would be data local)
   def numPendingTasksForHost(hostPort: String): Int = {
     getPendingTasksForHost(hostPort).count( index => copiesRunning(index) == 0 && !finished(index) )
@@ -264,13 +305,13 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
   // task must have a preference for this host/rack/no preferred locations at all.
   private def findSpeculativeTask(hostPort: String, locality: TaskLocality.TaskLocality): Option[Int] = {
-    assert (TaskLocality.isAllowed(locality, TaskLocality.HOST_LOCAL))
+    assert (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL))

     speculatableTasks.retain(index => !finished(index)) // Remove finished tasks from set

     if (speculatableTasks.size > 0) {
       val localTask = speculatableTasks.find { index =>
-        val locations = findPreferredLocations(tasks(index).preferredLocations, sched)
+        val locations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.NODE_LOCAL)
         val attemptLocs = taskAttempts(index).map(_.hostPort)
         (locations.size == 0 || locations.contains(hostPort)) && !attemptLocs.contains(hostPort)
       }
@@ -284,7 +325,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
     if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
       val rackTask = speculatableTasks.find { index =>
-        val locations = findPreferredLocations(tasks(index).preferredLocations, sched, true)
+        val locations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.RACK_LOCAL)
         val attemptLocs = taskAttempts(index).map(_.hostPort)
         locations.contains(hostPort) && !attemptLocs.contains(hostPort)
       }
@@ -311,6 +352,11 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
   // Dequeue a pending task for a given node and return its index.
   // If localOnly is set to false, allow non-local tasks as well.
   private def findTask(hostPort: String, locality: TaskLocality.TaskLocality): Option[Int] = {
+    val processLocalTask = findTaskFromList(getPendingTasksForHostPort(hostPort))
+    if (processLocalTask != None) {
+      return processLocalTask
+    }
+
     val localTask = findTaskFromList(getPendingTasksForHost(hostPort))
     if (localTask != None) {
       return localTask
@@ -341,30 +387,31 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
     return findSpeculativeTask(hostPort, locality)
   }

-  // Does a host count as a preferred location for a task? This is true if
-  // either the task has preferred locations and this host is one, or it has
-  // no preferred locations (in which we still count the launch as preferred).
-  private def isPreferredLocation(task: Task[_], hostPort: String): Boolean = {
+  private def isProcessLocalLocation(task: Task[_], hostPort: String): Boolean = {
+    Utils.checkHostPort(hostPort)
+
     val locs = task.preferredLocations
-    // DEBUG code
-    locs.foreach(h => Utils.checkHost(h, "preferredLocation " + locs))
-    if (locs.contains(hostPort) || locs.isEmpty) return true
+    locs.contains(hostPort)
+  }
+
+  private def isHostLocalLocation(task: Task[_], hostPort: String): Boolean = {
+    val locs = task.preferredLocations
+
+    // If no preference, consider it as host local
+    if (locs.isEmpty) return true
     val host = Utils.parseHostPort(hostPort)._1
-    locs.contains(host)
+    locs.find(h => Utils.parseHostPort(h)._1 == host).isDefined
   }

   // Does a host count as a rack local preferred location for a task? (assumes host is NOT preferred location).
   // This is true if either the task has preferred locations and this host is one, or it has
   // no preferred locations (in which we still count the launch as preferred).
-  def isRackLocalLocation(task: Task[_], hostPort: String): Boolean = {
+  private def isRackLocalLocation(task: Task[_], hostPort: String): Boolean = {
     val locs = task.preferredLocations
-    // DEBUG code
-    locs.foreach(h => Utils.checkHost(h, "preferredLocation " + locs))
-
     val preferredRacks = new HashSet[String]()
     for (preferredHost <- locs) {
       val rack = sched.getRackForHost(preferredHost)
@@ -386,7 +433,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
     val locality = if (overrideLocality != null) overrideLocality else {
       // expand only if we have waited for more than LOCALITY_WAIT for a host local task ...
       val time = System.currentTimeMillis
-      if (time - lastPreferredLaunchTime < LOCALITY_WAIT) TaskLocality.HOST_LOCAL else TaskLocality.ANY
+      if (time - lastPreferredLaunchTime < LOCALITY_WAIT) TaskLocality.NODE_LOCAL else TaskLocality.ANY
     }

     findTask(hostPort, locality) match {
@@ -395,8 +442,11 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
         val task = tasks(index)
         val taskId = sched.newTaskId()
         // Figure out whether this should count as a preferred launch
-        val taskLocality = if (isPreferredLocation(task, hostPort)) TaskLocality.HOST_LOCAL else
-          if (isRackLocalLocation(task, hostPort)) TaskLocality.RACK_LOCAL else TaskLocality.ANY
+        val taskLocality =
+          if (isProcessLocalLocation(task, hostPort)) TaskLocality.PROCESS_LOCAL else
+          if (isHostLocalLocation(task, hostPort)) TaskLocality.NODE_LOCAL else
+          if (isRackLocalLocation(task, hostPort)) TaskLocality.RACK_LOCAL else
+            TaskLocality.ANY
         val prefStr = taskLocality.toString
         logInfo("Starting task %s:%d as TID %s on slave %s: %s (%s)".format(
           taskSet.id, index, taskId, execId, hostPort, prefStr))
@@ -406,7 +456,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
         val info = new TaskInfo(taskId, index, time, execId, hostPort, taskLocality)
         taskInfos(taskId) = info
         taskAttempts(index) = info :: taskAttempts(index)
-        if (TaskLocality.HOST_LOCAL == taskLocality) {
+        if (TaskLocality.NODE_LOCAL == taskLocality) {
           lastPreferredLaunchTime = time
         }
         // Serialize and return the task
@@ -493,7 +543,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
             return

           case ef: ExceptionFailure =>
-            val key = ef.exception.toString
+            val key = ef.description
             val now = System.currentTimeMillis
             val (printFull, dupCount) = {
               if (recentExceptions.contains(key)) {
@@ -511,10 +561,11 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
               }
             }
             if (printFull) {
-              val locs = ef.exception.getStackTrace.map(loc => "\tat %s".format(loc.toString))
-              logInfo("Loss was due to %s\n%s".format(ef.exception.toString, locs.mkString("\n")))
+              val locs = ef.stackTrace.map(loc => "\tat %s".format(loc.toString))
+              logInfo("Loss was due to %s\n%s\n%s".format(
+                ef.className, ef.description, locs.mkString("\n")))
             } else {
-              logInfo("Loss was due to %s [duplicate %d]".format(ef.exception.toString, dupCount))
+              logInfo("Loss was due to %s [duplicate %d]".format(ef.description, dupCount))
             }

           case _ => {}
@@ -552,15 +603,22 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
   def executorLost(execId: String, hostPort: String) {
     logInfo("Re-queueing tasks for " + execId + " from TaskSet " + taskSet.id)
+
     // If some task has preferred locations only on hostname, and there are no more executors there,
     // put it in the no-prefs list to avoid the wait from delay scheduling
-    for (index <- getPendingTasksForHostPort(hostPort)) {
-      val newLocs = findPreferredLocations(tasks(index).preferredLocations, sched, true)
+
+    // host local tasks - should we push this to rack local or no pref list ? For now, preserving behavior and moving to
+    // no prefs list. Note, this was done due to impliations related to 'waiting' for data local tasks, etc.
+    // Note: NOT checking process local list - since host local list is super set of that. We need to ad to no prefs only if
+    // there is no host local node for the task (not if there is no process local node for the task)
+    for (index <- getPendingTasksForHost(Utils.parseHostPort(hostPort)._1)) {
+      // val newLocs = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.RACK_LOCAL)
+      val newLocs = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.NODE_LOCAL)
       if (newLocs.isEmpty) {
-        assert (findPreferredLocations(tasks(index).preferredLocations, sched).isEmpty)
         pendingTasksWithNoPrefs += index
       }
     }
+
     // Re-enqueue any tasks that ran on the failed executor if this is a shuffle map stage
     if (tasks(0).isInstanceOf[ShuffleMapTask]) {
       for ((tid, info) <- taskInfos if info.executorId == execId) {
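
As a reading aid for the patch above: constraints start at NODE_LOCAL (PROCESS_LOCAL stays internal to the task-set manager), a more local condition always satisfies a less local constraint, and the constraint relaxes to ANY once LOCALITY_WAIT has passed since the last preferred launch. The following is a minimal standalone sketch of that behaviour, not the Spark code itself; the LOCALITY_WAIT value below is an illustrative assumption rather than the actual Spark configuration.

// Standalone sketch of the locality hierarchy and delay-scheduling choice the patch encodes.
object LocalityHierarchySketch {
  object Locality extends Enumeration {
    val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value
  }
  import Locality._

  val LOCALITY_WAIT = 3000L // milliseconds; illustrative, not the Spark config value

  // A condition satisfies a constraint when it is at least as local as the constraint.
  def isAllowed(constraint: Locality.Value, condition: Locality.Value): Boolean = {
    assert(constraint != PROCESS_LOCAL) // never used as a scheduling constraint
    constraint match {
      case NODE_LOCAL => condition == NODE_LOCAL
      case RACK_LOCAL => condition == NODE_LOCAL || condition == RACK_LOCAL
      case _          => true // ANY accepts everything
    }
  }

  // Stay at NODE_LOCAL while a preferred launch happened recently, otherwise relax to ANY.
  def effectiveLocality(lastPreferredLaunchTime: Long, now: Long): Locality.Value =
    if (now - lastPreferredLaunchTime < LOCALITY_WAIT) NODE_LOCAL else ANY

  def main(args: Array[String]): Unit = {
    assert(isAllowed(RACK_LOCAL, NODE_LOCAL))  // node-local satisfies a rack-local constraint
    assert(!isAllowed(NODE_LOCAL, RACK_LOCAL)) // but not the other way around
    val now = System.currentTimeMillis
    println(effectiveLocality(now - 1000, now))  // NODE_LOCAL: still inside the wait window
    println(effectiveLocality(now - 10000, now)) // ANY: waited long enough, relax the constraint
  }
}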
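Read as a whole, the patched findTask dequeues in the order process-local, then host-local, then rack-local, then unconstrained. Below is a minimal sketch of that fall-through order, assuming plain ArrayBuffer queues as stand-ins for pendingTasksForHostPort, pendingTasksForHost, pendingRackLocalTasksForHost and allPendingTasks; it is an illustration, not the TaskSetManager implementation.

// Standalone sketch of the dequeue order introduced for findTask.
import scala.collection.mutable.ArrayBuffer

object FindTaskSketch {
  // Queues are treated as stacks: take from the end, mirroring the comment in the patch.
  def findTaskFromList(list: ArrayBuffer[Int]): Option[Int] =
    if (list.nonEmpty) Some(list.remove(list.size - 1)) else None

  def findTask(processLocal: ArrayBuffer[Int],
               nodeLocal: ArrayBuffer[Int],
               rackLocal: ArrayBuffer[Int],
               anyPending: ArrayBuffer[Int],
               allowRack: Boolean,
               allowAny: Boolean): Option[Int] = {
    // Most local first: process local, then node local ...
    findTaskFromList(processLocal)
      .orElse(findTaskFromList(nodeLocal))
      // ... then rack local and finally unconstrained, if the locality level allows it.
      .orElse(if (allowRack) findTaskFromList(rackLocal) else None)
      .orElse(if (allowAny) findTaskFromList(anyPending) else None)
  }

  def main(args: Array[String]): Unit = {
    val task = findTask(ArrayBuffer(), ArrayBuffer(3), ArrayBuffer(7), ArrayBuffer(9),
      allowRack = true, allowAny = true)
    println(task) // Some(3): the node-local task wins over rack-local and unconstrained ones
  }
}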