author    Mridul Muralidharan <mridul@gmail.com>  2013-05-02 06:44:33 +0530
committer Mridul Muralidharan <mridul@gmail.com>  2013-05-02 06:44:33 +0530
commit    609a817f52d8db05711c0d4529dd1448ed8c4fe0 (patch)
tree      c6426a34d277df6262cca61dc03a6ec49cadbfbc /core
parent    27764a00f40391b94fa05abb11484c442607f6f7 (diff)
download  spark-609a817f52d8db05711c0d4529dd1448ed8c4fe0.tar.gz
          spark-609a817f52d8db05711c0d4529dd1448ed8c4fe0.tar.bz2
          spark-609a817f52d8db05711c0d4529dd1448ed8c4fe0.zip
Integrate review comments on pull request
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/SparkEnv.scala                             9
-rw-r--r--  core/src/main/scala/spark/Utils.scala                                4
-rw-r--r--  core/src/main/scala/spark/scheduler/ResultTask.scala                 1
-rw-r--r--  core/src/main/scala/spark/scheduler/ShuffleMapTask.scala             1
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala  32
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala    50
6 files changed, 47 insertions, 50 deletions
diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala
index 5b4a464010..2ee25e547d 100644
--- a/core/src/main/scala/spark/SparkEnv.scala
+++ b/core/src/main/scala/spark/SparkEnv.scala
@@ -33,8 +33,7 @@ class SparkEnv (
// To be set only as part of initialization of SparkContext.
// (executorId, defaultHostPort) => executorHostPort
// If executorId is NOT found, return defaultHostPort
- var executorIdToHostPort: (String, String) => String
- ) {
+ var executorIdToHostPort: Option[(String, String) => String]) {
def stop() {
httpFileServer.stop()
@@ -52,12 +51,12 @@ class SparkEnv (
def resolveExecutorIdToHostPort(executorId: String, defaultHostPort: String): String = {
val env = SparkEnv.get
- if (env.executorIdToHostPort == null) {
+ if (env.executorIdToHostPort.isEmpty) {
// default to using host, not host port. Relevant to non cluster modes.
return defaultHostPort
}
- env.executorIdToHostPort(executorId, defaultHostPort)
+ env.executorIdToHostPort.get(executorId, defaultHostPort)
}
}
@@ -178,6 +177,6 @@ object SparkEnv extends Logging {
connectionManager,
httpFileServer,
sparkFilesDir,
- null)
+ None)
}
}
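The SparkEnv hunk above replaces a nullable function field with an Option[(String, String) => String], so the resolver hook is absent by default and callers branch on presence instead of null-checking. A minimal, self-contained sketch of that pattern (simplified names, not the actual SparkEnv class):

// Sketch of the null -> Option refactor (hypothetical names, not the real SparkEnv).
class EnvSketch(
    // Resolver hook: (executorId, defaultHostPort) => hostPort. None until a scheduler installs it.
    var executorIdToHostPort: Option[(String, String) => String]) {

  def resolveExecutorIdToHostPort(executorId: String, defaultHostPort: String): String =
    executorIdToHostPort match {
      case Some(resolve) => resolve(executorId, defaultHostPort) // cluster mode: ask the scheduler
      case None          => defaultHostPort                      // local modes: nothing registered
    }
}

object EnvSketchDemo {
  def main(args: Array[String]): Unit = {
    val env = new EnvSketch(None)
    println(env.resolveExecutorIdToHostPort("exec-1", "localhost:7077")) // falls back to the default
    env.executorIdToHostPort = Some((id, _) => s"host-of-$id:7077")
    println(env.resolveExecutorIdToHostPort("exec-1", "localhost:7077")) // uses the installed hook
  }
}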
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index 279daf04ed..0e348f8189 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -335,7 +335,6 @@ private object Utils extends Logging {
retval
}
- /*
// Used by DEBUG code : remove when all testing done
private val ipPattern = Pattern.compile("^[0-9]+(\\.[0-9]+)*$")
def checkHost(host: String, message: String = "") {
@@ -364,8 +363,8 @@ private object Utils extends Logging {
// temp code for debug
System.exit(-1)
}
- */
+/*
// Once testing is complete in various modes, replace with this ?
def checkHost(host: String, message: String = "") {}
def checkHostPort(hostPort: String, message: String = "") {}
@@ -374,6 +373,7 @@ private object Utils extends Logging {
def logErrorWithStack(msg: String) {
try { throw new Exception } catch { case ex: Exception => { logError(msg, ex) } }
}
+*/
def getUserNameFromEnvironment(): String = {
SparkHadoopUtil.getUserNameFromEnvironment
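The Utils hunk swaps which block is commented out: the strict debug implementations of checkHost/checkHostPort are re-enabled, and the intended no-op replacements are fenced off behind /* ... */ until testing completes. Roughly, the checks enforce that a "host" carries no port while a "hostPort" does; a simplified sketch of that contract (assumed behavior, using assertions, whereas the real debug code logs the failure and calls System.exit):

// Assumed contract: a "host" must not contain a port, a "hostPort" must.
object HostCheckSketch {
  def checkHost(host: String, message: String = ""): Unit =
    assert(host != null && host.indexOf(':') == -1,
      s"$message: expected a host without a port, got '$host'")

  def checkHostPort(hostPort: String, message: String = ""): Unit =
    assert(hostPort != null && hostPort.indexOf(':') != -1,
      s"$message: expected host:port, got '$hostPort'")
}

// HostCheckSketch.checkHost("worker-3")            // ok
// HostCheckSketch.checkHostPort("worker-3:7077")   // ok
// HostCheckSketch.checkHost("worker-3:7077")       // assertion error in a debug build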
diff --git a/core/src/main/scala/spark/scheduler/ResultTask.scala b/core/src/main/scala/spark/scheduler/ResultTask.scala
index c43cbe5ed4..83166bce22 100644
--- a/core/src/main/scala/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/spark/scheduler/ResultTask.scala
@@ -70,7 +70,6 @@ private[spark] class ResultTask[T, U](
rdd.partitions(partition)
}
- // data locality is on a per host basis, not hyper specific to container (host:port). Unique on set of hosts.
private val preferredLocs: Seq[String] = if (locs == null) Nil else locs.toSet.toSeq
{
diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
index 0b848af2f3..4b36e71c32 100644
--- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
@@ -84,7 +84,6 @@ private[spark] class ShuffleMapTask(
protected def this() = this(0, null, null, 0, null)
- // data locality is on a per host basis, not hyper specific to container (host:port). Unique on set of hosts.
private val preferredLocs: Seq[String] = if (locs == null) Nil else locs.toSet.toSeq
{
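Both ResultTask and ShuffleMapTask only drop the now-redundant comment above preferredLocs; the initializer itself is unchanged and simply deduplicates the location hints in a null-safe way. A tiny illustrative sketch (hypothetical helper name):

// Null-safe deduplication of location hints, mirroring the preferredLocs initializer.
// Note: toSet.toSeq does not preserve the original ordering.
object PreferredLocsSketch {
  def dedupLocs(locs: Seq[String]): Seq[String] =
    if (locs == null) Nil else locs.toSet.toSeq
}

// PreferredLocsSketch.dedupLocs(Seq("hostA:7077", "hostA:7077", "hostB:7077"))
//   => Seq("hostA:7077", "hostB:7077") in some order
// PreferredLocsSketch.dedupLocs(null) => Nil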
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index 3c72ce4206..49fc449e86 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -73,7 +73,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
val activeExecutorIds = new HashSet[String]
// TODO: We might want to remove this and merge it with execId datastructures - but later.
- // Which hosts in the cluster are alive (contains hostPort's) - used for hyper local and local task locality.
+ // Which hosts in the cluster are alive (contains hostPort's) - used for instance local and node local task locality.
private val hostPortsAlive = new HashSet[String]
private val hostToAliveHostPorts = new HashMap[String, HashSet[String]]
@@ -109,7 +109,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
// Unfortunately, this means that SparkEnv is indirectly referencing ClusterScheduler
// Will that be a design violation ?
- SparkEnv.get.executorIdToHostPort = executorToHostPort
+ SparkEnv.get.executorIdToHostPort = Some(executorToHostPort)
}
def newTaskId(): Long = nextTaskId.getAndIncrement()
@@ -240,7 +240,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
for (manager <- activeTaskSetsQueue.sortBy(m => (m.taskSet.priority, m.taskSet.stageId))) {
// Split offers based on host local, rack local and off-rack tasks.
- val hyperLocalOffers = new HashMap[String, ArrayBuffer[Int]]()
+ val instanceLocalOffers = new HashMap[String, ArrayBuffer[Int]]()
val hostLocalOffers = new HashMap[String, ArrayBuffer[Int]]()
val rackLocalOffers = new HashMap[String, ArrayBuffer[Int]]()
val otherOffers = new HashMap[String, ArrayBuffer[Int]]()
@@ -250,16 +250,16 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
// DEBUG code
Utils.checkHostPort(hostPort)
- val numHyperLocalTasks = math.max(0, math.min(manager.numPendingTasksForHostPort(hostPort), availableCpus(i)))
- if (numHyperLocalTasks > 0){
- val list = hyperLocalOffers.getOrElseUpdate(hostPort, new ArrayBuffer[Int])
- for (j <- 0 until numHyperLocalTasks) list += i
+ val numInstanceLocalTasks = math.max(0, math.min(manager.numPendingTasksForHostPort(hostPort), availableCpus(i)))
+ if (numInstanceLocalTasks > 0){
+ val list = instanceLocalOffers.getOrElseUpdate(hostPort, new ArrayBuffer[Int])
+ for (j <- 0 until numInstanceLocalTasks) list += i
}
val host = Utils.parseHostPort(hostPort)._1
val numHostLocalTasks = math.max(0,
- // Remove hyper local tasks (which are also host local btw !) from this
- math.min(manager.numPendingTasksForHost(hostPort) - numHyperLocalTasks, hostToAvailableCpus(host)))
+ // Remove instance local tasks (which are also host local btw !) from this
+ math.min(manager.numPendingTasksForHost(hostPort) - numInstanceLocalTasks, hostToAvailableCpus(host)))
if (numHostLocalTasks > 0){
val list = hostLocalOffers.getOrElseUpdate(host, new ArrayBuffer[Int])
for (j <- 0 until numHostLocalTasks) list += i
@@ -267,7 +267,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
val numRackLocalTasks = math.max(0,
// Remove host local tasks (which are also rack local btw !) from this
- math.min(manager.numRackLocalPendingTasksForHost(hostPort) - numHyperLocalTasks - numHostLocalTasks, hostToAvailableCpus(host)))
+ math.min(manager.numRackLocalPendingTasksForHost(hostPort) - numInstanceLocalTasks - numHostLocalTasks, hostToAvailableCpus(host)))
if (numRackLocalTasks > 0){
val list = rackLocalOffers.getOrElseUpdate(host, new ArrayBuffer[Int])
for (j <- 0 until numRackLocalTasks) list += i
@@ -280,19 +280,19 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
}
val offersPriorityList = new ArrayBuffer[Int](
- hyperLocalOffers.size + hostLocalOffers.size + rackLocalOffers.size + otherOffers.size)
+ instanceLocalOffers.size + hostLocalOffers.size + rackLocalOffers.size + otherOffers.size)
- // First hyper local, then host local, then rack, then others
+ // First instance local, then host local, then rack, then others
- // numHostLocalOffers contains count of both hyper local and host offers.
+ // numHostLocalOffers contains count of both instance local and host offers.
val numHostLocalOffers = {
- val hyperLocalPriorityList = ClusterScheduler.prioritizeContainers(hyperLocalOffers)
- offersPriorityList ++= hyperLocalPriorityList
+ val instanceLocalPriorityList = ClusterScheduler.prioritizeContainers(instanceLocalOffers)
+ offersPriorityList ++= instanceLocalPriorityList
val hostLocalPriorityList = ClusterScheduler.prioritizeContainers(hostLocalOffers)
offersPriorityList ++= hostLocalPriorityList
- hyperLocalPriorityList.size + hostLocalPriorityList.size
+ instanceLocalPriorityList.size + hostLocalPriorityList.size
}
val numRackLocalOffers = {
val rackLocalPriorityList = ClusterScheduler.prioritizeContainers(rackLocalOffers)
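The ClusterScheduler hunks are a rename (hyperLocal -> instanceLocal) plus matching comment updates; the offer-splitting logic itself is untouched: offers are bucketed per locality level, counts already claimed at tighter levels are subtracted from looser ones, and the priority list is built tightest-first. A rough sketch of that ordering, with ClusterScheduler.prioritizeContainers stubbed out as a plain flatten (the real method interleaves containers across hosts):

import scala.collection.mutable.{ArrayBuffer, HashMap}

// Instance-local buckets first, then host-local, then rack-local, then everything else.
object OfferOrderingSketch {
  private def prioritize(offers: HashMap[String, ArrayBuffer[Int]]): Seq[Int] =
    offers.values.flatten.toSeq // stand-in for ClusterScheduler.prioritizeContainers

  def buildPriorityList(
      instanceLocalOffers: HashMap[String, ArrayBuffer[Int]],
      hostLocalOffers: HashMap[String, ArrayBuffer[Int]],
      rackLocalOffers: HashMap[String, ArrayBuffer[Int]],
      otherOffers: HashMap[String, ArrayBuffer[Int]]): ArrayBuffer[Int] = {
    val offersPriorityList = new ArrayBuffer[Int](
      instanceLocalOffers.size + hostLocalOffers.size + rackLocalOffers.size + otherOffers.size)
    offersPriorityList ++= prioritize(instanceLocalOffers)
    offersPriorityList ++= prioritize(hostLocalOffers)
    offersPriorityList ++= prioritize(rackLocalOffers)
    offersPriorityList ++= prioritize(otherOffers)
    offersPriorityList
  }
}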
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index f5c0058554..5f3faaa5c3 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -13,17 +13,17 @@ import spark.scheduler._
import spark.TaskState.TaskState
import java.nio.ByteBuffer
-private[spark] object TaskLocality extends Enumeration("HYPER_LOCAL", "HOST_LOCAL", "RACK_LOCAL", "ANY") with Logging {
+private[spark] object TaskLocality extends Enumeration("INSTANCE_LOCAL", "HOST_LOCAL", "RACK_LOCAL", "ANY") with Logging {
- // hyper local is expected to be used ONLY within tasksetmanager for now.
- val HYPER_LOCAL, HOST_LOCAL, RACK_LOCAL, ANY = Value
+ // instance local is expected to be used ONLY within tasksetmanager for now.
+ val INSTANCE_LOCAL, HOST_LOCAL, RACK_LOCAL, ANY = Value
type TaskLocality = Value
def isAllowed(constraint: TaskLocality, condition: TaskLocality): Boolean = {
// Must not be the constraint.
- assert (constraint != TaskLocality.HYPER_LOCAL)
+ assert (constraint != TaskLocality.INSTANCE_LOCAL)
constraint match {
case TaskLocality.HOST_LOCAL => condition == TaskLocality.HOST_LOCAL
@@ -37,8 +37,8 @@ private[spark] object TaskLocality extends Enumeration("HYPER_LOCAL", "HOST_LOCA
// better way to do this ?
try {
val retval = TaskLocality.withName(str)
- // Must not specify HYPER_LOCAL !
- assert (retval != TaskLocality.HYPER_LOCAL)
+ // Must not specify INSTANCE_LOCAL !
+ assert (retval != TaskLocality.INSTANCE_LOCAL)
retval
} catch {
@@ -84,7 +84,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
// Last time when we launched a preferred task (for delay scheduling)
var lastPreferredLaunchTime = System.currentTimeMillis
- // List of pending tasks for each node (hyper local to container). These collections are actually
+ // List of pending tasks for each node (instance local to container). These collections are actually
// treated as stacks, in which new tasks are added to the end of the
// ArrayBuffer and removed from the end. This makes it faster to detect
// tasks that repeatedly fail because whenever a task failed, it is put
@@ -142,12 +142,12 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
}
// Note that it follows the hierarchy.
- // if we search for HOST_LOCAL, the output will include HYPER_LOCAL and
- // if we search for RACK_LOCAL, it will include HYPER_LOCAL & HOST_LOCAL
+ // if we search for HOST_LOCAL, the output will include INSTANCE_LOCAL and
+ // if we search for RACK_LOCAL, it will include INSTANCE_LOCAL & HOST_LOCAL
private def findPreferredLocations(_taskPreferredLocations: Seq[String], scheduler: ClusterScheduler,
taskLocality: TaskLocality.TaskLocality): HashSet[String] = {
- if (TaskLocality.HYPER_LOCAL == taskLocality) {
+ if (TaskLocality.INSTANCE_LOCAL == taskLocality) {
// straight forward comparison ! Special case it.
val retval = new HashSet[String]()
scheduler.synchronized {
@@ -203,19 +203,19 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
private def addPendingTask(index: Int) {
// We can infer hostLocalLocations from rackLocalLocations by joining it against tasks(index).preferredLocations (with appropriate
// hostPort <-> host conversion). But not doing it for simplicity sake. If this becomes a performance issue, modify it.
- val hyperLocalLocations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.HYPER_LOCAL)
+ val instanceLocalLocations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.INSTANCE_LOCAL)
val hostLocalLocations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.HOST_LOCAL)
val rackLocalLocations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.RACK_LOCAL)
if (rackLocalLocations.size == 0) {
// Current impl ensures this.
- assert (hyperLocalLocations.size == 0)
+ assert (instanceLocalLocations.size == 0)
assert (hostLocalLocations.size == 0)
pendingTasksWithNoPrefs += index
} else {
- // hyper local locality
- for (hostPort <- hyperLocalLocations) {
+ // instance local locality
+ for (hostPort <- instanceLocalLocations) {
// DEBUG Code
Utils.checkHostPort(hostPort)
@@ -223,7 +223,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
hostPortList += index
}
- // host locality (includes hyper local)
+ // host locality (includes instance local)
for (hostPort <- hostLocalLocations) {
// DEBUG Code
Utils.checkHostPort(hostPort)
@@ -233,7 +233,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
hostList += index
}
- // rack locality (includes hyper local and host local)
+ // rack locality (includes instance local and host local)
for (rackLocalHostPort <- rackLocalLocations) {
// DEBUG Code
Utils.checkHostPort(rackLocalHostPort)
@@ -247,7 +247,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
allPendingTasks += index
}
- // Return the pending tasks list for a given host port (hyper local), or an empty list if
+ // Return the pending tasks list for a given host port (instance local), or an empty list if
// there is no map entry for that host
private def getPendingTasksForHostPort(hostPort: String): ArrayBuffer[Int] = {
// DEBUG Code
@@ -269,7 +269,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
pendingRackLocalTasksForHost.getOrElse(host, ArrayBuffer())
}
- // Number of pending tasks for a given host Port (which would be hyper local)
+ // Number of pending tasks for a given host Port (which would be instance local)
def numPendingTasksForHostPort(hostPort: String): Int = {
getPendingTasksForHostPort(hostPort).count( index => copiesRunning(index) == 0 && !finished(index) )
}
@@ -352,9 +352,9 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
// Dequeue a pending task for a given node and return its index.
// If localOnly is set to false, allow non-local tasks as well.
private def findTask(hostPort: String, locality: TaskLocality.TaskLocality): Option[Int] = {
- val hyperLocalTask = findTaskFromList(getPendingTasksForHostPort(hostPort))
- if (hyperLocalTask != None) {
- return hyperLocalTask
+ val instanceLocalTask = findTaskFromList(getPendingTasksForHostPort(hostPort))
+ if (instanceLocalTask != None) {
+ return instanceLocalTask
}
val localTask = findTaskFromList(getPendingTasksForHost(hostPort))
@@ -387,7 +387,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
return findSpeculativeTask(hostPort, locality)
}
- private def isHyperLocalLocation(task: Task[_], hostPort: String): Boolean = {
+ private def isInstanceLocalLocation(task: Task[_], hostPort: String): Boolean = {
Utils.checkHostPort(hostPort)
val locs = task.preferredLocations
@@ -443,7 +443,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
val taskId = sched.newTaskId()
// Figure out whether this should count as a preferred launch
val taskLocality =
- if (isHyperLocalLocation(task, hostPort)) TaskLocality.HYPER_LOCAL else
+ if (isInstanceLocalLocation(task, hostPort)) TaskLocality.INSTANCE_LOCAL else
if (isHostLocalLocation(task, hostPort)) TaskLocality.HOST_LOCAL else
if (isRackLocalLocation(task, hostPort)) TaskLocality.RACK_LOCAL else
TaskLocality.ANY
@@ -608,8 +608,8 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
// host local tasks - should we push this to rack local or no pref list ? For now, preserving behavior and moving to
// no prefs list. Note, this was done due to impliations related to 'waiting' for data local tasks, etc.
- // Note: NOT checking hyper local list - since host local list is super set of that. We need to ad to no prefs only if
- // there is no host local node for the task (not if there is no hyper local node for the task)
+ // Note: NOT checking instance local list - since host local list is super set of that. We need to ad to no prefs only if
+ // there is no host local node for the task (not if there is no instance local node for the task)
for (index <- getPendingTasksForHost(Utils.parseHostPort(hostPort)._1)) {
// val newLocs = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.RACK_LOCAL)
val newLocs = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.HOST_LOCAL)
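Finally, the TaskSetManager hunks carry the same rename through the TaskLocality enumeration, its assertions, the pending-task bookkeeping, and the locality classification at launch time. A standalone sketch of the renamed locality ladder and the isAllowed check (plain Enumeration rather than the deprecated names constructor; the RACK_LOCAL and ANY branches are assumed from the hierarchy described in the surrounding comments, since the hunk cuts off after HOST_LOCAL):

object TaskLocalitySketch extends Enumeration {
  val INSTANCE_LOCAL, HOST_LOCAL, RACK_LOCAL, ANY = Value

  // `constraint` is the scheduling constraint, `condition` the locality a task would achieve.
  // INSTANCE_LOCAL is never a constraint: it is only used inside TaskSetManager when
  // classifying an already-chosen task.
  def isAllowed(constraint: Value, condition: Value): Boolean = {
    assert(constraint != INSTANCE_LOCAL)
    constraint match {
      case HOST_LOCAL => condition == HOST_LOCAL
      case RACK_LOCAL => condition == HOST_LOCAL || condition == RACK_LOCAL
      case _          => true // ANY admits every locality level
    }
  }
}

// TaskLocalitySketch.isAllowed(TaskLocalitySketch.RACK_LOCAL, TaskLocalitySketch.HOST_LOCAL) // true
// TaskLocalitySketch.isAllowed(TaskLocalitySketch.HOST_LOCAL, TaskLocalitySketch.RACK_LOCAL) // false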