aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2010-10-19 16:07:58 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2010-10-19 16:07:58 -0700
commit787faf0d0e288a64ec92f1013526a35525b78084 (patch)
tree8480099fc0dab9981e8d01ef8fab0be9ff177186 /src
parent0e0ec8357011f3daa86616138abea99a621f1913 (diff)
downloadspark-787faf0d0e288a64ec92f1013526a35525b78084.tar.gz
spark-787faf0d0e288a64ec92f1013526a35525b78084.tar.bz2
spark-787faf0d0e288a64ec92f1013526a35525b78084.zip
Fixed a bug with scheduling of tasks that have no locality preferences.
These tasks were being subjected to delay scheduling but then counted as having been launched on a preferred node. The solution is to have a separate queue for them and treat them as preferred during scheduling.
Diffstat (limited to 'src')
-rw-r--r--src/scala/spark/SimpleJob.scala35
1 files changed, 26 insertions, 9 deletions
diff --git a/src/scala/spark/SimpleJob.scala b/src/scala/spark/SimpleJob.scala
index b15d0522d4..09846ccc34 100644
--- a/src/scala/spark/SimpleJob.scala
+++ b/src/scala/spark/SimpleJob.scala
@@ -51,6 +51,9 @@ extends Job(jobId) with Logging
// the one that it was launched from, but gets removed from them later.
val pendingTasksForHost = new HashMap[String, ArrayBuffer[Int]]
+ // List containing pending tasks with no locality preferences
+ val pendingTasksWithNoPrefs = new ArrayBuffer[Int]
+
// List containing all pending tasks (also used as a stack, as above)
val allPendingTasks = new ArrayBuffer[Int]
@@ -66,11 +69,16 @@ extends Job(jobId) with Logging
// Add a task to all the pending-task lists that it should be on.
def addPendingTask(index: Int) {
- allPendingTasks += index
- for (host <- tasks(index).preferredLocations) {
- val list = pendingTasksForHost.getOrElseUpdate(host, ArrayBuffer())
- list += index
+ val locations = tasks(index).preferredLocations
+ if (locations.size == 0) {
+ pendingTasksWithNoPrefs += index
+ } else {
+ for (host <- locations) {
+ val list = pendingTasksForHost.getOrElseUpdate(host, ArrayBuffer())
+ list += index
+ }
}
+ allPendingTasks += index
}
// Mark the job as finished and wake up any threads waiting on it
@@ -103,6 +111,8 @@ extends Job(jobId) with Logging
// Dequeue a pending task from the given list and return its index.
// Return None if the list is empty.
+ // This method also cleans up any tasks in the list that have already
+ // been launched, since we want that to happen lazily.
def findTaskFromList(list: ArrayBuffer[Int]): Option[Int] = {
while (!list.isEmpty) {
val index = list.last
@@ -117,11 +127,18 @@ extends Job(jobId) with Logging
// Dequeue a pending task for a given node and return its index.
// If localOnly is set to false, allow non-local tasks as well.
def findTask(host: String, localOnly: Boolean): Option[Int] = {
- findTaskFromList(getPendingTasksForHost(host)) match {
- case Some(task) => Some(task)
- case None =>
- if (localOnly) None
- else findTaskFromList(allPendingTasks)
+ val localTask = findTaskFromList(getPendingTasksForHost(host))
+ if (localTask != None) {
+ return localTask
+ }
+ val noPrefTask = findTaskFromList(pendingTasksWithNoPrefs)
+ if (noPrefTask != None) {
+ return noPrefTask
+ }
+ if (!localOnly) {
+ return findTaskFromList(allPendingTasks) // Look for non-local task
+ } else {
+ return None
}
}