aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2012-12-13 22:43:48 -0800
committerMatei Zaharia <matei@eecs.berkeley.edu>2012-12-13 22:43:48 -0800
commit1072f970cc8da29e3ebabf746e1016a8c6e9fa7e (patch)
treea9676430169fa7d646f91c603ffcd435edf727ed
parentd6d910471d103b17ef5701da6a34f2f8085bf99e (diff)
parentc528932a41000835af316382309a1465cb94f582 (diff)
downloadspark-1072f970cc8da29e3ebabf746e1016a8c6e9fa7e.tar.gz
spark-1072f970cc8da29e3ebabf746e1016a8c6e9fa7e.tar.bz2
spark-1072f970cc8da29e3ebabf746e1016a8c6e9fa7e.zip
Merge pull request #331 from woggling/deploy-exit-status
Have standalone cluster report exit codes to clients
-rw-r--r--core/src/main/scala/spark/deploy/DeployMessage.scala6
-rw-r--r--core/src/main/scala/spark/deploy/client/Client.scala4
-rw-r--r--core/src/main/scala/spark/deploy/client/ClientListener.scala2
-rw-r--r--core/src/main/scala/spark/deploy/client/TestClient.scala2
-rw-r--r--core/src/main/scala/spark/deploy/master/Master.scala6
-rw-r--r--core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala7
-rw-r--r--core/src/main/scala/spark/deploy/worker/Worker.scala6
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala12
8 files changed, 22 insertions, 23 deletions
diff --git a/core/src/main/scala/spark/deploy/DeployMessage.scala b/core/src/main/scala/spark/deploy/DeployMessage.scala
index f05413a53b..457122745b 100644
--- a/core/src/main/scala/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/spark/deploy/DeployMessage.scala
@@ -27,7 +27,8 @@ case class ExecutorStateChanged(
jobId: String,
execId: Int,
state: ExecutorState,
- message: Option[String])
+ message: Option[String],
+ exitStatus: Option[Int])
extends DeployMessage
// Master to Worker
@@ -58,7 +59,8 @@ private[spark]
case class ExecutorAdded(id: Int, workerId: String, host: String, cores: Int, memory: Int)
private[spark]
-case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String])
+case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String],
+ exitStatus: Option[Int])
private[spark]
case class JobKilled(message: String)
diff --git a/core/src/main/scala/spark/deploy/client/Client.scala b/core/src/main/scala/spark/deploy/client/Client.scala
index c57a1d33e9..90fe9508cd 100644
--- a/core/src/main/scala/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/spark/deploy/client/Client.scala
@@ -66,12 +66,12 @@ private[spark] class Client(
logInfo("Executor added: %s on %s (%s) with %d cores".format(fullId, workerId, host, cores))
listener.executorAdded(fullId, workerId, host, cores, memory)
- case ExecutorUpdated(id, state, message) =>
+ case ExecutorUpdated(id, state, message, exitStatus) =>
val fullId = jobId + "/" + id
val messageText = message.map(s => " (" + s + ")").getOrElse("")
logInfo("Executor updated: %s is now %s%s".format(fullId, state, messageText))
if (ExecutorState.isFinished(state)) {
- listener.executorRemoved(fullId, message.getOrElse(""))
+ listener.executorRemoved(fullId, message.getOrElse(""), exitStatus)
}
case Terminated(actor_) if actor_ == master =>
diff --git a/core/src/main/scala/spark/deploy/client/ClientListener.scala b/core/src/main/scala/spark/deploy/client/ClientListener.scala
index a8fa982085..da6abcc9c2 100644
--- a/core/src/main/scala/spark/deploy/client/ClientListener.scala
+++ b/core/src/main/scala/spark/deploy/client/ClientListener.scala
@@ -14,5 +14,5 @@ private[spark] trait ClientListener {
def executorAdded(id: String, workerId: String, host: String, cores: Int, memory: Int): Unit
- def executorRemoved(id: String, message: String): Unit
+ def executorRemoved(id: String, message: String, exitStatus: Option[Int]): Unit
}
diff --git a/core/src/main/scala/spark/deploy/client/TestClient.scala b/core/src/main/scala/spark/deploy/client/TestClient.scala
index 5b710f5520..57a7e123b7 100644
--- a/core/src/main/scala/spark/deploy/client/TestClient.scala
+++ b/core/src/main/scala/spark/deploy/client/TestClient.scala
@@ -18,7 +18,7 @@ private[spark] object TestClient {
def executorAdded(id: String, workerId: String, host: String, cores: Int, memory: Int) {}
- def executorRemoved(id: String, message: String) {}
+ def executorRemoved(id: String, message: String, exitStatus: Option[Int]) {}
}
def main(args: Array[String]) {
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index 31fb83f2e2..b30c8e99b5 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -83,12 +83,12 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
schedule()
}
- case ExecutorStateChanged(jobId, execId, state, message) => {
+ case ExecutorStateChanged(jobId, execId, state, message, exitStatus) => {
val execOption = idToJob.get(jobId).flatMap(job => job.executors.get(execId))
execOption match {
case Some(exec) => {
exec.state = state
- exec.job.actor ! ExecutorUpdated(execId, state, message)
+ exec.job.actor ! ExecutorUpdated(execId, state, message, exitStatus)
if (ExecutorState.isFinished(state)) {
val jobInfo = idToJob(jobId)
// Remove this executor from the worker and job
@@ -218,7 +218,7 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
actorToWorker -= worker.actor
addressToWorker -= worker.actor.path.address
for (exec <- worker.executors.values) {
- exec.job.actor ! ExecutorStateChanged(exec.job.id, exec.id, ExecutorState.LOST, None)
+ exec.job.actor ! ExecutorStateChanged(exec.job.id, exec.id, ExecutorState.LOST, None, None)
exec.job.executors -= exec.id
}
}
diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
index 07ae7bca78..beceb55ecd 100644
--- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
@@ -60,7 +60,7 @@ private[spark] class ExecutorRunner(
process.destroy()
process.waitFor()
}
- worker ! ExecutorStateChanged(jobId, execId, ExecutorState.KILLED, None)
+ worker ! ExecutorStateChanged(jobId, execId, ExecutorState.KILLED, None, None)
Runtime.getRuntime.removeShutdownHook(shutdownHook)
}
}
@@ -134,7 +134,8 @@ private[spark] class ExecutorRunner(
// times on the same machine.
val exitCode = process.waitFor()
val message = "Command exited with code " + exitCode
- worker ! ExecutorStateChanged(jobId, execId, ExecutorState.FAILED, Some(message))
+ worker ! ExecutorStateChanged(jobId, execId, ExecutorState.FAILED, Some(message),
+ Some(exitCode))
} catch {
case interrupted: InterruptedException =>
logInfo("Runner thread for executor " + fullId + " interrupted")
@@ -145,7 +146,7 @@ private[spark] class ExecutorRunner(
process.destroy()
}
val message = e.getClass + ": " + e.getMessage
- worker ! ExecutorStateChanged(jobId, execId, ExecutorState.FAILED, Some(message))
+ worker ! ExecutorStateChanged(jobId, execId, ExecutorState.FAILED, Some(message), None)
}
}
}
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index 31b8f0f955..7c9e588ea2 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -127,10 +127,10 @@ private[spark] class Worker(
manager.start()
coresUsed += cores_
memoryUsed += memory_
- master ! ExecutorStateChanged(jobId, execId, ExecutorState.RUNNING, None)
+ master ! ExecutorStateChanged(jobId, execId, ExecutorState.RUNNING, None, None)
- case ExecutorStateChanged(jobId, execId, state, message) =>
- master ! ExecutorStateChanged(jobId, execId, state, message)
+ case ExecutorStateChanged(jobId, execId, state, message, exitStatus) =>
+ master ! ExecutorStateChanged(jobId, execId, state, message, exitStatus)
val fullId = jobId + "/" + execId
if (ExecutorState.isFinished(state)) {
val executor = executors(fullId)
diff --git a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index efaf2d330c..e2301347e5 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -71,14 +71,10 @@ private[spark] class SparkDeploySchedulerBackend(
id, host, cores, Utils.memoryMegabytesToString(memory)))
}
- def executorRemoved(id: String, message: String) {
- var reason: ExecutorLossReason = SlaveLost(message)
- if (message.startsWith("Command exited with code ")) {
- try {
- reason = ExecutorExited(message.substring("Command exited with code ".length).toInt)
- } catch {
- case nfe: NumberFormatException => {}
- }
+ def executorRemoved(id: String, message: String, exitStatus: Option[Int]) {
+ val reason: ExecutorLossReason = exitStatus match {
+ case Some(code) => ExecutorExited(code)
+ case None => SlaveLost(message)
}
logInfo("Executor %s removed: %s".format(id, message))
executorIdToSlaveId.get(id) match {