Diffstat (limited to 'core/src/main/scala/spark/deploy/master/Master.scala')
-rw-r--r--  core/src/main/scala/spark/deploy/master/Master.scala |  28 +++++++++++++++++++++++++-------
1 file changed, 21 insertions(+), 7 deletions(-)
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index a21f156a51..89de3b1827 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -24,7 +24,7 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
val actorToWorker = new HashMap[ActorRef, WorkerInfo]
val addressToWorker = new HashMap[Address, WorkerInfo]
- val jobs = new HashSet[WorkerInfo]
+ val jobs = new HashSet[JobInfo]
val idToJob = new HashMap[String, JobInfo]
val actorToJob = new HashMap[ActorRef, JobInfo]
val addressToJob = new HashMap[Address, JobInfo]
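
For orientation, a sketch of the corrected bookkeeping (JobInfo is stubbed, and MasterState is a made-up wrapper name, not from the source):

import scala.collection.mutable.{HashMap, HashSet}

class JobInfo(val id: String) // stub standing in for the real class

object MasterState {
  // After the fix the set's element type matches the lookup maps beside it;
  // the old HashSet[WorkerInfo] presumably compiled only because nothing was
  // ever inserted into the set before this patch.
  val jobs = new HashSet[JobInfo]
  val idToJob = new HashMap[String, JobInfo]
}
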
@@ -57,7 +57,7 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
if (idToWorker.contains(id)) {
sender ! RegisterWorkerFailed("Duplicate worker ID")
} else {
- val worker = addWorker(id, host, workerPort, cores, memory)
+ addWorker(id, host, workerPort, cores, memory)
context.watch(sender) // This doesn't work with remote actors but helps for testing
sender ! RegisteredWorker
schedule()
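
For context, a hedged sketch of the handshake this hunk exercises; the field lists are assumptions, and only the names RegisteredWorker and RegisterWorkerFailed (plus the addWorker argument list) come from the diff itself:

// Hypothetical message shapes for the worker registration handshake above.
sealed trait DeployMessage
case class RegisterWorker(id: String, host: String, port: Int, cores: Int, memory: Int)
    extends DeployMessage
case object RegisteredWorker extends DeployMessage
case class RegisterWorkerFailed(message: String) extends DeployMessage
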
@@ -82,9 +82,11 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
exec.job.actor ! ExecutorUpdated(execId, state, message)
if (ExecutorState.isFinished(state)) {
// Remove this executor from the worker and job
- logInfo("Removing executor " + exec.fullId)
+ logInfo("Removing executor " + exec.fullId + " because it is " + state)
idToJob(jobId).executors -= exec.id
exec.worker.removeExecutor(exec)
+ // TODO: the worker would probably want to restart the executor a few times
+ schedule()
}
}
case None =>
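
A plausible shape for the ExecutorState.isFinished check driving the branch above; apart from LOST, which appears later in this diff, the exact value set is an assumption:

object ExecutorState extends Enumeration {
  type ExecutorState = Value
  val LAUNCHING, RUNNING, KILLED, FAILED, LOST = Value

  // Terminal states: the master removes the executor and reschedules,
  // freeing the worker's cores and memory for waiting jobs.
  def isFinished(state: ExecutorState): Boolean =
    state == KILLED || state == FAILED || state == LOST
}
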
@@ -120,10 +122,12 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
// Right now this is a very simple FIFO with backfilling. We keep looking through the jobs
// in order of submission time and launching the first one that fits in the cluster.
// It's also not very efficient in terms of algorithmic complexity.
- for (job <- waitingJobs) {
+ for (job <- waitingJobs.clone()) {
+ logInfo("Trying to schedule job " + job.id)
// Figure out how many cores the job could use on the whole cluster
val jobMemory = job.desc.memoryPerSlave
val usableCores = workers.filter(_.memoryFree >= jobMemory).map(_.coresFree).sum
+ logInfo("jobMemory: " + jobMemory + ", usableCores: " + usableCores)
if (usableCores >= job.desc.cores) {
// We can launch it! Let's just partition the workers into executors for this job.
// TODO: Probably want to spread stuff out across nodes more.
@@ -134,6 +138,7 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
launchExecutor(worker, exec)
coresLeft -= coresToUse
}
+ waitingJobs -= job
}
}
}
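
A self-contained sketch of the FIFO-with-backfilling policy the comment above describes, using simplified stand-ins for WorkerInfo and JobInfo (field names assumed):

import scala.collection.mutable.ArrayBuffer

case class Worker(coresFree: Int, memoryFree: Int)
case class Job(id: String, coresWanted: Int, memoryPerSlave: Int)

// FIFO with backfilling: walk the queue in submission order and launch every
// job that currently fits, not just the head. Iterating over a clone, as the
// patch does, is what makes `waitingJobs -= job` safe mid-loop.
def schedule(waitingJobs: ArrayBuffer[Job], workers: Seq[Worker]): Seq[Job] = {
  val launched = ArrayBuffer[Job]()
  for (job <- waitingJobs.clone()) {
    // A worker is usable only if it can host at least one slave of this job.
    val usableCores = workers.filter(_.memoryFree >= job.memoryPerSlave).map(_.coresFree).sum
    if (usableCores >= job.coresWanted) {
      launched += job
      waitingJobs -= job
    }
  }
  launched.toSeq
}
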
@@ -141,12 +146,13 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo) {
logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
worker.addExecutor(exec)
- worker.actor ! LaunchExecutor(exec.job.id, exec.id, exec.job.desc)
+ worker.actor ! LaunchExecutor(exec.job.id, exec.id, exec.job.desc, exec.cores, exec.memory)
exec.job.actor ! ExecutorAdded(exec.id, worker.id, worker.host, exec.cores, exec.memory)
}
def addWorker(id: String, host: String, port: Int, cores: Int, memory: Int): WorkerInfo = {
val worker = new WorkerInfo(id, host, port, cores, memory, sender)
+ workers += worker
idToWorker(worker.id) = worker
actorToWorker(sender) = worker
addressToWorker(sender.path.address) = worker
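
The widened message, sketched from its call site; field names and types are inferred here, not taken from the real DeployMessage definitions:

case class JobDescription(name: String) // stub; the real class has more fields

// The master now tells the worker exactly how many cores and how much memory
// the new executor may use, instead of leaving the worker to derive that
// from the job description alone.
case class LaunchExecutor(jobId: String, execId: Int, jobDesc: JobDescription,
                          cores: Int, memory: Int)
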
@@ -155,14 +161,20 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
def removeWorker(worker: WorkerInfo) {
logInfo("Removing worker " + worker.id + " on " + worker.host + ":" + worker.port)
+ workers -= worker
idToWorker -= worker.id
actorToWorker -= worker.actor
addressToWorker -= worker.actor.path.address
+ for (exec <- worker.executors.values) {
+ exec.job.actor ! ExecutorStateChanged(exec.job.id, exec.id, ExecutorState.LOST, None)
+ exec.job.executors -= exec.id
+ }
}
def addJob(desc: JobDescription, actor: ActorRef): JobInfo = {
val date = new Date
val job = new JobInfo(newJobId(date), desc, date, actor)
+ jobs += job
idToJob(job.id) = job
actorToJob(sender) = job
addressToJob(sender.path.address) = job
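
A toy illustration of the invariant the new `workers += worker` and `jobs += job` lines restore: every add must touch the same collections that the matching remove touches, or the master's view goes stale:

import scala.collection.mutable.{HashMap, HashSet}

// Hypothetical helper, not from the source: a set and an id map that are
// only ever mutated together, so neither can leak entries.
class Registry[A](idOf: A => String) {
  val all = new HashSet[A]
  val byId = new HashMap[String, A]
  def add(item: A): Unit = { all += item; byId(idOf(item)) = item }
  def remove(item: A): Unit = { all -= item; byId -= idOf(item) }
}
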
@@ -171,19 +183,21 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
def removeJob(job: JobInfo) {
logInfo("Removing job " + job.id)
+ jobs -= job
idToJob -= job.id
actorToJob -= job.actor
addressToJob -= job.actor.path.address
completedJobs += job // Remember it in our history
for (exec <- job.executors.values) {
-
+ exec.worker.removeExecutor(exec)
+ exec.worker.actor ! KillExecutor(exec.job.id, exec.id)
}
schedule()
}
/** Generate a new job ID given a job's submission date */
def newJobId(submitDate: Date): String = {
- val jobId = "job-%s-%4d".format(DATE_FORMAT.format(submitDate), nextJobNumber)
+ val jobId = "job-%s-%04d".format(DATE_FORMAT.format(submitDate), nextJobNumber)
nextJobNumber += 1
jobId
}
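
The newJobId change is a one-character zero-padding fix; an illustrative comparison (the date string is made up):

// "%4d" pads with spaces, so IDs contain blanks and sort oddly;
// "%04d" zero-pads as intended.
val before = "job-%s-%4d".format("20120905120000", 7)  // "job-20120905120000-   7"
val after  = "job-%s-%04d".format("20120905120000", 7) // "job-20120905120000-0007"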