author     Matei Zaharia <matei@eecs.berkeley.edu>    2010-10-16 10:38:56 -0700
committer  Matei Zaharia <matei@eecs.berkeley.edu>    2010-10-16 10:38:56 -0700
commit     dbdd7682eb4efc28062733f76a2bd5d04856a142 (patch)
tree       073f206f99eef246e4b04b2e351d51d8543e33be /src/scala
parent     a4953c5051e47069ec80bfa44f25ec45c0ab7b8f (diff)
Bug fixes and improvements for MesosScheduler and SimpleJob
Diffstat (limited to 'src/scala')
-rw-r--r--  src/scala/spark/Job.scala              6
-rw-r--r--  src/scala/spark/MesosScheduler.scala    1
-rw-r--r--  src/scala/spark/SimpleJob.scala        64
3 files changed, 46 insertions, 25 deletions
diff --git a/src/scala/spark/Job.scala b/src/scala/spark/Job.scala
index 6b01307adc..6abbcbce51 100644
--- a/src/scala/spark/Job.scala
+++ b/src/scala/spark/Job.scala
@@ -3,14 +3,16 @@ package spark
import mesos._
/**
- * Trait representing a parallel job in MesosScheduler. Schedules the
+ * Class representing a parallel job in MesosScheduler. Schedules the
* job by implementing various callbacks.
*/
-trait Job {
+abstract class Job(jobId: Int) {
def slaveOffer(s: SlaveOffer, availableCpus: Int, availableMem: Int)
: Option[TaskDescription]
def statusUpdate(t: TaskStatus): Unit
def error(code: Int, message: String): Unit
+
+ def getId(): Int = jobId
}
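
A rough sketch of the new API (not part of the commit): a concrete job now passes its ID up to the Job constructor and inherits getId instead of mixing in a trait. The NoOpJob name below is hypothetical; the method signatures are the ones declared above.

import mesos._

// Hypothetical subclass, shown only to illustrate the new abstract class.
class NoOpJob(jobId: Int) extends Job(jobId) {
  // Decline every resource offer.
  def slaveOffer(s: SlaveOffer, availableCpus: Int, availableMem: Int)
    : Option[TaskDescription] = None
  def statusUpdate(t: TaskStatus): Unit = {}
  def error(code: Int, message: String): Unit = {}
}
// getId() is inherited from Job, so the scheduler can read the job ID without
// each subclass re-implementing it.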
diff --git a/src/scala/spark/MesosScheduler.scala b/src/scala/spark/MesosScheduler.scala
index 87ebcb8692..8a713d6f2b 100644
--- a/src/scala/spark/MesosScheduler.scala
+++ b/src/scala/spark/MesosScheduler.scala
@@ -141,6 +141,7 @@ extends MScheduler with spark.Scheduler with Logging
job.slaveOffer(offers(i), availableCpus(i), availableMem(i)) match {
case Some(task) =>
tasks.add(task)
+ taskIdToJobId(task.getTaskId) = job.getId
availableCpus(i) -= task.getParams.get("cpus").toInt
availableMem(i) -= task.getParams.get("mem").toInt
launchedTask = true
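
With the line added above, the task-ID-to-job-ID map is filled in by MesosScheduler at launch time rather than by each SimpleJob (note the matching removal in SimpleJob.scala below). A minimal sketch of how the scheduler side can use the map to route a status update back to the owning job; the jobs map, the handleStatusUpdate name and TaskStatus.getTaskId are assumptions, not shown in this diff.

import scala.collection.mutable.HashMap
import mesos._

// Sketch only; imagined as members of MesosScheduler, which mixes in Logging.
val taskIdToJobId = new HashMap[Int, Int]   // task ID -> job ID, written at launch
val jobs = new HashMap[Int, Job]            // job ID -> Job (assumed field)

def handleStatusUpdate(status: TaskStatus) {
  taskIdToJobId.get(status.getTaskId) match {
    case Some(jobId) => jobs(jobId).statusUpdate(status)   // forward to the owning job
    case None => logInfo("Ignoring update for unknown task " + status.getTaskId)
  }
}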
diff --git a/src/scala/spark/SimpleJob.scala b/src/scala/spark/SimpleJob.scala
index a8544e4474..d0abaa4b67 100644
--- a/src/scala/spark/SimpleJob.scala
+++ b/src/scala/spark/SimpleJob.scala
@@ -2,8 +2,8 @@ package spark
import java.util.{HashMap => JHashMap}
+import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
-import scala.collection.mutable.Queue
import mesos._
@@ -13,7 +13,7 @@ import mesos._
*/
class SimpleJob[T: ClassManifest](
sched: MesosScheduler, tasks: Array[Task[T]], val jobId: Int)
-extends Job with Logging
+extends Job(jobId) with Logging
{
// Maximum time to wait to run a task in a preferred location (in ms)
val LOCALITY_WAIT = System.getProperty("spark.locality.wait", "3000").toLong
@@ -42,24 +42,34 @@ extends Job with Logging
// Last time when we launched a preferred task (for delay scheduling)
var lastPreferredLaunchTime = System.currentTimeMillis
- // Queue of pending tasks for each node
- val pendingTasksForNode = new HashMap[String, Queue[Int]]
+ // List of pending tasks for each node. These collections are actually
+ // treated as stacks, in which new tasks are added to the end of the
+ // ArrayBuffer and removed from the end. This makes it faster to detect
+ // tasks that repeatedly fail because whenever a task fails, it is put
+ // back on top of the stack. The lists are also only cleaned up lazily;
+ // when a task is launched, it remains in all the pending lists except
+ // the one that it was launched from, but gets removed from them later.
+ val pendingTasksForHost = new HashMap[String, ArrayBuffer[Int]]
- // Queue containing all pending tasks
- val allPendingTasks = new Queue[Int]
+ // List containing all pending tasks (also used as a stack, as above)
+ val allPendingTasks = new ArrayBuffer[Int]
// Did the job fail?
var failed = false
var causeOfFailure = ""
- for (i <- 0 until numTasks) {
+ // Add all our tasks to the pending lists. We do this in reverse order
+ // of task index so that tasks with low indices get launched first.
+ for (i <- (0 until numTasks).reverse) {
addPendingTask(i)
}
+ // Add a task to all the pending-task lists that it should be on.
def addPendingTask(index: Int) {
allPendingTasks += index
for (host <- tasks(index).preferredLocations) {
- pendingTasksForNode(host) += index
+ val list = pendingTasksForHost.getOrElseUpdate(host, ArrayBuffer())
+ list += index
}
}
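
A standalone sketch (not from the commit) of the stack behaviour described above: indices are pushed onto the end of the ArrayBuffer and popped from the end, so the most recently added index, in particular a failed task re-added through addPendingTask, is handed out first.

import scala.collection.mutable.ArrayBuffer

val stack = ArrayBuffer[Int]()
stack ++= Seq(2, 1, 0)   // filled in reverse index order, so task 0 sits on top
val top = stack.last     // peek: 0
stack.trimEnd(1)         // pop: stack is now ArrayBuffer(2, 1)
stack += 0               // a failed task 0 goes back on top, so it is retried
                         // next and repeated failures surface quickly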
@@ -85,15 +95,18 @@ extends Job with Logging
}
}
- def getPendingTasksForNode(host: String): Queue[Int] = {
- pendingTasksForNode.getOrElse(host, Queue())
+ // Return the pending tasks list for a given host, or an empty list if
+ // there is no map entry for that host
+ def getPendingTasksForHost(host: String): ArrayBuffer[Int] = {
+ pendingTasksForHost.getOrElse(host, ArrayBuffer())
}
- // Dequeue a pending task from the given queue and return its index.
- // Return None if the queue is empty.
- def findTaskFromQueue(queue: Queue[Int]): Option[Int] = {
- while (!queue.isEmpty) {
- val index = queue.dequeue
+ // Dequeue a pending task from the given list and return its index.
+ // Return None if the list is empty.
+ def findTaskFromList(list: ArrayBuffer[Int]): Option[Int] = {
+ while (!list.isEmpty) {
+ val index = list.last
+ list.trimEnd(1)
if (!launched(index) && !finished(index)) {
return Some(index)
}
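
The lazy cleanup mentioned earlier happens here: a task launched from one host's list stays in the other hosts' lists and is simply skipped (and dropped) when it reaches the top. A self-contained sketch of that effect; popRunnable is an illustrative stand-in for findTaskFromList with launched/finished passed in explicitly.

import scala.collection.mutable.ArrayBuffer

def popRunnable(list: ArrayBuffer[Int],
                launched: Array[Boolean],
                finished: Array[Boolean]): Option[Int] = {
  while (!list.isEmpty) {
    val index = list.last
    list.trimEnd(1)
    if (!launched(index) && !finished(index)) return Some(index)
    // already launched elsewhere or finished: drop it and keep looking
  }
  None
}

val launched = Array.fill(10)(false)
val finished = Array.fill(10)(false)
val host2List = ArrayBuffer(5)      // task 5 was also pending on another host
launched(5) = true                  // ...and was launched from that host's list
popRunnable(host2List, launched, finished)   // returns None; host2List is now empty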
@@ -104,19 +117,23 @@ extends Job with Logging
// Dequeue a pending task for a given node and return its index.
// If localOnly is set to false, allow non-local tasks as well.
def findTask(host: String, localOnly: Boolean): Option[Int] = {
- findTaskFromQueue(getPendingTasksForNode(host)) match {
+ findTaskFromList(getPendingTasksForHost(host)) match {
case Some(task) => Some(task)
case None =>
if (localOnly) None
- else findTaskFromQueue(allPendingTasks)
+ else findTaskFromList(allPendingTasks)
}
}
+ // Does a host count as a preferred location for a task? This is true if
+ // either the task has preferred locations and this host is one, or it has
+ // no preferred locations (in which case we still count the launch as preferred).
def isPreferredLocation(task: Task[T], host: String): Boolean = {
val locs = task.preferredLocations
return (locs.contains(host) || locs.isEmpty)
}
+ // Respond to an offer of a single slave from the scheduler by finding a task to launch on it
def slaveOffer(offer: SlaveOffer, availableCpus: Int, availableMem: Int)
: Option[TaskDescription] = {
if (tasksLaunched < numTasks && availableCpus >= CPUS_PER_TASK &&
@@ -126,9 +143,10 @@ extends Job with Logging
val host = offer.getHost
findTask(host, localOnly) match {
case Some(index) => {
+ // Found a task; do some bookkeeping and return a Mesos task for it
val task = tasks(index)
val taskId = sched.newTaskId()
- // Figure out whether the task's location is preferred
+ // Figure out whether this should count as a preferred launch
val preferred = isPreferredLocation(task, host)
val prefStr = if(preferred) "preferred" else "non-preferred"
val message =
@@ -136,7 +154,6 @@ extends Job with Logging
index, jobId, taskId, offer.getSlaveId, host, prefStr)
logInfo(message)
// Do various bookkeeping
- sched.taskIdToJobId(taskId) = jobId
tidToIndex(taskId) = index
task.markStarted(offer)
launched(index) = true
@@ -145,12 +162,13 @@ extends Job with Logging
lastPreferredLaunchTime = time
// Create and return the Mesos task object
val params = new JHashMap[String, String]
- params.put("cpus", "" + CPUS_PER_TASK)
- params.put("mem", "" + MEM_PER_TASK)
+ params.put("cpus", CPUS_PER_TASK.toString)
+ params.put("mem", MEM_PER_TASK.toString)
val serializedTask = Utils.serialize(task)
logDebug("Serialized size: " + serializedTask.size)
- return Some(new TaskDescription(taskId, offer.getSlaveId,
- "task_" + taskId, params, serializedTask))
+ val taskName = "task %d:%d".format(jobId, taskId)
+ return Some(new TaskDescription(
+ taskId, offer.getSlaveId, taskName, params, serializedTask))
}
case _ =>
}