aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala1
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/Master.scala10
4 files changed, 16 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index 0997507d01..9db6fd1ac4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -101,6 +101,8 @@ private[deploy] object DeployMessages {
case class RegisterApplication(appDescription: ApplicationDescription)
extends DeployMessage
+ case class UnregisterApplication(appId: String)
+
case class MasterChangeAcknowledged(appId: String)
// Master to AppClient
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
index 3b72972525..4f06d7f96c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -157,6 +157,7 @@ private[spark] class AppClient(
case StopAppClient =>
markDead("Application has been stopped.")
+ master ! UnregisterApplication(appId)
sender ! true
context.stop(self)
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
index f979ffa166..bc5b293379 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -111,6 +111,10 @@ private[deploy] class ApplicationInfo(
endTime = System.currentTimeMillis()
}
+ private[master] def isFinished: Boolean = {
+ state != ApplicationState.WAITING && state != ApplicationState.RUNNING
+ }
+
def duration: Long = {
if (endTime != -1) {
endTime - startTime
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 80506621f4..9a5d5877da 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -339,7 +339,11 @@ private[master] class Master(
if (ExecutorState.isFinished(state)) {
// Remove this executor from the worker and app
logInfo(s"Removing executor ${exec.fullId} because it is $state")
- appInfo.removeExecutor(exec)
+ // If an application has already finished, preserve its
+ // state to display its information properly on the UI
+ if (!appInfo.isFinished) {
+ appInfo.removeExecutor(exec)
+ }
exec.worker.removeExecutor(exec)
val normalExit = exitStatus == Some(0)
@@ -428,6 +432,10 @@ private[master] class Master(
if (canCompleteRecovery) { completeRecovery() }
}
+ case UnregisterApplication(applicationId) =>
+ logInfo(s"Received unregister request from application $applicationId")
+ idToApp.get(applicationId).foreach(finishApplication)
+
case DisassociatedEvent(_, address, _) => {
// The disconnected client could've been either a worker or an app; remove whichever it was
logInfo(s"$address got disassociated, removing it.")