diff options
Diffstat (limited to 'core/src/main/scala/spark/deploy/master/Master.scala')
-rw-r--r-- | core/src/main/scala/spark/deploy/master/Master.scala | 28 |
1 files changed, 21 insertions, 7 deletions
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala index a21f156a51..89de3b1827 100644 --- a/core/src/main/scala/spark/deploy/master/Master.scala +++ b/core/src/main/scala/spark/deploy/master/Master.scala @@ -24,7 +24,7 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging { val actorToWorker = new HashMap[ActorRef, WorkerInfo] val addressToWorker = new HashMap[Address, WorkerInfo] - val jobs = new HashSet[WorkerInfo] + val jobs = new HashSet[JobInfo] val idToJob = new HashMap[String, JobInfo] val actorToJob = new HashMap[ActorRef, JobInfo] val addressToJob = new HashMap[Address, JobInfo] @@ -57,7 +57,7 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging { if (idToWorker.contains(id)) { sender ! RegisterWorkerFailed("Duplicate worker ID") } else { - val worker = addWorker(id, host, workerPort, cores, memory) + addWorker(id, host, workerPort, cores, memory) context.watch(sender) // This doesn't work with remote actors but helps for testing sender ! RegisteredWorker schedule() @@ -82,9 +82,11 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging { exec.job.actor ! ExecutorUpdated(execId, state, message) if (ExecutorState.isFinished(state)) { // Remove this executor from the worker and job - logInfo("Removing executor " + exec.fullId) + logInfo("Removing executor " + exec.fullId + " because it is " + state) idToJob(jobId).executors -= exec.id exec.worker.removeExecutor(exec) + // TODO: the worker would probably want to restart the executor a few times + schedule() } } case None => @@ -120,10 +122,12 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging { // Right now this is a very simple FIFO with backfilling. We keep looking through the jobs // in order of submission time and launching the first one that fits in the cluster. // It's also not very efficient in terms of algorithmic complexity. - for (job <- waitingJobs) { + for (job <- waitingJobs.clone()) { + logInfo("Trying to schedule job " + job.id) // Figure out how many cores the job could use on the whole cluster val jobMemory = job.desc.memoryPerSlave val usableCores = workers.filter(_.memoryFree >= jobMemory).map(_.coresFree).sum + logInfo("jobMemory: " + jobMemory + ", usableCores: " + usableCores) if (usableCores >= job.desc.cores) { // We can launch it! Let's just partition the workers into executors for this job. // TODO: Probably want to spread stuff out across nodes more. @@ -134,6 +138,7 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging { launchExecutor(worker, exec) coresLeft -= coresToUse } + waitingJobs -= job } } } @@ -141,12 +146,13 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging { def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo) { logInfo("Launching executor " + exec.fullId + " on worker " + worker.id) worker.addExecutor(exec) - worker.actor ! LaunchExecutor(exec.job.id, exec.id, exec.job.desc) + worker.actor ! LaunchExecutor(exec.job.id, exec.id, exec.job.desc, exec.cores, exec.memory) exec.job.actor ! ExecutorAdded(exec.id, worker.id, worker.host, exec.cores, exec.memory) } def addWorker(id: String, host: String, port: Int, cores: Int, memory: Int): WorkerInfo = { val worker = new WorkerInfo(id, host, port, cores, memory, sender) + workers += worker idToWorker(worker.id) = worker actorToWorker(sender) = worker addressToWorker(sender.path.address) = worker @@ -155,14 +161,20 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging { def removeWorker(worker: WorkerInfo) { logInfo("Removing worker " + worker.id + " on " + worker.host + ":" + worker.port) + workers -= worker idToWorker -= worker.id actorToWorker -= worker.actor addressToWorker -= worker.actor.path.address + for (exec <- worker.executors.values) { + exec.job.actor ! ExecutorStateChanged(exec.job.id, exec.id, ExecutorState.LOST, None) + exec.job.executors -= exec.id + } } def addJob(desc: JobDescription, actor: ActorRef): JobInfo = { val date = new Date val job = new JobInfo(newJobId(date), desc, date, actor) + jobs += job idToJob(job.id) = job actorToJob(sender) = job addressToJob(sender.path.address) = job @@ -171,19 +183,21 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging { def removeJob(job: JobInfo) { logInfo("Removing job " + job.id) + jobs -= job idToJob -= job.id actorToJob -= job.actor addressToWorker -= job.actor.path.address completedJobs += job // Remember it in our history for (exec <- job.executors.values) { - + exec.worker.removeExecutor(exec) + exec.worker.actor ! KillExecutor(exec.job.id, exec.id) } schedule() } /** Generate a new job ID given a job's submission date */ def newJobId(submitDate: Date): String = { - val jobId = "job-%s-%4d".format(DATE_FORMAT.format(submitDate), nextJobNumber) + val jobId = "job-%s-%04d".format(DATE_FORMAT.format(submitDate), nextJobNumber) nextJobNumber += 1 jobId } |