aboutsummaryrefslogtreecommitdiff
path: root/yarn
diff options
context:
space:
mode:
authorWangTaoTheTonic <wangtao111@huawei.com>2016-08-11 15:09:23 -0700
committerMarcelo Vanzin <vanzin@cloudera.com>2016-08-11 15:09:23 -0700
commitea0bf91b4a2ca3ef472906e50e31fd6268b6f53e (patch)
tree2148e3069715302c7c0cf9003f9581a40350822b /yarn
parent4ec5c360ce2045a9bdecb3c5277ba519bf0f44ae (diff)
downloadspark-ea0bf91b4a2ca3ef472906e50e31fd6268b6f53e.tar.gz
spark-ea0bf91b4a2ca3ef472906e50e31fd6268b6f53e.tar.bz2
spark-ea0bf91b4a2ca3ef472906e50e31fd6268b6f53e.zip
[SPARK-17022][YARN] Handle potential deadlock in driver handling messages
## What changes were proposed in this pull request? We directly send RequestExecutors to AM instead of transfer it to yarnShedulerBackend first, to avoid potential deadlock. ## How was this patch tested? manual tests Author: WangTaoTheTonic <wangtao111@huawei.com> Closes #14605 from WangTaoTheTonic/lock.
Diffstat (limited to 'yarn')
-rw-r--r--yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala18
1 files changed, 15 insertions, 3 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index 6b3c831e60..ea63ff5dc1 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -125,8 +125,20 @@ private[spark] abstract class YarnSchedulerBackend(
* This includes executors already pending or running.
*/
override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
- yarnSchedulerEndpointRef.askWithRetry[Boolean](
- RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount))
+ val r = RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount)
+ yarnSchedulerEndpoint.amEndpoint match {
+ case Some(am) =>
+ try {
+ am.askWithRetry[Boolean](r)
+ } catch {
+ case NonFatal(e) =>
+ logError(s"Sending $r to AM was unsuccessful", e)
+ return false
+ }
+ case None =>
+ logWarning("Attempted to request executors before the AM has registered!")
+ return false
+ }
}
/**
@@ -209,7 +221,7 @@ private[spark] abstract class YarnSchedulerBackend(
*/
private class YarnSchedulerEndpoint(override val rpcEnv: RpcEnv)
extends ThreadSafeRpcEndpoint with Logging {
- private var amEndpoint: Option[RpcEndpointRef] = None
+ var amEndpoint: Option[RpcEndpointRef] = None
private val askAmThreadPool =
ThreadUtils.newDaemonCachedThreadPool("yarn-scheduler-ask-am-thread-pool")