aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2010-10-16 12:11:19 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2010-10-16 12:11:19 -0700
commit7da569e8a5a388859d1b7d7a856c9386a54b0568 (patch)
tree395c6295fbbf653d3532c9c4a618cd7cd40ab0df /src
parentbf21bb28f3f1fef1445f6413bb13d74888842906 (diff)
downloadspark-7da569e8a5a388859d1b7d7a856c9386a54b0568.tar.gz
spark-7da569e8a5a388859d1b7d7a856c9386a54b0568.tar.bz2
spark-7da569e8a5a388859d1b7d7a856c9386a54b0568.zip
Keep track of tasks in each job so that they can be removed when the job exits
Diffstat (limited to 'src')
-rw-r--r--src/scala/spark/MesosScheduler.scala18
1 files changed, 12 insertions, 6 deletions
diff --git a/src/scala/spark/MesosScheduler.scala b/src/scala/spark/MesosScheduler.scala
index 5adff032eb..470be69e50 100644
--- a/src/scala/spark/MesosScheduler.scala
+++ b/src/scala/spark/MesosScheduler.scala
@@ -5,9 +5,10 @@ import java.util.{ArrayList => JArrayList}
import java.util.{List => JList}
import java.util.{HashMap => JHashMap}
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.HashSet
import scala.collection.mutable.Map
import scala.collection.mutable.Queue
-import scala.collection.mutable.HashMap
import scala.collection.JavaConversions._
import mesos.{Scheduler => MScheduler}
@@ -36,6 +37,7 @@ extends MScheduler with spark.Scheduler with Logging
private var activeJobsQueue = new Queue[Job]
private var taskIdToJobId = new HashMap[Int, Int]
+ private var jobTasks = new HashMap[Int, HashSet[Int]]
private var nextJobId = 0
@@ -95,18 +97,20 @@ extends MScheduler with spark.Scheduler with Logging
waitForRegister()
val jobId = newJobId()
val myJob = new SimpleJob(this, tasks, jobId)
-
try {
this.synchronized {
- this.activeJobs(myJob.jobId) = myJob
- this.activeJobsQueue += myJob
+ activeJobs(jobId) = myJob
+ activeJobsQueue += myJob
+ jobTasks(jobId) = new HashSet()
}
driver.reviveOffers();
return myJob.join();
} finally {
this.synchronized {
- this.activeJobs.remove(myJob.jobId)
- this.activeJobsQueue.dequeueAll(x => (x == myJob))
+ activeJobs -= jobId
+ activeJobsQueue.dequeueAll(x => (x == myJob))
+ taskIdToJobId --= jobTasks(jobId)
+ jobTasks.remove(jobId)
}
}
}
@@ -147,6 +151,7 @@ extends MScheduler with spark.Scheduler with Logging
case Some(task) =>
tasks.add(task)
taskIdToJobId(task.getTaskId) = job.getId
+ jobTasks(job.getId) += task.getTaskId
availableCpus(i) -= task.getParams.get("cpus").toInt
availableMem(i) -= task.getParams.get("mem").toInt
launchedTask = true
@@ -182,6 +187,7 @@ extends MScheduler with spark.Scheduler with Logging
}
if (isFinished(status.getState)) {
taskIdToJobId.remove(status.getTaskId)
+ jobTasks(jobId) -= status.getTaskId
}
case None =>
logInfo("TID " + status.getTaskId + " already finished")