author    Shixiong Zhu <shixiong@databricks.com>    2016-08-31 10:56:02 -0700
committer Marcelo Vanzin <vanzin@cloudera.com>      2016-08-31 10:56:02 -0700
commit    9bcb33c54117cebc9e087017bf4e4163edaeff17 (patch)
tree      5dc1c79fc73c00df611b217bd0c2fdd11000468f /core
parent    0611b3a2bf6d73ab62ee133fbb70430839bea7bc (diff)
[SPARK-17316][CORE] Make CoarseGrainedSchedulerBackend.removeExecutor non-blocking
## What changes were proposed in this pull request?

StandaloneSchedulerBackend.executorRemoved is currently a blocking call. It may cause a deadlock because it is invoked inside StandaloneAppClient.ClientEndpoint. This PR changes CoarseGrainedSchedulerBackend.removeExecutor to be non-blocking. This is safe since the only two usages (StandaloneSchedulerBackend and YarnSchedulerEndpoint) don't need the return value.

## How was this patch tested?

Jenkins unit tests.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #14882 from zsxwing/SPARK-17316.
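The patch replaces a blocking `askWithRetry` with a fire-and-forget `ask` whose failure is only logged. The sketch below illustrates that pattern with plain `scala.concurrent` Futures rather than Spark's RPC API; `askDriver` and the message strings are hypothetical stand-ins for `driverEndpoint.ask` and the `RemoveExecutor` message, used only to show why the non-blocking form cannot stall the calling thread.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.Failure

object RemoveExecutorSketch {

  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical stand-in for driverEndpoint.ask: sends a message and returns
  // a Future that completes when (and if) the other endpoint replies.
  def askDriver(msg: String): Future[Boolean] =
    Future { true } // pretend the driver acknowledged the message

  def main(args: Array[String]): Unit = {
    // Blocking style (roughly what askWithRetry did): the calling thread waits
    // for the reply. If that thread is also the one that must process the
    // reply, the wait can deadlock.
    val acked: Boolean = Await.result(askDriver("RemoveExecutor(exec-1)"), 10.seconds)
    println(s"blocking call acknowledged: $acked")

    // Fire-and-forget style (what the patch switches to): return immediately
    // and only log if the send eventually fails; the result is never needed.
    askDriver("RemoveExecutor(exec-2)").onComplete {
      case Failure(t) => println(s"Error notifying driver endpoint: ${t.getMessage}")
      case _          => () // success: nothing to do
    }

    // Give the asynchronous callback a moment to run before the JVM exits.
    Thread.sleep(100)
  }
}
```

In the actual diff the failure handler is registered with `ThreadUtils.sameThread`, i.e. it runs on whichever thread completes the RPC future; that is acceptable because the handler does nothing beyond logging the error.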
Diffstat (limited to 'core')
-rw-r--r--    core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala    17
1 file changed, 9 insertions, 8 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 8259923ce3..2db3a3bb81 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -406,14 +406,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
}
- // Called by subclasses when notified of a lost worker
- def removeExecutor(executorId: String, reason: ExecutorLossReason) {
- try {
- driverEndpoint.askWithRetry[Boolean](RemoveExecutor(executorId, reason))
- } catch {
- case e: Exception =>
- throw new SparkException("Error notifying standalone scheduler's driver endpoint", e)
- }
+ /**
+ * Called by subclasses when notified of a lost worker. It just fires the message and returns
+ * at once.
+ */
+ protected def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
+ // Only log the failure since we don't care about the result.
+ driverEndpoint.ask(RemoveExecutor(executorId, reason)).onFailure { case t =>
+ logError(t.getMessage, t)
+ }(ThreadUtils.sameThread)
}
def sufficientResourcesRegistered(): Boolean = true