From 0aad42b5e732ac6865b8e3c2cffa35d4ff48d5ca Mon Sep 17 00:00:00 2001 From: Charles Reiss Date: Thu, 13 Dec 2012 20:33:57 -0800 Subject: Have standalone cluster report exit codes to clients. Addresses SPARK-639. --- core/src/main/scala/spark/deploy/DeployMessage.scala | 6 ++++-- core/src/main/scala/spark/deploy/client/Client.scala | 4 ++-- core/src/main/scala/spark/deploy/client/ClientListener.scala | 2 +- core/src/main/scala/spark/deploy/client/TestClient.scala | 2 +- core/src/main/scala/spark/deploy/master/Master.scala | 6 +++--- core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala | 7 ++++--- core/src/main/scala/spark/deploy/worker/Worker.scala | 6 +++--- .../spark/scheduler/cluster/SparkDeploySchedulerBackend.scala | 11 ++--------- 8 files changed, 20 insertions(+), 24 deletions(-) diff --git a/core/src/main/scala/spark/deploy/DeployMessage.scala b/core/src/main/scala/spark/deploy/DeployMessage.scala index f05413a53b..457122745b 100644 --- a/core/src/main/scala/spark/deploy/DeployMessage.scala +++ b/core/src/main/scala/spark/deploy/DeployMessage.scala @@ -27,7 +27,8 @@ case class ExecutorStateChanged( jobId: String, execId: Int, state: ExecutorState, - message: Option[String]) + message: Option[String], + exitStatus: Option[Int]) extends DeployMessage // Master to Worker @@ -58,7 +59,8 @@ private[spark] case class ExecutorAdded(id: Int, workerId: String, host: String, cores: Int, memory: Int) private[spark] -case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String]) +case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String], + exitStatus: Option[Int]) private[spark] case class JobKilled(message: String) diff --git a/core/src/main/scala/spark/deploy/client/Client.scala b/core/src/main/scala/spark/deploy/client/Client.scala index c57a1d33e9..90fe9508cd 100644 --- a/core/src/main/scala/spark/deploy/client/Client.scala +++ b/core/src/main/scala/spark/deploy/client/Client.scala @@ -66,12 +66,12 @@ private[spark] class Client( logInfo("Executor added: %s on %s (%s) with %d cores".format(fullId, workerId, host, cores)) listener.executorAdded(fullId, workerId, host, cores, memory) - case ExecutorUpdated(id, state, message) => + case ExecutorUpdated(id, state, message, exitStatus) => val fullId = jobId + "/" + id val messageText = message.map(s => " (" + s + ")").getOrElse("") logInfo("Executor updated: %s is now %s%s".format(fullId, state, messageText)) if (ExecutorState.isFinished(state)) { - listener.executorRemoved(fullId, message.getOrElse("")) + listener.executorRemoved(fullId, message.getOrElse(""), exitStatus) } case Terminated(actor_) if actor_ == master => diff --git a/core/src/main/scala/spark/deploy/client/ClientListener.scala b/core/src/main/scala/spark/deploy/client/ClientListener.scala index a8fa982085..da6abcc9c2 100644 --- a/core/src/main/scala/spark/deploy/client/ClientListener.scala +++ b/core/src/main/scala/spark/deploy/client/ClientListener.scala @@ -14,5 +14,5 @@ private[spark] trait ClientListener { def executorAdded(id: String, workerId: String, host: String, cores: Int, memory: Int): Unit - def executorRemoved(id: String, message: String): Unit + def executorRemoved(id: String, message: String, exitStatus: Option[Int]): Unit } diff --git a/core/src/main/scala/spark/deploy/client/TestClient.scala b/core/src/main/scala/spark/deploy/client/TestClient.scala index 5b710f5520..57a7e123b7 100644 --- a/core/src/main/scala/spark/deploy/client/TestClient.scala +++ b/core/src/main/scala/spark/deploy/client/TestClient.scala @@ -18,7 +18,7 @@ private[spark] object TestClient { def executorAdded(id: String, workerId: String, host: String, cores: Int, memory: Int) {} - def executorRemoved(id: String, message: String) {} + def executorRemoved(id: String, message: String, exitStatus: Option[Int]) {} } def main(args: Array[String]) { diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala index 31fb83f2e2..b30c8e99b5 100644 --- a/core/src/main/scala/spark/deploy/master/Master.scala +++ b/core/src/main/scala/spark/deploy/master/Master.scala @@ -83,12 +83,12 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor schedule() } - case ExecutorStateChanged(jobId, execId, state, message) => { + case ExecutorStateChanged(jobId, execId, state, message, exitStatus) => { val execOption = idToJob.get(jobId).flatMap(job => job.executors.get(execId)) execOption match { case Some(exec) => { exec.state = state - exec.job.actor ! ExecutorUpdated(execId, state, message) + exec.job.actor ! ExecutorUpdated(execId, state, message, exitStatus) if (ExecutorState.isFinished(state)) { val jobInfo = idToJob(jobId) // Remove this executor from the worker and job @@ -218,7 +218,7 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor actorToWorker -= worker.actor addressToWorker -= worker.actor.path.address for (exec <- worker.executors.values) { - exec.job.actor ! ExecutorStateChanged(exec.job.id, exec.id, ExecutorState.LOST, None) + exec.job.actor ! ExecutorStateChanged(exec.job.id, exec.id, ExecutorState.LOST, None, None) exec.job.executors -= exec.id } } diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala index 07ae7bca78..beceb55ecd 100644 --- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala @@ -60,7 +60,7 @@ private[spark] class ExecutorRunner( process.destroy() process.waitFor() } - worker ! ExecutorStateChanged(jobId, execId, ExecutorState.KILLED, None) + worker ! ExecutorStateChanged(jobId, execId, ExecutorState.KILLED, None, None) Runtime.getRuntime.removeShutdownHook(shutdownHook) } } @@ -134,7 +134,8 @@ private[spark] class ExecutorRunner( // times on the same machine. val exitCode = process.waitFor() val message = "Command exited with code " + exitCode - worker ! ExecutorStateChanged(jobId, execId, ExecutorState.FAILED, Some(message)) + worker ! ExecutorStateChanged(jobId, execId, ExecutorState.FAILED, Some(message), + Some(exitCode)) } catch { case interrupted: InterruptedException => logInfo("Runner thread for executor " + fullId + " interrupted") @@ -145,7 +146,7 @@ private[spark] class ExecutorRunner( process.destroy() } val message = e.getClass + ": " + e.getMessage - worker ! ExecutorStateChanged(jobId, execId, ExecutorState.FAILED, Some(message)) + worker ! ExecutorStateChanged(jobId, execId, ExecutorState.FAILED, Some(message), None) } } } diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala index 31b8f0f955..7c9e588ea2 100644 --- a/core/src/main/scala/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/spark/deploy/worker/Worker.scala @@ -127,10 +127,10 @@ private[spark] class Worker( manager.start() coresUsed += cores_ memoryUsed += memory_ - master ! ExecutorStateChanged(jobId, execId, ExecutorState.RUNNING, None) + master ! ExecutorStateChanged(jobId, execId, ExecutorState.RUNNING, None, None) - case ExecutorStateChanged(jobId, execId, state, message) => - master ! ExecutorStateChanged(jobId, execId, state, message) + case ExecutorStateChanged(jobId, execId, state, message, exitStatus) => + master ! ExecutorStateChanged(jobId, execId, state, message, exitStatus) val fullId = jobId + "/" + execId if (ExecutorState.isFinished(state)) { val executor = executors(fullId) diff --git a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index efaf2d330c..7b58d0c022 100644 --- a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -71,15 +71,8 @@ private[spark] class SparkDeploySchedulerBackend( id, host, cores, Utils.memoryMegabytesToString(memory))) } - def executorRemoved(id: String, message: String) { - var reason: ExecutorLossReason = SlaveLost(message) - if (message.startsWith("Command exited with code ")) { - try { - reason = ExecutorExited(message.substring("Command exited with code ".length).toInt) - } catch { - case nfe: NumberFormatException => {} - } - } + def executorRemoved(id: String, message: String, exitStatus: Option[Int]) { + var reason: ExecutorLossReason = exitStatus.map(ExecutorExited).getOrElse(SlaveLost(message)) logInfo("Executor %s removed: %s".format(id, message)) executorIdToSlaveId.get(id) match { case Some(slaveId) => -- cgit v1.2.3 From c528932a41000835af316382309a1465cb94f582 Mon Sep 17 00:00:00 2001 From: Charles Reiss Date: Thu, 13 Dec 2012 21:51:47 -0800 Subject: Code review cleanup. --- .../scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 7b58d0c022..e2301347e5 100644 --- a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -72,7 +72,10 @@ private[spark] class SparkDeploySchedulerBackend( } def executorRemoved(id: String, message: String, exitStatus: Option[Int]) { - var reason: ExecutorLossReason = exitStatus.map(ExecutorExited).getOrElse(SlaveLost(message)) + val reason: ExecutorLossReason = exitStatus match { + case Some(code) => ExecutorExited(code) + case None => SlaveLost(message) + } logInfo("Executor %s removed: %s".format(id, message)) executorIdToSlaveId.get(id) match { case Some(slaveId) => -- cgit v1.2.3