aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala8
2 files changed, 9 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 0fea9c123b..e84cb6346d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -295,7 +295,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// manager to reregister itself. If that happens, the block manager master will know
// about the executor, but the scheduler will not. Therefore, we should remove the
// executor from the block manager when we hit this case.
- scheduler.sc.env.blockManager.master.removeExecutor(executorId)
+ scheduler.sc.env.blockManager.master.removeExecutorAsync(executorId)
logInfo(s"Asked to remove non-existent executor $executorId")
}
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index 52db45bd48..8655cf10fc 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -42,6 +42,14 @@ class BlockManagerMaster(
logInfo("Removed " + execId + " successfully in removeExecutor")
}
+ /** Request removal of a dead executor from the driver endpoint.
+ * This is only called on the driver side. Non-blocking
+ */
+ def removeExecutorAsync(execId: String) {
+ driverEndpoint.ask[Boolean](RemoveExecutor(execId))
+ logInfo("Removal of executor " + execId + " requested")
+ }
+
/** Register the BlockManager's id with the driver. */
def registerBlockManager(
blockManagerId: BlockManagerId, maxMemSize: Long, slaveEndpoint: RpcEndpointRef): Unit = {