diff options
author | WangTaoTheTonic <wangtao111@huawei.com> | 2016-08-11 15:09:23 -0700 |
---|---|---|
committer | Marcelo Vanzin <vanzin@cloudera.com> | 2016-08-11 15:09:23 -0700 |
commit | ea0bf91b4a2ca3ef472906e50e31fd6268b6f53e (patch) | |
tree | 2148e3069715302c7c0cf9003f9581a40350822b /yarn/src/main | |
parent | 4ec5c360ce2045a9bdecb3c5277ba519bf0f44ae (diff) | |
download | spark-ea0bf91b4a2ca3ef472906e50e31fd6268b6f53e.tar.gz spark-ea0bf91b4a2ca3ef472906e50e31fd6268b6f53e.tar.bz2 spark-ea0bf91b4a2ca3ef472906e50e31fd6268b6f53e.zip |
[SPARK-17022][YARN] Handle potential deadlock in driver handling messages
## What changes were proposed in this pull request?
We send RequestExecutors directly to the AM instead of transferring it to yarnSchedulerBackend first, to avoid a potential deadlock.
## How was this patch tested?
manual tests
Author: WangTaoTheTonic <wangtao111@huawei.com>
Closes #14605 from WangTaoTheTonic/lock.
Diffstat (limited to 'yarn/src/main')
-rw-r--r-- | yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala | 18 |
1 files changed, 15 insertions, 3 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index 6b3c831e60..ea63ff5dc1 100644 --- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -125,8 +125,20 @@ private[spark] abstract class YarnSchedulerBackend( * This includes executors already pending or running. */ override def doRequestTotalExecutors(requestedTotal: Int): Boolean = { - yarnSchedulerEndpointRef.askWithRetry[Boolean]( - RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount)) + val r = RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount) + yarnSchedulerEndpoint.amEndpoint match { + case Some(am) => + try { + am.askWithRetry[Boolean](r) + } catch { + case NonFatal(e) => + logError(s"Sending $r to AM was unsuccessful", e) + return false + } + case None => + logWarning("Attempted to request executors before the AM has registered!") + return false + } } /** @@ -209,7 +221,7 @@ private[spark] abstract class YarnSchedulerBackend( */ private class YarnSchedulerEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { - private var amEndpoint: Option[RpcEndpointRef] = None + var amEndpoint: Option[RpcEndpointRef] = None private val askAmThreadPool = ThreadUtils.newDaemonCachedThreadPool("yarn-scheduler-ask-am-thread-pool") |