aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/deploy/master/Master.scala')
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/Master.scala24
1 files changed, 24 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index ff8d29fdb4..6b9b1408ee 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -368,6 +368,30 @@ private[deploy] class Master(
if (canCompleteRecovery) { completeRecovery() }
}
+ case WorkerLatestState(workerId, executors, driverIds) =>
+ idToWorker.get(workerId) match {
+ case Some(worker) =>
+ for (exec <- executors) {
+ val executorMatches = worker.executors.exists {
+ case (_, e) => e.application.id == exec.appId && e.id == exec.execId
+ }
+ if (!executorMatches) {
+ // master doesn't recognize this executor. So just tell worker to kill it.
+ worker.endpoint.send(KillExecutor(masterUrl, exec.appId, exec.execId))
+ }
+ }
+
+ for (driverId <- driverIds) {
+ val driverMatches = worker.drivers.exists { case (id, _) => id == driverId }
+ if (!driverMatches) {
+ // master doesn't recognize this driver. So just tell worker to kill it.
+ worker.endpoint.send(KillDriver(driverId))
+ }
+ }
+ case None =>
+ logWarning("Worker state from unknown worker: " + workerId)
+ }
+
case UnregisterApplication(applicationId) =>
logInfo(s"Received unregister request from application $applicationId")
idToApp.get(applicationId).foreach(finishApplication)