aboutsummaryrefslogtreecommitdiff
path: root/yarn
diff options
context:
space:
mode:
authorAngus Gerry <angolon@gmail.com>2016-09-01 10:35:31 -0700
committerMarcelo Vanzin <vanzin@cloudera.com>2016-09-01 10:35:31 -0700
commita0aac4b775bc8c275f96ad0fbf85c9d8a3690588 (patch)
tree94e7f9f8d5eb9f1b32009d99caea48376cc84c81 /yarn
parentadaaffa34ef0ef6a7baa5c1fea848cf5bc3987a2 (diff)
downloadspark-a0aac4b775bc8c275f96ad0fbf85c9d8a3690588.tar.gz
spark-a0aac4b775bc8c275f96ad0fbf85c9d8a3690588.tar.bz2
spark-a0aac4b775bc8c275f96ad0fbf85c9d8a3690588.zip
[SPARK-16533][CORE] resolve deadlocking in driver when executors die
## What changes were proposed in this pull request? This pull request reverts the changes made as a part of #14605, which simply side-steps the deadlock issue. Instead, I propose the following approach: * Use `scheduleWithFixedDelay` when calling `ExecutorAllocationManager.schedule` for scheduling executor requests. The intent of this is that if invocations are delayed beyond the default schedule interval on account of lock contention, then we avoid a situation where calls to `schedule` are made back-to-back, potentially releasing and then immediately reacquiring these locks - further exacerbating contention. * Replace a number of calls to `askWithRetry` with `ask` inside of message handling code in `CoarseGrainedSchedulerBackend` and its ilk. This allows us queue messages with the relevant endpoints, release whatever locks we might be holding, and then block whilst awaiting the response. This change is made at the cost of being able to retry should sending the message fail, as retrying outside of the lock could easily cause race conditions if other conflicting messages have been sent whilst awaiting a response. I believe this to be the lesser of two evils, as in many cases these RPC calls are to process local components, and so failures are more likely to be deterministic, and timeouts are more likely to be caused by lock contention. ## How was this patch tested? Existing tests, and manual tests under yarn-client mode. Author: Angus Gerry <angolon@gmail.com> Closes #14710 from angolon/SPARK-16533.
Diffstat (limited to 'yarn')
-rw-r--r--yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala95
1 files changed, 41 insertions, 54 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index ea63ff5dc1..2f9ea1911f 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -18,6 +18,7 @@
package org.apache.spark.scheduler.cluster
import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Failure, Success}
import scala.util.control.NonFatal
import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
@@ -124,28 +125,16 @@ private[spark] abstract class YarnSchedulerBackend(
* Request executors from the ApplicationMaster by specifying the total number desired.
* This includes executors already pending or running.
*/
- override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
- val r = RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount)
- yarnSchedulerEndpoint.amEndpoint match {
- case Some(am) =>
- try {
- am.askWithRetry[Boolean](r)
- } catch {
- case NonFatal(e) =>
- logError(s"Sending $r to AM was unsuccessful", e)
- return false
- }
- case None =>
- logWarning("Attempted to request executors before the AM has registered!")
- return false
- }
+ override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = {
+ yarnSchedulerEndpointRef.ask[Boolean](
+ RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount))
}
/**
* Request that the ApplicationMaster kill the specified executors.
*/
- override def doKillExecutors(executorIds: Seq[String]): Boolean = {
- yarnSchedulerEndpointRef.askWithRetry[Boolean](KillExecutors(executorIds))
+ override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = {
+ yarnSchedulerEndpointRef.ask[Boolean](KillExecutors(executorIds))
}
override def sufficientResourcesRegistered(): Boolean = {
@@ -221,37 +210,37 @@ private[spark] abstract class YarnSchedulerBackend(
*/
private class YarnSchedulerEndpoint(override val rpcEnv: RpcEnv)
extends ThreadSafeRpcEndpoint with Logging {
- var amEndpoint: Option[RpcEndpointRef] = None
-
- private val askAmThreadPool =
- ThreadUtils.newDaemonCachedThreadPool("yarn-scheduler-ask-am-thread-pool")
- implicit val askAmExecutor = ExecutionContext.fromExecutor(askAmThreadPool)
+ private var amEndpoint: Option[RpcEndpointRef] = None
private[YarnSchedulerBackend] def handleExecutorDisconnectedFromDriver(
executorId: String,
executorRpcAddress: RpcAddress): Unit = {
- amEndpoint match {
+ val removeExecutorMessage = amEndpoint match {
case Some(am) =>
val lossReasonRequest = GetExecutorLossReason(executorId)
- val future = am.ask[ExecutorLossReason](lossReasonRequest, askTimeout)
- future onSuccess {
- case reason: ExecutorLossReason =>
- driverEndpoint.askWithRetry[Boolean](RemoveExecutor(executorId, reason))
- }
- future onFailure {
- case NonFatal(e) =>
- logWarning(s"Attempted to get executor loss reason" +
- s" for executor id ${executorId} at RPC address ${executorRpcAddress}," +
- s" but got no response. Marking as slave lost.", e)
- driverEndpoint.askWithRetry[Boolean](RemoveExecutor(executorId, SlaveLost()))
- case t => throw t
- }
+ am.ask[ExecutorLossReason](lossReasonRequest, askTimeout)
+ .map { reason => RemoveExecutor(executorId, reason) }(ThreadUtils.sameThread)
+ .recover {
+ case NonFatal(e) =>
+ logWarning(s"Attempted to get executor loss reason" +
+ s" for executor id ${executorId} at RPC address ${executorRpcAddress}," +
+ s" but got no response. Marking as slave lost.", e)
+ RemoveExecutor(executorId, SlaveLost())
+ }(ThreadUtils.sameThread)
case None =>
logWarning("Attempted to check for an executor loss reason" +
" before the AM has registered!")
- driverEndpoint.askWithRetry[Boolean](
- RemoveExecutor(executorId, SlaveLost("AM is not yet registered.")))
+ Future.successful(RemoveExecutor(executorId, SlaveLost("AM is not yet registered.")))
}
+
+ removeExecutorMessage
+ .flatMap { message =>
+ driverEndpoint.ask[Boolean](message)
+ }(ThreadUtils.sameThread)
+ .onFailure {
+ case NonFatal(e) => logError(
+ s"Error requesting driver to remove executor $executorId after disconnection.", e)
+ }(ThreadUtils.sameThread)
}
override def receive: PartialFunction[Any, Unit] = {
@@ -269,9 +258,13 @@ private[spark] abstract class YarnSchedulerBackend(
case AddWebUIFilter(filterName, filterParams, proxyBase) =>
addWebUIFilter(filterName, filterParams, proxyBase)
- case RemoveExecutor(executorId, reason) =>
+ case r @ RemoveExecutor(executorId, reason) =>
logWarning(reason.toString)
- removeExecutor(executorId, reason)
+ driverEndpoint.ask[Boolean](r).onFailure {
+ case e =>
+ logError("Error requesting driver to remove executor" +
+ s" $executorId for reason $reason", e)
+ }(ThreadUtils.sameThread)
}
@@ -279,13 +272,12 @@ private[spark] abstract class YarnSchedulerBackend(
case r: RequestExecutors =>
amEndpoint match {
case Some(am) =>
- Future {
- context.reply(am.askWithRetry[Boolean](r))
- } onFailure {
- case NonFatal(e) =>
+ am.ask[Boolean](r).andThen {
+ case Success(b) => context.reply(b)
+ case Failure(NonFatal(e)) =>
logError(s"Sending $r to AM was unsuccessful", e)
context.sendFailure(e)
- }
+ }(ThreadUtils.sameThread)
case None =>
logWarning("Attempted to request executors before the AM has registered!")
context.reply(false)
@@ -294,13 +286,12 @@ private[spark] abstract class YarnSchedulerBackend(
case k: KillExecutors =>
amEndpoint match {
case Some(am) =>
- Future {
- context.reply(am.askWithRetry[Boolean](k))
- } onFailure {
- case NonFatal(e) =>
+ am.ask[Boolean](k).andThen {
+ case Success(b) => context.reply(b)
+ case Failure(NonFatal(e)) =>
logError(s"Sending $k to AM was unsuccessful", e)
context.sendFailure(e)
- }
+ }(ThreadUtils.sameThread)
case None =>
logWarning("Attempted to kill executors before the AM has registered!")
context.reply(false)
@@ -316,10 +307,6 @@ private[spark] abstract class YarnSchedulerBackend(
amEndpoint = None
}
}
-
- override def onStop(): Unit = {
- askAmThreadPool.shutdownNow()
- }
}
}