diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-04-08 23:38:49 -0700 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-04-08 23:38:49 -0700 |
commit | d401e1b3e8644cf5973f71dad760de9b6918da07 (patch) | |
tree | 48c731ab5067bfd444ae9effe517c9127bf5c527 | |
parent | 816d4e58402c4242ff046a8678c94b72a7993e2d (diff) | |
download | spark-d401e1b3e8644cf5973f71dad760de9b6918da07.tar.gz spark-d401e1b3e8644cf5973f71dad760de9b6918da07.tar.bz2 spark-d401e1b3e8644cf5973f71dad760de9b6918da07.zip |
Fix a possible deadlock in MesosScheduler
-rw-r--r-- | core/src/main/scala/spark/MesosScheduler.scala | 12 |
1 file changed, 8 insertions, 4 deletions
diff --git a/core/src/main/scala/spark/MesosScheduler.scala b/core/src/main/scala/spark/MesosScheduler.scala
index ee14d091ce..b95f40b877 100644
--- a/core/src/main/scala/spark/MesosScheduler.scala
+++ b/core/src/main/scala/spark/MesosScheduler.scala
@@ -54,7 +54,7 @@ private class MesosScheduler(
   private val registeredLock = new Object()
 
   private val activeJobs = new HashMap[Int, Job]
-  private var activeJobsQueue = new PriorityQueue[Job]()(jobOrdering)
+  private var activeJobsQueue = new ArrayBuffer[Job]
 
   private val taskIdToJobId = new HashMap[String, Int]
   private val taskIdToSlaveId = new HashMap[String, String]
@@ -164,7 +164,7 @@ private class MesosScheduler(
   def jobFinished(job: Job) {
     this.synchronized {
       activeJobs -= job.jobId
-      activeJobsQueue = activeJobsQueue.filterNot(_ == job)
+      activeJobsQueue -= job
       taskIdToJobId --= jobTasks(job.jobId)
       taskIdToSlaveId --= jobTasks(job.jobId)
       jobTasks.remove(job.jobId)
@@ -202,7 +202,7 @@ private class MesosScheduler(
         mem >= EXECUTOR_MEMORY || slavesWithExecutors.contains(slaveId)
       })
       var launchedTask = false
-      for (job <- activeJobsQueue) {
+      for (job <- activeJobsQueue.sorted(jobOrdering)) {
         do {
           launchedTask = false
           for (i <- 0 until offers.size if enoughMem(i)) {
@@ -248,6 +248,7 @@ private class MesosScheduler(
   }
 
   override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
+    var jobToUpdate: Option[Job] = None
     synchronized {
       try {
         val tid = status.getTaskId.getValue
@@ -259,7 +260,7 @@ private class MesosScheduler(
         taskIdToJobId.get(tid) match {
           case Some(jobId) =>
             if (activeJobs.contains(jobId)) {
-              activeJobs(jobId).statusUpdate(status)
+              jobToUpdate = Some(activeJobs(jobId))
             }
             if (isFinished(status.getState)) {
               taskIdToJobId.remove(tid)
@@ -275,6 +276,9 @@ private class MesosScheduler(
         case e: Exception => logError("Exception in statusUpdate", e)
       }
     }
+    for (j <- jobToUpdate) {
+      j.statusUpdate(status)
+    }
   }
 
   override def error(d: SchedulerDriver, code: Int, message: String) {