diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2010-10-19 16:07:58 -0700 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2010-10-19 16:07:58 -0700 |
commit | 787faf0d0e288a64ec92f1013526a35525b78084 (patch) | |
tree | 8480099fc0dab9981e8d01ef8fab0be9ff177186 /src | |
parent | 0e0ec8357011f3daa86616138abea99a621f1913 (diff) | |
download | spark-787faf0d0e288a64ec92f1013526a35525b78084.tar.gz spark-787faf0d0e288a64ec92f1013526a35525b78084.tar.bz2 spark-787faf0d0e288a64ec92f1013526a35525b78084.zip |
Fixed a bug with scheduling of tasks that have no locality preferences.
These tasks were being subjected to delay scheduling but then counted as
having been launched on a preferred node. The solution is to have a
separate queue for them and treat them as preferred during scheduling.
Diffstat (limited to 'src')
-rw-r--r-- | src/scala/spark/SimpleJob.scala | 35 |
1 files changed, 26 insertions, 9 deletions
diff --git a/src/scala/spark/SimpleJob.scala b/src/scala/spark/SimpleJob.scala index b15d0522d4..09846ccc34 100644 --- a/src/scala/spark/SimpleJob.scala +++ b/src/scala/spark/SimpleJob.scala @@ -51,6 +51,9 @@ extends Job(jobId) with Logging // the one that it was launched from, but gets removed from them later. val pendingTasksForHost = new HashMap[String, ArrayBuffer[Int]] + // List containing pending tasks with no locality preferences + val pendingTasksWithNoPrefs = new ArrayBuffer[Int] + // List containing all pending tasks (also used as a stack, as above) val allPendingTasks = new ArrayBuffer[Int] @@ -66,11 +69,16 @@ extends Job(jobId) with Logging // Add a task to all the pending-task lists that it should be on. def addPendingTask(index: Int) { - allPendingTasks += index - for (host <- tasks(index).preferredLocations) { - val list = pendingTasksForHost.getOrElseUpdate(host, ArrayBuffer()) - list += index + val locations = tasks(index).preferredLocations + if (locations.size == 0) { + pendingTasksWithNoPrefs += index + } else { + for (host <- locations) { + val list = pendingTasksForHost.getOrElseUpdate(host, ArrayBuffer()) + list += index + } } + allPendingTasks += index } // Mark the job as finished and wake up any threads waiting on it @@ -103,6 +111,8 @@ extends Job(jobId) with Logging // Dequeue a pending task from the given list and return its index. // Return None if the list is empty. + // This method also cleans up any tasks in the list that have already + // been launched, since we want that to happen lazily. def findTaskFromList(list: ArrayBuffer[Int]): Option[Int] = { while (!list.isEmpty) { val index = list.last @@ -117,11 +127,18 @@ extends Job(jobId) with Logging // Dequeue a pending task for a given node and return its index. // If localOnly is set to false, allow non-local tasks as well. def findTask(host: String, localOnly: Boolean): Option[Int] = { - findTaskFromList(getPendingTasksForHost(host)) match { - case Some(task) => Some(task) - case None => - if (localOnly) None - else findTaskFromList(allPendingTasks) + val localTask = findTaskFromList(getPendingTasksForHost(host)) + if (localTask != None) { + return localTask + } + val noPrefTask = findTaskFromList(pendingTasksWithNoPrefs) + if (noPrefTask != None) { + return noPrefTask + } + if (!localOnly) { + return findTaskFromList(allPendingTasks) // Look for non-local task + } else { + return None } } |