author     Mridul Muralidharan <mridul@gmail.com>    2013-05-02 07:30:06 +0530
committer  Mridul Muralidharan <mridul@gmail.com>    2013-05-02 07:30:06 +0530
commit     1b5aaeadc72ad5197c00897c41f670ea241d0235 (patch)
tree       9da806056aa15368e02a93b746c0493d681ebcd2
parent     609a817f52d8db05711c0d4529dd1448ed8c4fe0 (diff)
Integrate review comments 2
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala | 78
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala   | 74
-rw-r--r--  core/src/main/scala/spark/scheduler/local/LocalScheduler.scala     |  2
3 files changed, 77 insertions, 77 deletions
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index 49fc449e86..cf4483f144 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -32,28 +32,28 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
val TASK_REVIVAL_INTERVAL = System.getProperty("spark.tasks.revive.interval", "0").toLong
/*
- This property controls how aggressive we should be to modulate waiting for host local task scheduling.
- To elaborate, currently there is a time limit (3 sec def) to ensure that spark attempts to wait for host locality of tasks before
+ This property controls how aggressive we should be to modulate waiting for node local task scheduling.
+ To elaborate, currently there is a time limit (3 sec def) to ensure that spark attempts to wait for node locality of tasks before
scheduling on other nodes. We have modified this in yarn branch such that offers to task set happen in prioritized order :
- host-local, rack-local and then others
- But once all available host local (and no pref) tasks are scheduled, instead of waiting for 3 sec before
+ node-local, rack-local and then others
+ But once all available node local (and no pref) tasks are scheduled, instead of waiting for 3 sec before
scheduling to other nodes (which degrades performance for time sensitive tasks and on larger clusters), we can
modulate that : to also allow rack local nodes or any node. The default is still set to HOST - so that previous behavior is
maintained. This is to allow tuning the tension between pulling rdd data off node and scheduling computation asap.
TODO: rename property ? The value is one of
- - HOST_LOCAL (default, no change w.r.t current behavior),
+ - NODE_LOCAL (default, no change w.r.t current behavior),
- RACK_LOCAL and
- ANY
Note that this property makes more sense when used in conjunction with spark.tasks.revive.interval > 0 : else it is not very effective.
Additional Note: For non trivial clusters, there is a 4x - 5x reduction in running time (in some of our experiments) based on whether
- it is left at default HOST_LOCAL, RACK_LOCAL (if cluster is configured to be rack aware) or ANY.
+ it is left at default NODE_LOCAL, RACK_LOCAL (if cluster is configured to be rack aware) or ANY.
If cluster is rack aware, then setting it to RACK_LOCAL gives best tradeoff and a 3x - 4x performance improvement while minimizing IO impact.
Also, it brings down the variance in running time drastically.
*/
- val TASK_SCHEDULING_AGGRESSION = TaskLocality.parse(System.getProperty("spark.tasks.schedule.aggression", "HOST_LOCAL"))
+ val TASK_SCHEDULING_AGGRESSION = TaskLocality.parse(System.getProperty("spark.tasks.schedule.aggression", "NODE_LOCAL"))
val activeTaskSets = new HashMap[String, TaskSetManager]
var activeTaskSetsQueue = new ArrayBuffer[TaskSetManager]
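
For illustration only (not part of this patch), a minimal sketch of how the two properties discussed in the comment above might be tuned from a driver program of that era. The property names spark.tasks.schedule.aggression and spark.tasks.revive.interval come from the hunk above; the driver code, the master URL, and the 100 ms revive interval (assumed to be milliseconds) are illustrative assumptions.

    import spark.SparkContext

    object LocalityTuningSketch {
      def main(args: Array[String]) {
        // Re-offer pending tasks periodically so the aggression setting has a chance to act
        // (the comment above notes the property is only effective with revive.interval > 0).
        System.setProperty("spark.tasks.revive.interval", "100")            // assumed value, in ms
        // Allow the scheduler to fall back to rack-local executors instead of only node-local ones.
        System.setProperty("spark.tasks.schedule.aggression", "RACK_LOCAL")

        val sc = new SparkContext("spark://master:7077", "locality-demo")   // hypothetical master URL
        // ... submit jobs here ...
        sc.stop()
      }
    }
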
@@ -73,7 +73,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
val activeExecutorIds = new HashSet[String]
// TODO: We might want to remove this and merge it with execId datastructures - but later.
- // Which hosts in the cluster are alive (contains hostPort's) - used for instance local and node local task locality.
+ // Which hosts in the cluster are alive (contains hostPort's) - used for process local and node local task locality.
private val hostPortsAlive = new HashSet[String]
private val hostToAliveHostPorts = new HashMap[String, HashSet[String]]
@@ -217,9 +217,9 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
}
// Build a list of tasks to assign to each slave
val tasks = offers.map(o => new ArrayBuffer[TaskDescription](o.cores))
- // merge availableCpus into hostToAvailableCpus block ?
+ // merge availableCpus into nodeToAvailableCpus block ?
val availableCpus = offers.map(o => o.cores).toArray
- val hostToAvailableCpus = {
+ val nodeToAvailableCpus = {
val map = new HashMap[String, Int]()
for (offer <- offers) {
val hostPort = offer.hostPort
@@ -239,9 +239,9 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
for (manager <- activeTaskSetsQueue.sortBy(m => (m.taskSet.priority, m.taskSet.stageId))) {
- // Split offers based on host local, rack local and off-rack tasks.
- val instanceLocalOffers = new HashMap[String, ArrayBuffer[Int]]()
- val hostLocalOffers = new HashMap[String, ArrayBuffer[Int]]()
+ // Split offers based on node local, rack local and off-rack tasks.
+ val processLocalOffers = new HashMap[String, ArrayBuffer[Int]]()
+ val nodeLocalOffers = new HashMap[String, ArrayBuffer[Int]]()
val rackLocalOffers = new HashMap[String, ArrayBuffer[Int]]()
val otherOffers = new HashMap[String, ArrayBuffer[Int]]()
@@ -250,29 +250,29 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
// DEBUG code
Utils.checkHostPort(hostPort)
- val numInstanceLocalTasks = math.max(0, math.min(manager.numPendingTasksForHostPort(hostPort), availableCpus(i)))
- if (numInstanceLocalTasks > 0){
- val list = instanceLocalOffers.getOrElseUpdate(hostPort, new ArrayBuffer[Int])
- for (j <- 0 until numInstanceLocalTasks) list += i
+ val numProcessLocalTasks = math.max(0, math.min(manager.numPendingTasksForHostPort(hostPort), availableCpus(i)))
+ if (numProcessLocalTasks > 0){
+ val list = processLocalOffers.getOrElseUpdate(hostPort, new ArrayBuffer[Int])
+ for (j <- 0 until numProcessLocalTasks) list += i
}
val host = Utils.parseHostPort(hostPort)._1
- val numHostLocalTasks = math.max(0,
- // Remove instance local tasks (which are also host local btw !) from this
- math.min(manager.numPendingTasksForHost(hostPort) - numInstanceLocalTasks, hostToAvailableCpus(host)))
- if (numHostLocalTasks > 0){
- val list = hostLocalOffers.getOrElseUpdate(host, new ArrayBuffer[Int])
- for (j <- 0 until numHostLocalTasks) list += i
+ val numNodeLocalTasks = math.max(0,
+ // Remove process local tasks (which are also host local btw !) from this
+ math.min(manager.numPendingTasksForHost(hostPort) - numProcessLocalTasks, nodeToAvailableCpus(host)))
+ if (numNodeLocalTasks > 0){
+ val list = nodeLocalOffers.getOrElseUpdate(host, new ArrayBuffer[Int])
+ for (j <- 0 until numNodeLocalTasks) list += i
}
val numRackLocalTasks = math.max(0,
- // Remove host local tasks (which are also rack local btw !) from this
- math.min(manager.numRackLocalPendingTasksForHost(hostPort) - numInstanceLocalTasks - numHostLocalTasks, hostToAvailableCpus(host)))
+ // Remove node local tasks (which are also rack local btw !) from this
+ math.min(manager.numRackLocalPendingTasksForHost(hostPort) - numProcessLocalTasks - numNodeLocalTasks, nodeToAvailableCpus(host)))
if (numRackLocalTasks > 0){
val list = rackLocalOffers.getOrElseUpdate(host, new ArrayBuffer[Int])
for (j <- 0 until numRackLocalTasks) list += i
}
- if (numHostLocalTasks <= 0 && numRackLocalTasks <= 0){
+ if (numNodeLocalTasks <= 0 && numRackLocalTasks <= 0){
// add to others list - spread even this across cluster.
val list = otherOffers.getOrElseUpdate(host, new ArrayBuffer[Int])
list += i
@@ -280,19 +280,19 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
}
val offersPriorityList = new ArrayBuffer[Int](
- instanceLocalOffers.size + hostLocalOffers.size + rackLocalOffers.size + otherOffers.size)
+ processLocalOffers.size + nodeLocalOffers.size + rackLocalOffers.size + otherOffers.size)
- // First instance local, then host local, then rack, then others
+ // First process local, then host local, then rack, then others
- // numHostLocalOffers contains count of both instance local and host offers.
- val numHostLocalOffers = {
- val instanceLocalPriorityList = ClusterScheduler.prioritizeContainers(instanceLocalOffers)
- offersPriorityList ++= instanceLocalPriorityList
+ // numNodeLocalOffers contains count of both process local and host offers.
+ val numNodeLocalOffers = {
+ val processLocalPriorityList = ClusterScheduler.prioritizeContainers(processLocalOffers)
+ offersPriorityList ++= processLocalPriorityList
- val hostLocalPriorityList = ClusterScheduler.prioritizeContainers(hostLocalOffers)
- offersPriorityList ++= hostLocalPriorityList
+ val nodeLocalPriorityList = ClusterScheduler.prioritizeContainers(nodeLocalOffers)
+ offersPriorityList ++= nodeLocalPriorityList
- instanceLocalPriorityList.size + hostLocalPriorityList.size
+ processLocalPriorityList.size + nodeLocalPriorityList.size
}
val numRackLocalOffers = {
val rackLocalPriorityList = ClusterScheduler.prioritizeContainers(rackLocalOffers)
@@ -303,8 +303,8 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
var lastLoop = false
val lastLoopIndex = TASK_SCHEDULING_AGGRESSION match {
- case TaskLocality.HOST_LOCAL => numHostLocalOffers
- case TaskLocality.RACK_LOCAL => numRackLocalOffers + numHostLocalOffers
+ case TaskLocality.NODE_LOCAL => numNodeLocalOffers
+ case TaskLocality.RACK_LOCAL => numRackLocalOffers + numNodeLocalOffers
case TaskLocality.ANY => offersPriorityList.size
}
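
To make the cutoff above concrete: offersPriorityList is ordered process-local, then node-local, then rack-local, then everything else, and lastLoopIndex is simply the length of the prefix that the final scheduling pass may consume. A self-contained sketch of just that arithmetic, with made-up offer counts (the enum and numbers below are illustrative, not the patched classes):

    object LastLoopIndexSketch {
      object Aggression extends Enumeration { val NODE_LOCAL, RACK_LOCAL, ANY = Value }

      // Prefix of the priority list that the last scheduling pass may still consume.
      def cutoff(aggression: Aggression.Value,
                 numNodeLocalOffers: Int, numRackLocalOffers: Int, totalOffers: Int): Int =
        aggression match {
          case Aggression.NODE_LOCAL => numNodeLocalOffers
          case Aggression.RACK_LOCAL => numRackLocalOffers + numNodeLocalOffers
          case Aggression.ANY        => totalOffers
        }

      def main(args: Array[String]) {
        // e.g. 4 node-local offers (process-local included), 3 rack-local, 10 offers in total
        println(cutoff(Aggression.NODE_LOCAL, 4, 3, 10))  // 4
        println(cutoff(Aggression.RACK_LOCAL, 4, 3, 10))  // 7
        println(cutoff(Aggression.ANY,        4, 3, 10))  // 10
      }
    }
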
@@ -343,8 +343,8 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
// prevent more looping
launchedTask = false
} else if (!lastLoop && !launchedTask) {
- // Do this only if TASK_SCHEDULING_AGGRESSION != HOST_LOCAL
- if (TASK_SCHEDULING_AGGRESSION != TaskLocality.HOST_LOCAL) {
+ // Do this only if TASK_SCHEDULING_AGGRESSION != NODE_LOCAL
+ if (TASK_SCHEDULING_AGGRESSION != TaskLocality.NODE_LOCAL) {
// fudge launchedTask to ensure we loop once more
launchedTask = true
// dont loop anymore
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index 5f3faaa5c3..ff4790e4cb 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -13,21 +13,21 @@ import spark.scheduler._
import spark.TaskState.TaskState
import java.nio.ByteBuffer
-private[spark] object TaskLocality extends Enumeration("INSTANCE_LOCAL", "HOST_LOCAL", "RACK_LOCAL", "ANY") with Logging {
+private[spark] object TaskLocality extends Enumeration("PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY") with Logging {
- // instance local is expected to be used ONLY within tasksetmanager for now.
- val INSTANCE_LOCAL, HOST_LOCAL, RACK_LOCAL, ANY = Value
+ // process local is expected to be used ONLY within tasksetmanager for now.
+ val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value
type TaskLocality = Value
def isAllowed(constraint: TaskLocality, condition: TaskLocality): Boolean = {
// Must not be the constraint.
- assert (constraint != TaskLocality.INSTANCE_LOCAL)
+ assert (constraint != TaskLocality.PROCESS_LOCAL)
constraint match {
- case TaskLocality.HOST_LOCAL => condition == TaskLocality.HOST_LOCAL
- case TaskLocality.RACK_LOCAL => condition == TaskLocality.HOST_LOCAL || condition == TaskLocality.RACK_LOCAL
+ case TaskLocality.NODE_LOCAL => condition == TaskLocality.NODE_LOCAL
+ case TaskLocality.RACK_LOCAL => condition == TaskLocality.NODE_LOCAL || condition == TaskLocality.RACK_LOCAL
// For anything else, allow
case _ => true
}
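
Read in isolation, the renamed hierarchy behaves like the following standalone sketch (an approximation for illustration, not the patched object; the real code also asserts that PROCESS_LOCAL never appears as a constraint, since it is TaskSetManager-internal):

    object TaskLocalitySketch extends Enumeration {
      val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value

      // A constraint admits conditions that are at least as local as itself.
      def isAllowed(constraint: Value, condition: Value): Boolean = constraint match {
        case NODE_LOCAL => condition == NODE_LOCAL
        case RACK_LOCAL => condition == NODE_LOCAL || condition == RACK_LOCAL
        case _          => true  // ANY admits everything
      }
    }

    // Example checks (with TaskLocalitySketch._ imported):
    //   isAllowed(RACK_LOCAL, NODE_LOCAL)  => true   (node-local satisfies a rack-local constraint)
    //   isAllowed(NODE_LOCAL, RACK_LOCAL)  => false  (rack-local is too weak for a node-local constraint)
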
@@ -37,15 +37,15 @@ private[spark] object TaskLocality extends Enumeration("INSTANCE_LOCAL", "HOST_L
// better way to do this ?
try {
val retval = TaskLocality.withName(str)
- // Must not specify INSTANCE_LOCAL !
- assert (retval != TaskLocality.INSTANCE_LOCAL)
+ // Must not specify PROCESS_LOCAL !
+ assert (retval != TaskLocality.PROCESS_LOCAL)
retval
} catch {
case nEx: NoSuchElementException => {
- logWarning("Invalid task locality specified '" + str + "', defaulting to HOST_LOCAL");
+ logWarning("Invalid task locality specified '" + str + "', defaulting to NODE_LOCAL");
// default to preserve earlier behavior
- HOST_LOCAL
+ NODE_LOCAL
}
}
}
@@ -84,7 +84,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
// Last time when we launched a preferred task (for delay scheduling)
var lastPreferredLaunchTime = System.currentTimeMillis
- // List of pending tasks for each node (instance local to container). These collections are actually
+ // List of pending tasks for each node (process local to container). These collections are actually
// treated as stacks, in which new tasks are added to the end of the
// ArrayBuffer and removed from the end. This makes it faster to detect
// tasks that repeatedly fail because whenever a task failed, it is put
@@ -142,12 +142,12 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
}
// Note that it follows the hierarchy.
- // if we search for HOST_LOCAL, the output will include INSTANCE_LOCAL and
- // if we search for RACK_LOCAL, it will include INSTANCE_LOCAL & HOST_LOCAL
+ // if we search for NODE_LOCAL, the output will include PROCESS_LOCAL and
+ // if we search for RACK_LOCAL, it will include PROCESS_LOCAL & NODE_LOCAL
private def findPreferredLocations(_taskPreferredLocations: Seq[String], scheduler: ClusterScheduler,
taskLocality: TaskLocality.TaskLocality): HashSet[String] = {
- if (TaskLocality.INSTANCE_LOCAL == taskLocality) {
+ if (TaskLocality.PROCESS_LOCAL == taskLocality) {
// straight forward comparison ! Special case it.
val retval = new HashSet[String]()
scheduler.synchronized {
@@ -162,7 +162,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
}
val taskPreferredLocations =
- if (TaskLocality.HOST_LOCAL == taskLocality) {
+ if (TaskLocality.NODE_LOCAL == taskLocality) {
_taskPreferredLocations
} else {
assert (TaskLocality.RACK_LOCAL == taskLocality)
@@ -203,19 +203,19 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
private def addPendingTask(index: Int) {
// We can infer hostLocalLocations from rackLocalLocations by joining it against tasks(index).preferredLocations (with appropriate
// hostPort <-> host conversion). But not doing it for simplicity sake. If this becomes a performance issue, modify it.
- val instanceLocalLocations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.INSTANCE_LOCAL)
- val hostLocalLocations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.HOST_LOCAL)
+ val processLocalLocations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.PROCESS_LOCAL)
+ val hostLocalLocations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.NODE_LOCAL)
val rackLocalLocations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.RACK_LOCAL)
if (rackLocalLocations.size == 0) {
// Current impl ensures this.
- assert (instanceLocalLocations.size == 0)
+ assert (processLocalLocations.size == 0)
assert (hostLocalLocations.size == 0)
pendingTasksWithNoPrefs += index
} else {
- // instance local locality
- for (hostPort <- instanceLocalLocations) {
+ // process local locality
+ for (hostPort <- processLocalLocations) {
// DEBUG Code
Utils.checkHostPort(hostPort)
@@ -223,7 +223,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
hostPortList += index
}
- // host locality (includes instance local)
+ // host locality (includes process local)
for (hostPort <- hostLocalLocations) {
// DEBUG Code
Utils.checkHostPort(hostPort)
@@ -233,7 +233,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
hostList += index
}
- // rack locality (includes instance local and host local)
+ // rack locality (includes process local and host local)
for (rackLocalHostPort <- rackLocalLocations) {
// DEBUG Code
Utils.checkHostPort(rackLocalHostPort)
@@ -247,7 +247,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
allPendingTasks += index
}
- // Return the pending tasks list for a given host port (instance local), or an empty list if
+ // Return the pending tasks list for a given host port (process local), or an empty list if
// there is no map entry for that host
private def getPendingTasksForHostPort(hostPort: String): ArrayBuffer[Int] = {
// DEBUG Code
@@ -269,7 +269,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
pendingRackLocalTasksForHost.getOrElse(host, ArrayBuffer())
}
- // Number of pending tasks for a given host Port (which would be instance local)
+ // Number of pending tasks for a given host Port (which would be process local)
def numPendingTasksForHostPort(hostPort: String): Int = {
getPendingTasksForHostPort(hostPort).count( index => copiesRunning(index) == 0 && !finished(index) )
}
@@ -305,13 +305,13 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
// task must have a preference for this host/rack/no preferred locations at all.
private def findSpeculativeTask(hostPort: String, locality: TaskLocality.TaskLocality): Option[Int] = {
- assert (TaskLocality.isAllowed(locality, TaskLocality.HOST_LOCAL))
+ assert (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL))
speculatableTasks.retain(index => !finished(index)) // Remove finished tasks from set
if (speculatableTasks.size > 0) {
val localTask = speculatableTasks.find {
index =>
- val locations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.HOST_LOCAL)
+ val locations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.NODE_LOCAL)
val attemptLocs = taskAttempts(index).map(_.hostPort)
(locations.size == 0 || locations.contains(hostPort)) && !attemptLocs.contains(hostPort)
}
@@ -352,9 +352,9 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
// Dequeue a pending task for a given node and return its index.
// If localOnly is set to false, allow non-local tasks as well.
private def findTask(hostPort: String, locality: TaskLocality.TaskLocality): Option[Int] = {
- val instanceLocalTask = findTaskFromList(getPendingTasksForHostPort(hostPort))
- if (instanceLocalTask != None) {
- return instanceLocalTask
+ val processLocalTask = findTaskFromList(getPendingTasksForHostPort(hostPort))
+ if (processLocalTask != None) {
+ return processLocalTask
}
val localTask = findTaskFromList(getPendingTasksForHost(hostPort))
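
The hunk above keeps findTask's overall shape: it tries the process-local pending list first and only widens the search when nothing is found there. A hedged sketch of just that cascade; the find* parameters below are hypothetical stand-ins for the pending-list lookups (getPendingTasksForHostPort, getPendingTasksForHost, the rack and no-pref lists), not the actual method.

    object FindTaskOrderSketch {
      // Returns the first pending task index found, preferring the most local source;
      // later lookups are only evaluated if the earlier ones come up empty.
      def findTask(findProcessLocal: => Option[Int],
                   findNodeLocal:    => Option[Int],
                   findRackLocal:    => Option[Int],
                   findAnywhere:     => Option[Int],
                   allowRack: Boolean,
                   allowAny:  Boolean): Option[Int] =
        findProcessLocal
          .orElse(findNodeLocal)
          .orElse(if (allowRack) findRackLocal else None)
          .orElse(if (allowAny)  findAnywhere  else None)
    }
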
@@ -387,7 +387,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
return findSpeculativeTask(hostPort, locality)
}
- private def isInstanceLocalLocation(task: Task[_], hostPort: String): Boolean = {
+ private def isProcessLocalLocation(task: Task[_], hostPort: String): Boolean = {
Utils.checkHostPort(hostPort)
val locs = task.preferredLocations
@@ -433,7 +433,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
val locality = if (overrideLocality != null) overrideLocality else {
// expand only if we have waited for more than LOCALITY_WAIT for a host local task ...
val time = System.currentTimeMillis
- if (time - lastPreferredLaunchTime < LOCALITY_WAIT) TaskLocality.HOST_LOCAL else TaskLocality.ANY
+ if (time - lastPreferredLaunchTime < LOCALITY_WAIT) TaskLocality.NODE_LOCAL else TaskLocality.ANY
}
findTask(hostPort, locality) match {
@@ -443,8 +443,8 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
val taskId = sched.newTaskId()
// Figure out whether this should count as a preferred launch
val taskLocality =
- if (isInstanceLocalLocation(task, hostPort)) TaskLocality.INSTANCE_LOCAL else
- if (isHostLocalLocation(task, hostPort)) TaskLocality.HOST_LOCAL else
+ if (isProcessLocalLocation(task, hostPort)) TaskLocality.PROCESS_LOCAL else
+ if (isHostLocalLocation(task, hostPort)) TaskLocality.NODE_LOCAL else
if (isRackLocalLocation(task, hostPort)) TaskLocality.RACK_LOCAL else
TaskLocality.ANY
val prefStr = taskLocality.toString
@@ -456,7 +456,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
val info = new TaskInfo(taskId, index, time, execId, hostPort, taskLocality)
taskInfos(taskId) = info
taskAttempts(index) = info :: taskAttempts(index)
- if (TaskLocality.HOST_LOCAL == taskLocality) {
+ if (TaskLocality.NODE_LOCAL == taskLocality) {
lastPreferredLaunchTime = time
}
// Serialize and return the task
@@ -608,11 +608,11 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
// host local tasks - should we push this to rack local or no pref list ? For now, preserving behavior and moving to
// no prefs list. Note, this was done due to implications related to 'waiting' for data local tasks, etc.
- // Note: NOT checking instance local list - since host local list is super set of that. We need to ad to no prefs only if
- // there is no host local node for the task (not if there is no instance local node for the task)
+ // Note: NOT checking process local list - since host local list is a superset of that. We need to add to no prefs only if
+ // there is no host local node for the task (not if there is no process local node for the task)
for (index <- getPendingTasksForHost(Utils.parseHostPort(hostPort)._1)) {
// val newLocs = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.RACK_LOCAL)
- val newLocs = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.HOST_LOCAL)
+ val newLocs = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.NODE_LOCAL)
if (newLocs.isEmpty) {
pendingTasksWithNoPrefs += index
}
diff --git a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
index f060a940a9..53dd6fbe13 100644
--- a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
@@ -53,7 +53,7 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkCon
def runTask(task: Task[_], idInJob: Int, attemptId: Int) {
logInfo("Running " + task)
- val info = new TaskInfo(attemptId, idInJob, System.currentTimeMillis(), "local", "local:1", TaskLocality.HOST_LOCAL)
+ val info = new TaskInfo(attemptId, idInJob, System.currentTimeMillis(), "local", "local:1", TaskLocality.NODE_LOCAL)
// Set the Spark execution environment for the worker thread
SparkEnv.set(env)
try {