aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorCharles Reiss <charles@eecs.berkeley.edu>2013-03-05 18:37:25 -0800
committerCharles Reiss <charles@eecs.berkeley.edu>2013-03-09 11:29:45 -0800
commitb0983c5762b583c186a3b64606fa2625af962940 (patch)
tree5121a800b62f9ee395bba68ad701b451b6a43846 /core
parent9f0dc829cbaa9aa5011fb917010d13ea5e0a19d7 (diff)
downloadspark-b0983c5762b583c186a3b64606fa2625af962940.tar.gz
spark-b0983c5762b583c186a3b64606fa2625af962940.tar.bz2
spark-b0983c5762b583c186a3b64606fa2625af962940.zip
Notify standalone deploy client of application death.
Usually, this isn't necessary since the application will be removed as a result of the deploy client disconnecting, but occassionally, the standalone deploy master removes an application otherwise. Also mark applications as FAILED instead of FINISHED when they are killed as a result of their executors failing too many times.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/deploy/DeployMessage.scala2
-rw-r--r--core/src/main/scala/spark/deploy/client/Client.scala5
-rw-r--r--core/src/main/scala/spark/deploy/master/Master.scala17
3 files changed, 17 insertions, 7 deletions
diff --git a/core/src/main/scala/spark/deploy/DeployMessage.scala b/core/src/main/scala/spark/deploy/DeployMessage.scala
index 3cbf4fdd98..8a3e64e4c2 100644
--- a/core/src/main/scala/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/spark/deploy/DeployMessage.scala
@@ -65,7 +65,7 @@ case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String
exitStatus: Option[Int])
private[spark]
-case class appKilled(message: String)
+case class ApplicationRemoved(message: String)
// Internal message in Client
diff --git a/core/src/main/scala/spark/deploy/client/Client.scala b/core/src/main/scala/spark/deploy/client/Client.scala
index 1a95524cf9..2fc5e657f9 100644
--- a/core/src/main/scala/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/spark/deploy/client/Client.scala
@@ -54,6 +54,11 @@ private[spark] class Client(
appId = appId_
listener.connected(appId)
+ case ApplicationRemoved(message) =>
+ logError("Master removed our application: %s; stopping client".format(message))
+ markDisconnected()
+ context.stop(self)
+
case ExecutorAdded(id: Int, workerId: String, host: String, cores: Int, memory: Int) =>
val fullId = appId + "/" + id
logInfo("Executor added: %s on %s (%s) with %d cores".format(fullId, workerId, host, cores))
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index 4af22cf9b6..71b9d0801d 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -107,7 +107,7 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
} else {
logError("Application %s with ID %s failed %d times, removing it".format(
appInfo.desc.name, appInfo.id, appInfo.retryCount))
- removeApplication(appInfo)
+ removeApplication(appInfo, ApplicationState.FAILED)
}
}
}
@@ -129,19 +129,19 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
// The disconnected actor could've been either a worker or an app; remove whichever of
// those we have an entry for in the corresponding actor hashmap
actorToWorker.get(actor).foreach(removeWorker)
- actorToApp.get(actor).foreach(removeApplication)
+ actorToApp.get(actor).foreach(finishApplication)
}
case RemoteClientDisconnected(transport, address) => {
// The disconnected client could've been either a worker or an app; remove whichever it was
addressToWorker.get(address).foreach(removeWorker)
- addressToApp.get(address).foreach(removeApplication)
+ addressToApp.get(address).foreach(finishApplication)
}
case RemoteClientShutdown(transport, address) => {
// The disconnected client could've been either a worker or an app; remove whichever it was
addressToWorker.get(address).foreach(removeWorker)
- addressToApp.get(address).foreach(removeApplication)
+ addressToApp.get(address).foreach(finishApplication)
}
case RequestMasterState => {
@@ -257,7 +257,11 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
return app
}
- def removeApplication(app: ApplicationInfo) {
+ def finishApplication(app: ApplicationInfo) {
+ removeApplication(app, ApplicationState.FINISHED)
+ }
+
+ def removeApplication(app: ApplicationInfo, state: ApplicationState.Value) {
if (apps.contains(app)) {
logInfo("Removing app " + app.id)
apps -= app
@@ -270,7 +274,8 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
exec.worker.removeExecutor(exec)
exec.worker.actor ! KillExecutor(exec.application.id, exec.id)
}
- app.markFinished(ApplicationState.FINISHED) // TODO: Mark it as FAILED if it failed
+ app.markFinished(state)
+ app.driver ! ApplicationRemoved(state.toString)
schedule()
}
}