diff options
author | Shixiong Zhu <shixiong@databricks.com> | 2016-08-31 10:56:02 -0700 |
---|---|---|
committer | Marcelo Vanzin <vanzin@cloudera.com> | 2016-08-31 10:56:02 -0700 |
commit | 9bcb33c54117cebc9e087017bf4e4163edaeff17 (patch) | |
tree | 5dc1c79fc73c00df611b217bd0c2fdd11000468f | |
parent | 0611b3a2bf6d73ab62ee133fbb70430839bea7bc (diff) | |
download | spark-9bcb33c54117cebc9e087017bf4e4163edaeff17.tar.gz spark-9bcb33c54117cebc9e087017bf4e4163edaeff17.tar.bz2 spark-9bcb33c54117cebc9e087017bf4e4163edaeff17.zip |
[SPARK-17316][CORE] Make CoarseGrainedSchedulerBackend.removeExecutor non-blocking
## What changes were proposed in this pull request?
StandaloneSchedulerBackend.executorRemoved is a blocking call right now. It may cause some deadlock since it's called inside StandaloneAppClient.ClientEndpoint.
This PR just changed CoarseGrainedSchedulerBackend.removeExecutor to be non-blocking. It's safe since the only two usages (StandaloneSchedulerBackend and YarnSchedulerEndpoint) don't need the return value).
## How was this patch tested?
Jenkins unit tests.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #14882 from zsxwing/SPARK-17316.
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 17 |
1 files changed, 9 insertions, 8 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 8259923ce3..2db3a3bb81 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -406,14 +406,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2)) } - // Called by subclasses when notified of a lost worker - def removeExecutor(executorId: String, reason: ExecutorLossReason) { - try { - driverEndpoint.askWithRetry[Boolean](RemoveExecutor(executorId, reason)) - } catch { - case e: Exception => - throw new SparkException("Error notifying standalone scheduler's driver endpoint", e) - } + /** + * Called by subclasses when notified of a lost worker. It just fires the message and returns + * at once. + */ + protected def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = { + // Only log the failure since we don't care about the result. + driverEndpoint.ask(RemoveExecutor(executorId, reason)).onFailure { case t => + logError(t.getMessage, t) + }(ThreadUtils.sameThread) } def sufficientResourcesRegistered(): Boolean = true |