Diffstat (limited to 'core')
6 files changed, 47 insertions, 50 deletions
diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala
index 5b4a464010..2ee25e547d 100644
--- a/core/src/main/scala/spark/SparkEnv.scala
+++ b/core/src/main/scala/spark/SparkEnv.scala
@@ -33,8 +33,7 @@ class SparkEnv (
     // To be set only as part of initialization of SparkContext.
     // (executorId, defaultHostPort) => executorHostPort
     // If executorId is NOT found, return defaultHostPort
-    var executorIdToHostPort: (String, String) => String
-    ) {
+    var executorIdToHostPort: Option[(String, String) => String]) {
 
   def stop() {
     httpFileServer.stop()
@@ -52,12 +51,12 @@ class SparkEnv (
 
   def resolveExecutorIdToHostPort(executorId: String, defaultHostPort: String): String = {
     val env = SparkEnv.get
-    if (env.executorIdToHostPort == null) {
+    if (env.executorIdToHostPort.isEmpty) {
       // default to using host, not host port. Relevant to non cluster modes.
       return defaultHostPort
     }
 
-    env.executorIdToHostPort(executorId, defaultHostPort)
+    env.executorIdToHostPort.get(executorId, defaultHostPort)
   }
 }
@@ -178,6 +177,6 @@ object SparkEnv extends Logging {
       connectionManager,
       httpFileServer,
       sparkFilesDir,
-      null)
+      None)
   }
 }
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index 279daf04ed..0e348f8189 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -335,7 +335,6 @@ private object Utils extends Logging {
     retval
   }
 
-  /*
   // Used by DEBUG code : remove when all testing done
   private val ipPattern = Pattern.compile("^[0-9]+(\\.[0-9]+)*$")
   def checkHost(host: String, message: String = "") {
@@ -364,8 +363,8 @@ private object Utils extends Logging {
     // temp code for debug
     System.exit(-1)
   }
-  */
 
+/*
   // Once testing is complete in various modes, replace with this ?
   def checkHost(host: String, message: String = "") {}
   def checkHostPort(hostPort: String, message: String = "") {}
@@ -374,6 +373,7 @@ private object Utils extends Logging {
   def logErrorWithStack(msg: String) {
     try { throw new Exception } catch { case ex: Exception => { logError(msg, ex) } }
   }
+*/
 
   def getUserNameFromEnvironment(): String = {
     SparkHadoopUtil.getUserNameFromEnvironment
diff --git a/core/src/main/scala/spark/scheduler/ResultTask.scala b/core/src/main/scala/spark/scheduler/ResultTask.scala
index c43cbe5ed4..83166bce22 100644
--- a/core/src/main/scala/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/spark/scheduler/ResultTask.scala
@@ -70,7 +70,6 @@ private[spark] class ResultTask[T, U](
     rdd.partitions(partition)
   }
 
-  // data locality is on a per host basis, not hyper specific to container (host:port). Unique on set of hosts.
   private val preferredLocs: Seq[String] = if (locs == null) Nil else locs.toSet.toSeq
 
   {
diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
index 0b848af2f3..4b36e71c32 100644
--- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
@@ -84,7 +84,6 @@ private[spark] class ShuffleMapTask(
 
   protected def this() = this(0, null, null, 0, null)
 
-  // data locality is on a per host basis, not hyper specific to container (host:port). Unique on set of hosts.
   private val preferredLocs: Seq[String] = if (locs == null) Nil else locs.toSet.toSeq
 
   {
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index 3c72ce4206..49fc449e86 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -73,7 +73,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
   val activeExecutorIds = new HashSet[String]
 
   // TODO: We might want to remove this and merge it with execId datastructures - but later.
-  // Which hosts in the cluster are alive (contains hostPort's) - used for hyper local and local task locality.
+  // Which hosts in the cluster are alive (contains hostPort's) - used for instance local and node local task locality.
   private val hostPortsAlive = new HashSet[String]
   private val hostToAliveHostPorts = new HashMap[String, HashSet[String]]
 
@@ -109,7 +109,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
 
     // Unfortunately, this means that SparkEnv is indirectly referencing ClusterScheduler
     // Will that be a design violation ?
-    SparkEnv.get.executorIdToHostPort = executorToHostPort
+    SparkEnv.get.executorIdToHostPort = Some(executorToHostPort)
   }
 
   def newTaskId(): Long = nextTaskId.getAndIncrement()
@@ -240,7 +240,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
       for (manager <- activeTaskSetsQueue.sortBy(m => (m.taskSet.priority, m.taskSet.stageId))) {
 
         // Split offers based on host local, rack local and off-rack tasks.
-        val hyperLocalOffers = new HashMap[String, ArrayBuffer[Int]]()
+        val instanceLocalOffers = new HashMap[String, ArrayBuffer[Int]]()
         val hostLocalOffers = new HashMap[String, ArrayBuffer[Int]]()
         val rackLocalOffers = new HashMap[String, ArrayBuffer[Int]]()
         val otherOffers = new HashMap[String, ArrayBuffer[Int]]()
@@ -250,16 +250,16 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
           // DEBUG code
           Utils.checkHostPort(hostPort)
 
-          val numHyperLocalTasks = math.max(0, math.min(manager.numPendingTasksForHostPort(hostPort), availableCpus(i)))
-          if (numHyperLocalTasks > 0){
-            val list = hyperLocalOffers.getOrElseUpdate(hostPort, new ArrayBuffer[Int])
-            for (j <- 0 until numHyperLocalTasks) list += i
+          val numInstanceLocalTasks = math.max(0, math.min(manager.numPendingTasksForHostPort(hostPort), availableCpus(i)))
+          if (numInstanceLocalTasks > 0){
+            val list = instanceLocalOffers.getOrElseUpdate(hostPort, new ArrayBuffer[Int])
+            for (j <- 0 until numInstanceLocalTasks) list += i
           }
 
           val host = Utils.parseHostPort(hostPort)._1
           val numHostLocalTasks = math.max(0,
-            // Remove hyper local tasks (which are also host local btw !) from this
-            math.min(manager.numPendingTasksForHost(hostPort) - numHyperLocalTasks, hostToAvailableCpus(host)))
+            // Remove instance local tasks (which are also host local btw !) from this
+            math.min(manager.numPendingTasksForHost(hostPort) - numInstanceLocalTasks, hostToAvailableCpus(host)))
           if (numHostLocalTasks > 0){
             val list = hostLocalOffers.getOrElseUpdate(host, new ArrayBuffer[Int])
             for (j <- 0 until numHostLocalTasks) list += i
@@ -267,7 +267,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
           val numRackLocalTasks = math.max(0,
             // Remove host local tasks (which are also rack local btw !) from this
-            math.min(manager.numRackLocalPendingTasksForHost(hostPort) - numHyperLocalTasks - numHostLocalTasks, hostToAvailableCpus(host)))
+            math.min(manager.numRackLocalPendingTasksForHost(hostPort) - numInstanceLocalTasks - numHostLocalTasks, hostToAvailableCpus(host)))
           if (numRackLocalTasks > 0){
             val list = rackLocalOffers.getOrElseUpdate(host, new ArrayBuffer[Int])
             for (j <- 0 until numRackLocalTasks) list += i
@@ -280,19 +280,19 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
         }
 
         val offersPriorityList = new ArrayBuffer[Int](
-          hyperLocalOffers.size + hostLocalOffers.size + rackLocalOffers.size + otherOffers.size)
+          instanceLocalOffers.size + hostLocalOffers.size + rackLocalOffers.size + otherOffers.size)
 
-        // First hyper local, then host local, then rack, then others
+        // First instance local, then host local, then rack, then others
 
-        // numHostLocalOffers contains count of both hyper local and host offers.
+        // numHostLocalOffers contains count of both instance local and host offers.
         val numHostLocalOffers = {
-          val hyperLocalPriorityList = ClusterScheduler.prioritizeContainers(hyperLocalOffers)
-          offersPriorityList ++= hyperLocalPriorityList
+          val instanceLocalPriorityList = ClusterScheduler.prioritizeContainers(instanceLocalOffers)
+          offersPriorityList ++= instanceLocalPriorityList
 
           val hostLocalPriorityList = ClusterScheduler.prioritizeContainers(hostLocalOffers)
           offersPriorityList ++= hostLocalPriorityList
 
-          hyperLocalPriorityList.size + hostLocalPriorityList.size
+          instanceLocalPriorityList.size + hostLocalPriorityList.size
         }
         val numRackLocalOffers = {
           val rackLocalPriorityList = ClusterScheduler.prioritizeContainers(rackLocalOffers)
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index f5c0058554..5f3faaa5c3 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -13,17 +13,17 @@ import spark.scheduler._
 import spark.TaskState.TaskState
 import java.nio.ByteBuffer
 
-private[spark] object TaskLocality extends Enumeration("HYPER_LOCAL", "HOST_LOCAL", "RACK_LOCAL", "ANY") with Logging {
+private[spark] object TaskLocality extends Enumeration("INSTANCE_LOCAL", "HOST_LOCAL", "RACK_LOCAL", "ANY") with Logging {
 
-  // hyper local is expected to be used ONLY within tasksetmanager for now.
-  val HYPER_LOCAL, HOST_LOCAL, RACK_LOCAL, ANY = Value
+  // instance local is expected to be used ONLY within tasksetmanager for now.
+  val INSTANCE_LOCAL, HOST_LOCAL, RACK_LOCAL, ANY = Value
 
   type TaskLocality = Value
 
   def isAllowed(constraint: TaskLocality, condition: TaskLocality): Boolean = {
 
     // Must not be the constraint.
-    assert (constraint != TaskLocality.HYPER_LOCAL)
+    assert (constraint != TaskLocality.INSTANCE_LOCAL)
 
     constraint match {
       case TaskLocality.HOST_LOCAL => condition == TaskLocality.HOST_LOCAL
@@ -37,8 +37,8 @@ private[spark] object TaskLocality extends Enumeration("HYPER_LOCAL", "HOST_LOCA
     // better way to do this ?
     try {
       val retval = TaskLocality.withName(str)
-      // Must not specify HYPER_LOCAL !
-      assert (retval != TaskLocality.HYPER_LOCAL)
+      // Must not specify INSTANCE_LOCAL !
+      assert (retval != TaskLocality.INSTANCE_LOCAL)
 
       retval
     } catch {
@@ -84,7 +84,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
   // Last time when we launched a preferred task (for delay scheduling)
   var lastPreferredLaunchTime = System.currentTimeMillis
 
-  // List of pending tasks for each node (hyper local to container). These collections are actually
+  // List of pending tasks for each node (instance local to container). These collections are actually
   // treated as stacks, in which new tasks are added to the end of the
   // ArrayBuffer and removed from the end. This makes it faster to detect
   // tasks that repeatedly fail because whenever a task failed, it is put
@@ -142,12 +142,12 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
   }
 
   // Note that it follows the hierarchy.
-  // if we search for HOST_LOCAL, the output will include HYPER_LOCAL and
-  // if we search for RACK_LOCAL, it will include HYPER_LOCAL & HOST_LOCAL
+  // if we search for HOST_LOCAL, the output will include INSTANCE_LOCAL and
+  // if we search for RACK_LOCAL, it will include INSTANCE_LOCAL & HOST_LOCAL
   private def findPreferredLocations(_taskPreferredLocations: Seq[String], scheduler: ClusterScheduler,
                                      taskLocality: TaskLocality.TaskLocality): HashSet[String] = {
 
-    if (TaskLocality.HYPER_LOCAL == taskLocality) {
+    if (TaskLocality.INSTANCE_LOCAL == taskLocality) {
       // straight forward comparison ! Special case it.
       val retval = new HashSet[String]()
       scheduler.synchronized {
@@ -203,19 +203,19 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
   private def addPendingTask(index: Int) {
     // We can infer hostLocalLocations from rackLocalLocations by joining it against tasks(index).preferredLocations (with appropriate
     // hostPort <-> host conversion). But not doing it for simplicity sake. If this becomes a performance issue, modify it.
-    val hyperLocalLocations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.HYPER_LOCAL)
+    val instanceLocalLocations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.INSTANCE_LOCAL)
     val hostLocalLocations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.HOST_LOCAL)
     val rackLocalLocations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.RACK_LOCAL)
 
     if (rackLocalLocations.size == 0) {
       // Current impl ensures this.
-      assert (hyperLocalLocations.size == 0)
+      assert (instanceLocalLocations.size == 0)
       assert (hostLocalLocations.size == 0)
       pendingTasksWithNoPrefs += index
     } else {
 
-      // hyper local locality
-      for (hostPort <- hyperLocalLocations) {
+      // instance local locality
+      for (hostPort <- instanceLocalLocations) {
         // DEBUG Code
         Utils.checkHostPort(hostPort)
 
@@ -223,7 +223,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
         hostPortList += index
       }
 
-      // host locality (includes hyper local)
+      // host locality (includes instance local)
       for (hostPort <- hostLocalLocations) {
         // DEBUG Code
         Utils.checkHostPort(hostPort)
@@ -233,7 +233,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
         hostList += index
       }
 
-      // rack locality (includes hyper local and host local)
+      // rack locality (includes instance local and host local)
       for (rackLocalHostPort <- rackLocalLocations) {
         // DEBUG Code
         Utils.checkHostPort(rackLocalHostPort)
@@ -247,7 +247,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
     allPendingTasks += index
   }
 
-  // Return the pending tasks list for a given host port (hyper local), or an empty list if
+  // Return the pending tasks list for a given host port (instance local), or an empty list if
   // there is no map entry for that host
   private def getPendingTasksForHostPort(hostPort: String): ArrayBuffer[Int] = {
     // DEBUG Code
@@ -269,7 +269,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
     pendingRackLocalTasksForHost.getOrElse(host, ArrayBuffer())
   }
 
-  // Number of pending tasks for a given host Port (which would be hyper local)
+  // Number of pending tasks for a given host Port (which would be instance local)
   def numPendingTasksForHostPort(hostPort: String): Int = {
     getPendingTasksForHostPort(hostPort).count( index => copiesRunning(index) == 0 && !finished(index) )
   }
@@ -352,9 +352,9 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
 
   // Dequeue a pending task for a given node and return its index.
   // If localOnly is set to false, allow non-local tasks as well.
   private def findTask(hostPort: String, locality: TaskLocality.TaskLocality): Option[Int] = {
-    val hyperLocalTask = findTaskFromList(getPendingTasksForHostPort(hostPort))
-    if (hyperLocalTask != None) {
-      return hyperLocalTask
+    val instanceLocalTask = findTaskFromList(getPendingTasksForHostPort(hostPort))
+    if (instanceLocalTask != None) {
+      return instanceLocalTask
     }
 
     val localTask = findTaskFromList(getPendingTasksForHost(hostPort))
@@ -387,7 +387,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
     return findSpeculativeTask(hostPort, locality)
   }
 
-  private def isHyperLocalLocation(task: Task[_], hostPort: String): Boolean = {
+  private def isInstanceLocalLocation(task: Task[_], hostPort: String): Boolean = {
     Utils.checkHostPort(hostPort)
 
     val locs = task.preferredLocations
@@ -443,7 +443,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
       val taskId = sched.newTaskId()
       // Figure out whether this should count as a preferred launch
       val taskLocality =
-        if (isHyperLocalLocation(task, hostPort)) TaskLocality.HYPER_LOCAL else
+        if (isInstanceLocalLocation(task, hostPort)) TaskLocality.INSTANCE_LOCAL else
         if (isHostLocalLocation(task, hostPort)) TaskLocality.HOST_LOCAL else
         if (isRackLocalLocation(task, hostPort)) TaskLocality.RACK_LOCAL else
         TaskLocality.ANY
@@ -608,8 +608,8 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
 
     // host local tasks - should we push this to rack local or no pref list ? For now, preserving behavior and moving to
     // no prefs list. Note, this was done due to impliations related to 'waiting' for data local tasks, etc.
-    // Note: NOT checking hyper local list - since host local list is super set of that. We need to ad to no prefs only if
-    // there is no host local node for the task (not if there is no hyper local node for the task)
+    // Note: NOT checking instance local list - since host local list is super set of that. We need to ad to no prefs only if
+    // there is no host local node for the task (not if there is no instance local node for the task)
     for (index <- getPendingTasksForHost(Utils.parseHostPort(hostPort)._1)) {
       // val newLocs = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.RACK_LOCAL)
       val newLocs = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.HOST_LOCAL)
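
Note on the SparkEnv change: replacing the nullable function-valued var with an Option makes the "no resolver registered" case explicit instead of relying on a null check. Below is a minimal, self-contained sketch of the same pattern; the names mirror the diff, but this is an illustration, not the actual SparkEnv class.

// Sketch of the Option-based resolver pattern from the SparkEnv hunk above.
// None corresponds to local / non-cluster modes where no resolver is installed;
// ClusterScheduler installs one via Some(executorToHostPort).
object ResolverSketch {
  var executorIdToHostPort: Option[(String, String) => String] = None

  def resolveExecutorIdToHostPort(executorId: String, defaultHostPort: String): String =
    // Same behavior as the isEmpty/get pair in the diff, written with map/getOrElse.
    executorIdToHostPort
      .map(resolver => resolver(executorId, defaultHostPort))
      .getOrElse(defaultHostPort)

  def main(args: Array[String]): Unit = {
    println(resolveExecutorIdToHostPort("exec-1", "localhost"))  // localhost
    executorIdToHostPort = Some((execId, default) => execId + ".cluster.internal:7077")
    println(resolveExecutorIdToHostPort("exec-1", "localhost"))  // exec-1.cluster.internal:7077
  }
}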
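Note on the locality rename: the levels now form the hierarchy INSTANCE_LOCAL (same host:port, i.e. same container) => HOST_LOCAL => RACK_LOCAL => ANY, where satisfying a tighter level implies satisfying every looser one. Only the HOST_LOCAL branch of isAllowed is visible in the hunk above; the sketch below fills in the remaining branches from the hierarchy described in the comments, so treat those branches as an assumption rather than a quote of the real file.

// Self-contained sketch of the four-level locality check. Names mirror
// TaskLocality in the diff; the RACK_LOCAL and ANY branches are inferred
// from the hierarchy comments, not copied from the source.
object LocalitySketch {
  object TaskLocality extends Enumeration {
    val INSTANCE_LOCAL, HOST_LOCAL, RACK_LOCAL, ANY = Value
  }
  import TaskLocality._

  def isAllowed(constraint: TaskLocality.Value, condition: TaskLocality.Value): Boolean = {
    // Instance locality is only observed at launch time, never used as a constraint.
    assert(constraint != INSTANCE_LOCAL)
    constraint match {
      case HOST_LOCAL => condition == HOST_LOCAL                            // visible in the hunk
      case RACK_LOCAL => condition == HOST_LOCAL || condition == RACK_LOCAL // assumed branch
      case ANY        => true                                               // assumed branch
    }
  }

  def main(args: Array[String]): Unit = {
    println(isAllowed(RACK_LOCAL, HOST_LOCAL)) // true: host local also satisfies a rack constraint
    println(isAllowed(HOST_LOCAL, RACK_LOCAL)) // false: rack local is too loose for a host constraint
  }
}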