diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-07-01 17:13:31 -0700 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-07-01 17:13:31 -0700 |
commit | 5d1a887bed8423bd6c25660910d18d91880e01fe (patch) | |
tree | 23870ee515630db13350203409081c0e9bc4bf7a /core | |
parent | 51c46eaca0b4540a39ba78d0809a30d9b118e629 (diff) | |
download | spark-5d1a887bed8423bd6c25660910d18d91880e01fe.tar.gz spark-5d1a887bed8423bd6c25660910d18d91880e01fe.tar.bz2 spark-5d1a887bed8423bd6c25660910d18d91880e01fe.zip |
Further updates to run processes on cluster.
Diffstat (limited to 'core')
11 files changed, 298 insertions, 33 deletions
diff --git a/core/src/main/scala/spark/PipedRDD.scala b/core/src/main/scala/spark/PipedRDD.scala index 9e0a01b5f9..4fcfd869cf 100644 --- a/core/src/main/scala/spark/PipedRDD.scala +++ b/core/src/main/scala/spark/PipedRDD.scala @@ -30,7 +30,7 @@ class PipedRDD[T: ClassManifest]( val pb = new ProcessBuilder(command) // Add the environmental variables to the process. val currentEnvVars = pb.environment() - envVars.foreach { case(variable, value) => currentEnvVars.put(variable, value) } + envVars.foreach { case (variable, value) => currentEnvVars.put(variable, value) } val proc = pb.start() val env = SparkEnv.get @@ -38,7 +38,7 @@ class PipedRDD[T: ClassManifest]( // Start a thread to print the process's stderr to ours new Thread("stderr reader for " + command) { override def run() { - for(line <- Source.fromInputStream(proc.getErrorStream).getLines) { + for (line <- Source.fromInputStream(proc.getErrorStream).getLines) { System.err.println(line) } } @@ -49,7 +49,7 @@ class PipedRDD[T: ClassManifest]( override def run() { SparkEnv.set(env) val out = new PrintWriter(proc.getOutputStream) - for(elem <- parent.iterator(split)) { + for (elem <- parent.iterator(split)) { out.println(elem) } out.close() @@ -66,8 +66,9 @@ object PipedRDD { def tokenize(command: String): Seq[String] = { val buf = new ArrayBuffer[String] val tok = new StringTokenizer(command) - while(tok.hasMoreElements) + while(tok.hasMoreElements) { buf += tok.nextToken() + } buf } } diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala index 95c833ea5b..674ff9e298 100644 --- a/core/src/main/scala/spark/Utils.scala +++ b/core/src/main/scala/spark/Utils.scala @@ -7,6 +7,7 @@ import java.util.concurrent.{Executors, ThreadFactory, ThreadPoolExecutor} import scala.collection.mutable.ArrayBuffer import scala.util.Random import java.util.{Locale, UUID} +import scala.io.Source /** * Various utility methods used by Spark. @@ -39,6 +40,7 @@ object Utils { (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') } + /** Split a string into words at non-alphabetic characters */ def splitWords(s: String): Seq[String] = { val buf = new ArrayBuffer[String] var i = 0 @@ -58,7 +60,7 @@ object Utils { return buf } - // Create a temporary directory inside the given parent directory + /** Create a temporary directory inside the given parent directory */ def createTempDir(root: String = System.getProperty("java.io.tmpdir")): File = { var attempts = 0 val maxAttempts = 10 @@ -85,7 +87,7 @@ object Utils { return dir } - // Copy all data from an InputStream to an OutputStream + /** Copy all data from an InputStream to an OutputStream */ def copyStream(in: InputStream, out: OutputStream, closeStreams: Boolean = false) @@ -104,9 +106,11 @@ object Utils { } } - // Shuffle the elements of a collection into a random order, returning the - // result in a new collection. Unlike scala.util.Random.shuffle, this method - // uses a local random number generator, avoiding inter-thread contention. + /** + * Shuffle the elements of a collection into a random order, returning the + * result in a new collection. Unlike scala.util.Random.shuffle, this method + * uses a local random number generator, avoiding inter-thread contention. + */ def randomize[T](seq: TraversableOnce[T]): Seq[T] = { val buf = new ArrayBuffer[T]() buf ++= seq @@ -136,7 +140,7 @@ object Utils { } /** - * Get the local machine's hostname + * Get the local machine's hostname. */ def localHostName(): String = { customHostname.getOrElse(InetAddress.getLocalHost.getHostName) @@ -250,4 +254,34 @@ object Utils { def memoryMegabytesToString(megabytes: Long): String = { memoryBytesToString(megabytes * 1024L * 1024L) } + + /** + * Execute a command in the given working directory, throwing an exception if it completes + * with an exit code other than 0. + */ + def execute(command: Seq[String], workingDir: File) { + val process = new ProcessBuilder(command: _*) + .directory(workingDir) + .redirectErrorStream(true) + .start() + new Thread("read stdout for " + command(0)) { + override def run() { + for (line <- Source.fromInputStream(process.getInputStream).getLines) { + System.err.println(line) + } + } + }.start() + val exitCode = process.waitFor() + if (exitCode != 0) { + throw new SparkException("Process " + command + " exited with code " + exitCode) + } + } + + /** + * Execute a command in the current working directory, throwing an exception if it completes + * with an exit code other than 0. + */ + def execute(command: Seq[String]) { + execute(command, new File(".")) + } } diff --git a/core/src/main/scala/spark/deploy/DeployMessage.scala b/core/src/main/scala/spark/deploy/DeployMessage.scala index c63f542bb0..14492ed552 100644 --- a/core/src/main/scala/spark/deploy/DeployMessage.scala +++ b/core/src/main/scala/spark/deploy/DeployMessage.scala @@ -6,14 +6,29 @@ sealed trait DeployMessage extends Serializable case class RegisterWorker(id: String, host: String, port: Int, cores: Int, memory: Int) extends DeployMessage -case class ExecutorStateChanged(jobId: String, execId: Int, state: ExecutorState.Value, message: String) + +case class ExecutorStateChanged( + jobId: String, + execId: Int, + state: + ExecutorState.Value, + message: Option[String]) extends DeployMessage // Master to Worker case object RegisteredWorker extends DeployMessage case class RegisterWorkerFailed(message: String) extends DeployMessage -case class LaunchExecutor(jobId: String, execId: Int, jobDesc: JobDescription) extends DeployMessage +case class KillExecutor(jobId: String, execId: Int) extends DeployMessage + +case class LaunchExecutor( + jobId: String, + execId: Int, + jobDesc: JobDescription, + cores: Int, + memory: Int) + extends DeployMessage + // Client to Master @@ -23,7 +38,8 @@ case class RegisterJob(jobDescription: JobDescription) extends DeployMessage case class RegisteredJob(jobId: String) extends DeployMessage case class ExecutorAdded(id: Int, workerId: String, host: String, cores: Int, memory: Int) -case class ExecutorUpdated(id: Int, state: ExecutorState.Value, message: String) +case class ExecutorUpdated(id: Int, state: ExecutorState.Value, message: Option[String]) +case class JobKilled(message: String) // Internal message in Client diff --git a/core/src/main/scala/spark/deploy/ExecutorState.scala b/core/src/main/scala/spark/deploy/ExecutorState.scala index f26609d8c9..ea73f7be29 100644 --- a/core/src/main/scala/spark/deploy/ExecutorState.scala +++ b/core/src/main/scala/spark/deploy/ExecutorState.scala @@ -1,7 +1,9 @@ package spark.deploy -object ExecutorState extends Enumeration("LAUNCHING", "RUNNING", "FINISHED", "FAILED") { - val LAUNCHING, RUNNING, FINISHED, FAILED = Value +object ExecutorState + extends Enumeration("LAUNCHING", "LOADING", "RUNNING", "KILLED", "FAILED", "LOST") { - def isFinished(state: Value): Boolean = (state == FINISHED || state == FAILED) + val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST = Value + + def isFinished(state: Value): Boolean = (state == KILLED || state == FAILED || state == LOST) } diff --git a/core/src/main/scala/spark/deploy/JobDescription.scala b/core/src/main/scala/spark/deploy/JobDescription.scala index 545bcdcc74..3f91402a31 100644 --- a/core/src/main/scala/spark/deploy/JobDescription.scala +++ b/core/src/main/scala/spark/deploy/JobDescription.scala @@ -2,11 +2,13 @@ package spark.deploy class JobDescription( val name: String, - val memoryPerSlave: Int, val cores: Int, - val resources: Seq[String], + val memoryPerSlave: Int, + val fileUrls: Seq[String], val command: Command) extends Serializable { val user = System.getProperty("user.name", "<unknown>") + + override def toString: String = "JobDescription(" + name + ")" } diff --git a/core/src/main/scala/spark/deploy/client/Client.scala b/core/src/main/scala/spark/deploy/client/Client.scala index f894804993..c7fa8a3874 100644 --- a/core/src/main/scala/spark/deploy/client/Client.scala +++ b/core/src/main/scala/spark/deploy/client/Client.scala @@ -66,10 +66,10 @@ class Client( case ExecutorUpdated(id, state, message) => val fullId = jobId + "/" + id - val messageText = if (message == null) "" else " (" + message + ")" + val messageText = message.map(s => " (" + s + ")").getOrElse("") logInfo("Executor updated: %s is now %s%s".format(fullId, state, messageText)) if (ExecutorState.isFinished(state)) { - listener.executorRemoved(fullId, message) + listener.executorRemoved(fullId, message.getOrElse("")) } case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) => diff --git a/core/src/main/scala/spark/deploy/client/TestClient.scala b/core/src/main/scala/spark/deploy/client/TestClient.scala index e6d76f2751..b04f362997 100644 --- a/core/src/main/scala/spark/deploy/client/TestClient.scala +++ b/core/src/main/scala/spark/deploy/client/TestClient.scala @@ -24,7 +24,7 @@ object TestClient { def main(args: Array[String]) { val url = args(0) val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress(), 0) - val desc = new JobDescription("TestClient", 200, 1, Seq(), + val desc = new JobDescription("TestClient", 1, 512, Seq(), Command("spark.deploy.client.TestExecutor", Seq(), Map())) val listener = new TestListener val client = new Client(actorSystem, url, desc, listener) diff --git a/core/src/main/scala/spark/deploy/client/TestExecutor.scala b/core/src/main/scala/spark/deploy/client/TestExecutor.scala new file mode 100644 index 0000000000..1a74cc03cf --- /dev/null +++ b/core/src/main/scala/spark/deploy/client/TestExecutor.scala @@ -0,0 +1,7 @@ +package spark.deploy.client + +object TestExecutor { + def main(args: Array[String]) { + println("Hello world!") + } +} diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala index a21f156a51..89de3b1827 100644 --- a/core/src/main/scala/spark/deploy/master/Master.scala +++ b/core/src/main/scala/spark/deploy/master/Master.scala @@ -24,7 +24,7 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging { val actorToWorker = new HashMap[ActorRef, WorkerInfo] val addressToWorker = new HashMap[Address, WorkerInfo] - val jobs = new HashSet[WorkerInfo] + val jobs = new HashSet[JobInfo] val idToJob = new HashMap[String, JobInfo] val actorToJob = new HashMap[ActorRef, JobInfo] val addressToJob = new HashMap[Address, JobInfo] @@ -57,7 +57,7 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging { if (idToWorker.contains(id)) { sender ! RegisterWorkerFailed("Duplicate worker ID") } else { - val worker = addWorker(id, host, workerPort, cores, memory) + addWorker(id, host, workerPort, cores, memory) context.watch(sender) // This doesn't work with remote actors but helps for testing sender ! RegisteredWorker schedule() @@ -82,9 +82,11 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging { exec.job.actor ! ExecutorUpdated(execId, state, message) if (ExecutorState.isFinished(state)) { // Remove this executor from the worker and job - logInfo("Removing executor " + exec.fullId) + logInfo("Removing executor " + exec.fullId + " because it is " + state) idToJob(jobId).executors -= exec.id exec.worker.removeExecutor(exec) + // TODO: the worker would probably want to restart the executor a few times + schedule() } } case None => @@ -120,10 +122,12 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging { // Right now this is a very simple FIFO with backfilling. We keep looking through the jobs // in order of submission time and launching the first one that fits in the cluster. // It's also not very efficient in terms of algorithmic complexity. - for (job <- waitingJobs) { + for (job <- waitingJobs.clone()) { + logInfo("Trying to schedule job " + job.id) // Figure out how many cores the job could use on the whole cluster val jobMemory = job.desc.memoryPerSlave val usableCores = workers.filter(_.memoryFree >= jobMemory).map(_.coresFree).sum + logInfo("jobMemory: " + jobMemory + ", usableCores: " + usableCores) if (usableCores >= job.desc.cores) { // We can launch it! Let's just partition the workers into executors for this job. // TODO: Probably want to spread stuff out across nodes more. @@ -134,6 +138,7 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging { launchExecutor(worker, exec) coresLeft -= coresToUse } + waitingJobs -= job } } } @@ -141,12 +146,13 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging { def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo) { logInfo("Launching executor " + exec.fullId + " on worker " + worker.id) worker.addExecutor(exec) - worker.actor ! LaunchExecutor(exec.job.id, exec.id, exec.job.desc) + worker.actor ! LaunchExecutor(exec.job.id, exec.id, exec.job.desc, exec.cores, exec.memory) exec.job.actor ! ExecutorAdded(exec.id, worker.id, worker.host, exec.cores, exec.memory) } def addWorker(id: String, host: String, port: Int, cores: Int, memory: Int): WorkerInfo = { val worker = new WorkerInfo(id, host, port, cores, memory, sender) + workers += worker idToWorker(worker.id) = worker actorToWorker(sender) = worker addressToWorker(sender.path.address) = worker @@ -155,14 +161,20 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging { def removeWorker(worker: WorkerInfo) { logInfo("Removing worker " + worker.id + " on " + worker.host + ":" + worker.port) + workers -= worker idToWorker -= worker.id actorToWorker -= worker.actor addressToWorker -= worker.actor.path.address + for (exec <- worker.executors.values) { + exec.job.actor ! ExecutorStateChanged(exec.job.id, exec.id, ExecutorState.LOST, None) + exec.job.executors -= exec.id + } } def addJob(desc: JobDescription, actor: ActorRef): JobInfo = { val date = new Date val job = new JobInfo(newJobId(date), desc, date, actor) + jobs += job idToJob(job.id) = job actorToJob(sender) = job addressToJob(sender.path.address) = job @@ -171,19 +183,21 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging { def removeJob(job: JobInfo) { logInfo("Removing job " + job.id) + jobs -= job idToJob -= job.id actorToJob -= job.actor addressToWorker -= job.actor.path.address completedJobs += job // Remember it in our history for (exec <- job.executors.values) { - + exec.worker.removeExecutor(exec) + exec.worker.actor ! KillExecutor(exec.job.id, exec.id) } schedule() } /** Generate a new job ID given a job's submission date */ def newJobId(submitDate: Date): String = { - val jobId = "job-%s-%4d".format(DATE_FORMAT.format(submitDate), nextJobNumber) + val jobId = "job-%s-%04d".format(DATE_FORMAT.format(submitDate), nextJobNumber) nextJobNumber += 1 jobId } diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala new file mode 100644 index 0000000000..ec58f576e7 --- /dev/null +++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala @@ -0,0 +1,146 @@ +package spark.deploy.worker + +import java.io._ +import spark.deploy.{ExecutorState, ExecutorStateChanged, JobDescription} +import akka.actor.ActorRef +import spark.{Utils, Logging} +import java.net.{URI, URL} +import org.apache.hadoop.fs.{Path, FileSystem} +import org.apache.hadoop.conf.Configuration +import scala.Some +import spark.deploy.ExecutorStateChanged + +class ExecutorRunner( + jobId: String, + execId: Int, + jobDesc: JobDescription, + cores: Int, + memory: Int, + worker: ActorRef, + sparkHome: File, + workDir: File) + extends Logging { + + val fullId = jobId + "/" + execId + var workerThread: Thread = null + var process: Process = null + + def start() { + workerThread = new Thread("ExecutorRunner for " + fullId) { + override def run() { fetchAndRunExecutor() } + } + workerThread.start() + } + + /** Stop this executor runner, including killing the process it launched */ + def kill() { + if (workerThread != null) { + workerThread.interrupt() + workerThread = null + } + } + + /** + * Download a file requested by the executor. Supports fetching the file in a variety of ways, + * including HTTP, HDFS and files on a standard filesystem, based on the URL parameter. + */ + def fetchFile(url: String, targetDir: File) { + val filename = url.split("/").last + val targetFile = new File(targetDir, filename) + if (url.startsWith("http://") || url.startsWith("https://") || url.startsWith("ftp://")) { + // Use the java.net library to fetch it + logInfo("Fetching " + url + " to " + targetFile) + val in = new URL(url).openStream() + val out = new FileOutputStream(targetFile) + Utils.copyStream(in, out, true) + } else { + // Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others + val uri = new URI(url) + val conf = new Configuration() + val fs = FileSystem.get(uri, conf) + val in = fs.open(new Path(uri)) + val out = new FileOutputStream(targetFile) + Utils.copyStream(in, out, true) + } + // Decompress the file if it's a .tar or .tar.gz + if (filename.endsWith(".tar.gz") || filename.endsWith(".tgz")) { + logInfo("Untarring " + filename) + Utils.execute(Seq("tar", "-xzf", filename), targetDir) + } else if (filename.endsWith(".tar")) { + logInfo("Untarring " + filename) + Utils.execute(Seq("tar", "-xf", filename), targetDir) + } + } + + def buildCommandSeq(): Seq[String] = { + val command = jobDesc.command + val runScript = new File(sparkHome, "run").getCanonicalPath + Seq(runScript, command.mainClass) ++ command.arguments + } + + /** Spawn a thread that will redirect a given stream to a file */ + def redirectStream(in: InputStream, file: File) { + val out = new FileOutputStream(file) + new Thread("redirect output to " + file) { + override def run() { + Utils.copyStream(in, out, true) + } + }.start() + } + + /** + * Download and run the executor described in our JobDescription + */ + def fetchAndRunExecutor() { + try { + // Create the executor's working directory + val executorDir = new File(workDir, jobId + "/" + execId) + if (!executorDir.mkdirs()) { + throw new IOException("Failed to create directory " + executorDir) + } + + // Download the files it depends on into it (disabled for now) + //for (url <- jobDesc.fileUrls) { + // fetchFile(url, executorDir) + //} + + // Launch the process + val command = buildCommandSeq() + val builder = new ProcessBuilder(command: _*).directory(executorDir) + val env = builder.environment() + for ((key, value) <- jobDesc.command.environment) { + env.put(key, value) + } + env.put("SPARK_CORES", cores.toString) + env.put("SPARK_MEMORY", memory.toString) + process = builder.start() + + // Redirect its stdout and stderr to files + redirectStream(process.getInputStream, new File(executorDir, "stdout")) + redirectStream(process.getErrorStream, new File(executorDir, "stderr")) + + // Wait for it to exit; this is actually a bad thing if it happens, because we expect to run + // long-lived processes only. However, in the future, we might restart the executor a few + // times on the same machine. + val exitCode = process.waitFor() + val message = "Command exited with code " + exitCode + worker ! ExecutorStateChanged(jobId, execId, ExecutorState.FAILED, Some(message)) + } catch { + case interrupted: InterruptedException => + logInfo("Runner thread interrupted -- killing executor " + fullId) + if (process != null) { + process.destroy() + } + worker ! ExecutorStateChanged(jobId, execId, ExecutorState.KILLED, None) + + case e: Exception => { + logError("Error running executor", e) + if (process != null) { + process.destroy() + } + val message = e.getClass + ": " + e.getMessage + worker ! ExecutorStateChanged(jobId, execId, ExecutorState.FAILED, Some(message)) + } + } + } +} diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala index 1f5854011f..e5da181e9a 100644 --- a/core/src/main/scala/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/spark/deploy/worker/Worker.scala @@ -1,13 +1,20 @@ package spark.deploy.worker - -import akka.actor.{ActorRef, Terminated, Props, Actor} +import scala.collection.mutable.{ArrayBuffer, HashMap} +import akka.actor.{ActorRef, Props, Actor} import spark.{Logging, Utils} import spark.util.AkkaUtils -import spark.deploy.{RegisterWorkerFailed, RegisterWorker, RegisteredWorker} -import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, RemoteClientLifeCycleEvent} +import spark.deploy._ +import akka.remote.RemoteClientLifeCycleEvent import java.text.SimpleDateFormat import java.util.Date +import akka.remote.RemoteClientShutdown +import akka.remote.RemoteClientDisconnected +import spark.deploy.RegisterWorker +import spark.deploy.LaunchExecutor +import spark.deploy.RegisterWorkerFailed +import akka.actor.Terminated +import java.io.File class Worker(ip: String, port: Int, webUiPort: Int, cores: Int, memory: Int, masterUrl: String) extends Actor with Logging { @@ -16,8 +23,11 @@ class Worker(ip: String, port: Int, webUiPort: Int, cores: Int, memory: Int, mas val MASTER_REGEX = "spark://([^:]+):([0-9]+)".r var master: ActorRef = null - val workerId = generateWorkerId() + var sparkHome: File = null + var workDir: File = null + val executors = new HashMap[String, ExecutorRunner] + val finishedExecutors = new ArrayBuffer[String] var coresUsed = 0 var memoryUsed = 0 @@ -25,9 +35,27 @@ class Worker(ip: String, port: Int, webUiPort: Int, cores: Int, memory: Int, mas def coresFree: Int = cores - coresUsed def memoryFree: Int = memory - memoryUsed + def createWorkDir() { + workDir = new File(sparkHome, "work") + try { + if (!workDir.exists() && !workDir.mkdirs()) { + logError("Failed to create work directory " + workDir) + System.exit(1) + } + } catch { + case e: Exception => + logError("Failed to create work directory " + workDir, e) + System.exit(1) + } + } + override def preStart() { logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format( ip, port, cores, Utils.memoryMegabytesToString(memory))) + val envVar = System.getenv("SPARK_HOME") + sparkHome = new File(if (envVar == null) "." else envVar) + logInfo("Spark home: " + sparkHome) + createWorkDir() connectToMaster() startWebUi() } @@ -74,6 +102,21 @@ class Worker(ip: String, port: Int, webUiPort: Int, cores: Int, memory: Int, mas logError("Worker registration failed: " + message) System.exit(1) + case LaunchExecutor(jobId, execId, jobDesc, cores_, memory_) => + logInfo("Asked to launch executor %s/%d for %s".format(jobId, execId, jobDesc.name)) + val er = new ExecutorRunner(jobId, execId, jobDesc, cores_, memory_, self, sparkHome, workDir) + executors(jobId + "/" + execId) = er + er.start() + master ! ExecutorStateChanged(jobId, execId, ExecutorState.LOADING, None) + + case ExecutorStateChanged(jobId, execId, state, message) => + master ! ExecutorStateChanged(jobId, execId, state, message) + if (ExecutorState.isFinished(state)) { + logInfo("Executor " + jobId + "/" + execId + " finished with state " + state) + executors -= jobId + "/" + execId + finishedExecutors += jobId + "/" + execId + } + case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) => masterDisconnected() } |