aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2010-10-16 11:57:36 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2010-10-16 11:57:36 -0700
commitbf21bb28f3f1fef1445f6413bb13d74888842906 (patch)
treefd0e2cbe4df1b42b9227e549e5ddb3fb2fa75a0e
parentc21f840a802b4e6b3289d57fc85db3a74d546192 (diff)
downloadspark-bf21bb28f3f1fef1445f6413bb13d74888842906.tar.gz
spark-bf21bb28f3f1fef1445f6413bb13d74888842906.tar.bz2
spark-bf21bb28f3f1fef1445f6413bb13d74888842906.zip
Further clarified some code
-rw-r--r--src/scala/spark/MesosScheduler.scala27
-rw-r--r--src/scala/spark/SimpleJob.scala5
2 files changed, 22 insertions, 10 deletions
diff --git a/src/scala/spark/MesosScheduler.scala b/src/scala/spark/MesosScheduler.scala
index 8a713d6f2b..5adff032eb 100644
--- a/src/scala/spark/MesosScheduler.scala
+++ b/src/scala/spark/MesosScheduler.scala
@@ -29,13 +29,13 @@ extends MScheduler with spark.Scheduler with Logging
)
// Lock used to wait for scheduler to be registered
- var isRegistered = false
- val registeredLock = new Object()
+ private var isRegistered = false
+ private val registeredLock = new Object()
- var activeJobs = new HashMap[Int, Job]
- var activeJobsQueue = new Queue[Job]
+ private var activeJobs = new HashMap[Int, Job]
+ private var activeJobsQueue = new Queue[Job]
- private[spark] var taskIdToJobId = new HashMap[Int, Int]
+ private var taskIdToJobId = new HashMap[Int, Int]
private var nextJobId = 0
@@ -126,6 +126,11 @@ extends MScheduler with spark.Scheduler with Logging
}
}
+ /**
+ * Method called by Mesos to offer resources on slaves. We respond by asking
+ * our active jobs for tasks in FIFO order. We fill each node with tasks in
+ * a round-robin manner so that tasks are balanced across the cluster.
+ */
override def resourceOffer(
d: SchedulerDriver, oid: String, offers: JList[SlaveOffer]) {
synchronized {
@@ -159,6 +164,14 @@ extends MScheduler with spark.Scheduler with Logging
}
}
+ // Check whether a Mesos task state represents a finished task
+ def isFinished(state: TaskState) = {
+ state == TaskState.TASK_FINISHED ||
+ state == TaskState.TASK_FAILED ||
+ state == TaskState.TASK_KILLED ||
+ state == TaskState.TASK_LOST
+ }
+
override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
synchronized {
try {
@@ -167,10 +180,12 @@ extends MScheduler with spark.Scheduler with Logging
if (activeJobs.contains(jobId)) {
activeJobs(jobId).statusUpdate(status)
}
+ if (isFinished(status.getState)) {
+ taskIdToJobId.remove(status.getTaskId)
+ }
case None =>
logInfo("TID " + status.getTaskId + " already finished")
}
-
} catch {
case e: Exception => logError("Exception in statusUpdate", e)
}
diff --git a/src/scala/spark/SimpleJob.scala b/src/scala/spark/SimpleJob.scala
index faf3b1c492..b15d0522d4 100644
--- a/src/scala/spark/SimpleJob.scala
+++ b/src/scala/spark/SimpleJob.scala
@@ -9,7 +9,7 @@ import mesos._
/**
- * A simple implementation of Job that just runs each task in an array.
+ * A Job that runs a set of tasks with no interdependencies.
*/
class SimpleJob[T: ClassManifest](
sched: MesosScheduler, tasks: Array[Task[T]], val jobId: Int)
@@ -204,8 +204,6 @@ extends Job(jobId) with Logging
Accumulators.add(callingThread, result.accumUpdates)
// Mark finished and stop if we've finished all the tasks
finished(index) = true
- // Remove TID -> jobId mapping from sched
- sched.taskIdToJobId.remove(tid)
if (tasksFinished == numTasks)
setAllFinished()
} else {
@@ -220,7 +218,6 @@ extends Job(jobId) with Logging
if (!finished(index)) {
logInfo("Lost TID %d (task %d:%d)".format(tid, jobId, index))
launched(index) = false
- sched.taskIdToJobId.remove(tid)
tasksLaunched -= 1
// Re-enqueue the task as pending
addPendingTask(index)