diff options
Diffstat (limited to 'core/src/main/scala/org/apache/spark/deploy/master/Master.scala')
-rw-r--r-- | core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 24 |
1 files changed, 24 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index ff8d29fdb4..6b9b1408ee 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -368,6 +368,30 @@ private[deploy] class Master( if (canCompleteRecovery) { completeRecovery() } } + case WorkerLatestState(workerId, executors, driverIds) => + idToWorker.get(workerId) match { + case Some(worker) => + for (exec <- executors) { + val executorMatches = worker.executors.exists { + case (_, e) => e.application.id == exec.appId && e.id == exec.execId + } + if (!executorMatches) { + // master doesn't recognize this executor. So just tell worker to kill it. + worker.endpoint.send(KillExecutor(masterUrl, exec.appId, exec.execId)) + } + } + + for (driverId <- driverIds) { + val driverMatches = worker.drivers.exists { case (id, _) => id == driverId } + if (!driverMatches) { + // master doesn't recognize this driver. So just tell worker to kill it. + worker.endpoint.send(KillDriver(driverId)) + } + } + case None => + logWarning("Worker state from unknown worker: " + workerId) + } + case UnregisterApplication(applicationId) => logInfo(s"Received unregister request from application $applicationId") idToApp.get(applicationId).foreach(finishApplication) |