diff options
author | Pete Robbins <robbinspg@gmail.com> | 2016-06-02 10:14:51 -0700 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2016-06-02 10:14:51 -0700 |
commit | 7c07d176f3d65235f9376898a7b10b01268c867c (patch) | |
tree | 658d51ecace6c76f5e5dd4c5e829d69d9314ff2a /core | |
parent | 63b7f127caf2fdf96eeb8457afd6c96bc8309a58 (diff) | |
download | spark-7c07d176f3d65235f9376898a7b10b01268c867c.tar.gz spark-7c07d176f3d65235f9376898a7b10b01268c867c.tar.bz2 spark-7c07d176f3d65235f9376898a7b10b01268c867c.zip |
[SPARK-15606][CORE] Use non-blocking removeExecutor call to avoid deadlocks
## What changes were proposed in this pull request?
Set minimum number of dispatcher threads to 3 to avoid deadlocks on machines with only 2 cores
## How was this patch tested?
Spark test builds
Author: Pete Robbins <robbinspg@gmail.com>
Closes #13355 from robbinspg/SPARK-13906.
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 2 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala | 8 |
2 files changed, 9 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 0fea9c123b..e84cb6346d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -295,7 +295,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // manager to reregister itself. If that happens, the block manager master will know // about the executor, but the scheduler will not. Therefore, we should remove the // executor from the block manager when we hit this case. - scheduler.sc.env.blockManager.master.removeExecutor(executorId) + scheduler.sc.env.blockManager.master.removeExecutorAsync(executorId) logInfo(s"Asked to remove non-existent executor $executorId") } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index 52db45bd48..8655cf10fc 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -42,6 +42,14 @@ class BlockManagerMaster( logInfo("Removed " + execId + " successfully in removeExecutor") } + /** Request removal of a dead executor from the driver endpoint. + * This is only called on the driver side. Non-blocking + */ + def removeExecutorAsync(execId: String) { + driverEndpoint.ask[Boolean](RemoveExecutor(execId)) + logInfo("Removal of executor " + execId + " requested") + } + /** Register the BlockManager's id with the driver. */ def registerBlockManager( blockManagerId: BlockManagerId, maxMemSize: Long, slaveEndpoint: RpcEndpointRef): Unit = { |