aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2012-04-08 23:39:37 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2012-04-08 23:39:37 -0700
commit0229d5390f3fe6a84a744feb23ea6a9a458ebe1b (patch)
tree54ca991ad5cee0716813992c4b6edcf7a4e85bd1
parenta8bb324ed9d4c6fa1dd73687f471373dde49378f (diff)
parentd401e1b3e8644cf5973f71dad760de9b6918da07 (diff)
downloadspark-0229d5390f3fe6a84a744feb23ea6a9a458ebe1b.tar.gz
spark-0229d5390f3fe6a84a744feb23ea6a9a458ebe1b.tar.bz2
spark-0229d5390f3fe6a84a744feb23ea6a9a458ebe1b.zip
Merge branch 'master' into mesos-0.9
-rw-r--r--core/src/main/scala/spark/MesosScheduler.scala12
1 files changed, 8 insertions, 4 deletions
diff --git a/core/src/main/scala/spark/MesosScheduler.scala b/core/src/main/scala/spark/MesosScheduler.scala
index 6543e1112a..391e5f1714 100644
--- a/core/src/main/scala/spark/MesosScheduler.scala
+++ b/core/src/main/scala/spark/MesosScheduler.scala
@@ -54,7 +54,7 @@ private class MesosScheduler(
private val registeredLock = new Object()
private val activeJobs = new HashMap[Int, Job]
- private var activeJobsQueue = new PriorityQueue[Job]()(jobOrdering)
+ private var activeJobsQueue = new ArrayBuffer[Job]
private val taskIdToJobId = new HashMap[String, Int]
private val taskIdToSlaveId = new HashMap[String, String]
@@ -171,7 +171,7 @@ private class MesosScheduler(
def jobFinished(job: Job) {
this.synchronized {
activeJobs -= job.jobId
- activeJobsQueue = activeJobsQueue.filterNot(_ == job)
+ activeJobsQueue -= job
taskIdToJobId --= jobTasks(job.jobId)
taskIdToSlaveId --= jobTasks(job.jobId)
jobTasks.remove(job.jobId)
@@ -213,7 +213,7 @@ private class MesosScheduler(
mem >= EXECUTOR_MEMORY || slavesWithExecutors.contains(slaveId)
})
var launchedTask = false
- for (job <- activeJobsQueue) {
+ for (job <- activeJobsQueue.sorted(jobOrdering)) {
do {
launchedTask = false
for (i <- 0 until offers.size if enoughMem(i)) {
@@ -259,6 +259,7 @@ private class MesosScheduler(
}
override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
+ var jobToUpdate: Option[Job] = None
synchronized {
try {
val tid = status.getTaskId.getValue
@@ -270,7 +271,7 @@ private class MesosScheduler(
taskIdToJobId.get(tid) match {
case Some(jobId) =>
if (activeJobs.contains(jobId)) {
- activeJobs(jobId).statusUpdate(status)
+ jobToUpdate = Some(activeJobs(jobId))
}
if (isFinished(status.getState)) {
taskIdToJobId.remove(tid)
@@ -286,6 +287,9 @@ private class MesosScheduler(
case e: Exception => logError("Exception in statusUpdate", e)
}
}
+ for (j <- jobToUpdate) {
+ j.statusUpdate(status)
+ }
}
override def error(d: SchedulerDriver, message: String) {