aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2012-04-08 23:38:49 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2012-04-08 23:38:49 -0700
commitd401e1b3e8644cf5973f71dad760de9b6918da07 (patch)
tree48c731ab5067bfd444ae9effe517c9127bf5c527 /core
parent816d4e58402c4242ff046a8678c94b72a7993e2d (diff)
downloadspark-d401e1b3e8644cf5973f71dad760de9b6918da07.tar.gz
spark-d401e1b3e8644cf5973f71dad760de9b6918da07.tar.bz2
spark-d401e1b3e8644cf5973f71dad760de9b6918da07.zip
Fix a possible deadlock in MesosScheduler
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/MesosScheduler.scala12
1 files changed, 8 insertions, 4 deletions
diff --git a/core/src/main/scala/spark/MesosScheduler.scala b/core/src/main/scala/spark/MesosScheduler.scala
index ee14d091ce..b95f40b877 100644
--- a/core/src/main/scala/spark/MesosScheduler.scala
+++ b/core/src/main/scala/spark/MesosScheduler.scala
@@ -54,7 +54,7 @@ private class MesosScheduler(
private val registeredLock = new Object()
private val activeJobs = new HashMap[Int, Job]
- private var activeJobsQueue = new PriorityQueue[Job]()(jobOrdering)
+ private var activeJobsQueue = new ArrayBuffer[Job]
private val taskIdToJobId = new HashMap[String, Int]
private val taskIdToSlaveId = new HashMap[String, String]
@@ -164,7 +164,7 @@ private class MesosScheduler(
def jobFinished(job: Job) {
this.synchronized {
activeJobs -= job.jobId
- activeJobsQueue = activeJobsQueue.filterNot(_ == job)
+ activeJobsQueue -= job
taskIdToJobId --= jobTasks(job.jobId)
taskIdToSlaveId --= jobTasks(job.jobId)
jobTasks.remove(job.jobId)
@@ -202,7 +202,7 @@ private class MesosScheduler(
mem >= EXECUTOR_MEMORY || slavesWithExecutors.contains(slaveId)
})
var launchedTask = false
- for (job <- activeJobsQueue) {
+ for (job <- activeJobsQueue.sorted(jobOrdering)) {
do {
launchedTask = false
for (i <- 0 until offers.size if enoughMem(i)) {
@@ -248,6 +248,7 @@ private class MesosScheduler(
}
override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
+ var jobToUpdate: Option[Job] = None
synchronized {
try {
val tid = status.getTaskId.getValue
@@ -259,7 +260,7 @@ private class MesosScheduler(
taskIdToJobId.get(tid) match {
case Some(jobId) =>
if (activeJobs.contains(jobId)) {
- activeJobs(jobId).statusUpdate(status)
+ jobToUpdate = Some(activeJobs(jobId))
}
if (isFinished(status.getState)) {
taskIdToJobId.remove(tid)
@@ -275,6 +276,9 @@ private class MesosScheduler(
case e: Exception => logError("Exception in statusUpdate", e)
}
}
+ for (j <- jobToUpdate) {
+ j.statusUpdate(status)
+ }
}
override def error(d: SchedulerDriver, code: Int, message: String) {