Diffstat (limited to 'yarn')
-rw-r--r--  yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala  |  95
1 file changed, 41 insertions(+), 54 deletions(-)
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index ea63ff5dc1..2f9ea1911f 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -18,6 +18,7 @@
package org.apache.spark.scheduler.cluster
import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Failure, Success}
import scala.util.control.NonFatal
import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
@@ -124,28 +125,16 @@ private[spark] abstract class YarnSchedulerBackend(
* Request executors from the ApplicationMaster by specifying the total number desired.
* This includes executors already pending or running.
*/
- override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
- val r = RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount)
- yarnSchedulerEndpoint.amEndpoint match {
- case Some(am) =>
- try {
- am.askWithRetry[Boolean](r)
- } catch {
- case NonFatal(e) =>
- logError(s"Sending $r to AM was unsuccessful", e)
- return false
- }
- case None =>
- logWarning("Attempted to request executors before the AM has registered!")
- return false
- }
+ override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = {
+ yarnSchedulerEndpointRef.ask[Boolean](
+ RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount))
}
/**
* Request that the ApplicationMaster kill the specified executors.
*/
- override def doKillExecutors(executorIds: Seq[String]): Boolean = {
- yarnSchedulerEndpointRef.askWithRetry[Boolean](KillExecutors(executorIds))
+ override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = {
+ yarnSchedulerEndpointRef.ask[Boolean](KillExecutors(executorIds))
}
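(With this hunk, both overrides return a Future[Boolean] instead of blocking in askWithRetry. A minimal sketch of how a caller might consume the asynchronous result — the same-thread executor and the stand-in ask below are illustrative assumptions, not part of this patch; the real call goes through yarnSchedulerEndpointRef.ask:

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

// Stand-in for ThreadUtils.sameThread: run callbacks on the completing thread.
implicit val sameThread: ExecutionContext = ExecutionContext.fromExecutor(
  new java.util.concurrent.Executor { def execute(task: Runnable): Unit = task.run() })

// Stand-in for the RPC ask to the YarnSchedulerEndpoint.
def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] =
  Future.successful(true)

doRequestTotalExecutors(8).onComplete {
  case Success(ok) => println(s"AM acknowledged: $ok")   // true/false from the AM
  case Failure(e)  => println(s"RPC to AM failed: $e")   // the ask itself failed
}
)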
override def sufficientResourcesRegistered(): Boolean = {
@@ -221,37 +210,37 @@ private[spark] abstract class YarnSchedulerBackend(
*/
private class YarnSchedulerEndpoint(override val rpcEnv: RpcEnv)
extends ThreadSafeRpcEndpoint with Logging {
- var amEndpoint: Option[RpcEndpointRef] = None
-
- private val askAmThreadPool =
- ThreadUtils.newDaemonCachedThreadPool("yarn-scheduler-ask-am-thread-pool")
- implicit val askAmExecutor = ExecutionContext.fromExecutor(askAmThreadPool)
+ private var amEndpoint: Option[RpcEndpointRef] = None
private[YarnSchedulerBackend] def handleExecutorDisconnectedFromDriver(
executorId: String,
executorRpcAddress: RpcAddress): Unit = {
- amEndpoint match {
+ val removeExecutorMessage = amEndpoint match {
case Some(am) =>
val lossReasonRequest = GetExecutorLossReason(executorId)
- val future = am.ask[ExecutorLossReason](lossReasonRequest, askTimeout)
- future onSuccess {
- case reason: ExecutorLossReason =>
- driverEndpoint.askWithRetry[Boolean](RemoveExecutor(executorId, reason))
- }
- future onFailure {
- case NonFatal(e) =>
- logWarning(s"Attempted to get executor loss reason" +
- s" for executor id ${executorId} at RPC address ${executorRpcAddress}," +
- s" but got no response. Marking as slave lost.", e)
- driverEndpoint.askWithRetry[Boolean](RemoveExecutor(executorId, SlaveLost()))
- case t => throw t
- }
+ am.ask[ExecutorLossReason](lossReasonRequest, askTimeout)
+ .map { reason => RemoveExecutor(executorId, reason) }(ThreadUtils.sameThread)
+ .recover {
+ case NonFatal(e) =>
+ logWarning(s"Attempted to get executor loss reason" +
+ s" for executor id ${executorId} at RPC address ${executorRpcAddress}," +
+ s" but got no response. Marking as slave lost.", e)
+ RemoveExecutor(executorId, SlaveLost())
+ }(ThreadUtils.sameThread)
case None =>
logWarning("Attempted to check for an executor loss reason" +
" before the AM has registered!")
- driverEndpoint.askWithRetry[Boolean](
- RemoveExecutor(executorId, SlaveLost("AM is not yet registered.")))
+ Future.successful(RemoveExecutor(executorId, SlaveLost("AM is not yet registered.")))
}
+
+ removeExecutorMessage
+ .flatMap { message =>
+ driverEndpoint.ask[Boolean](message)
+ }(ThreadUtils.sameThread)
+ .onFailure {
+ case NonFatal(e) => logError(
+ s"Error requesting driver to remove executor $executorId after disconnection.", e)
+ }(ThreadUtils.sameThread)
}
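(The rewritten handler threads the result through map/recover/flatMap on a same-thread executor, so the dedicated ask thread pool is no longer needed: the AM's answer, or a SlaveLost fallback, becomes a RemoveExecutor message that is forwarded to the driver with a single ask. The same shape in isolation — a sketch with stand-in types and names; the real code uses RpcEndpointRef.ask and ThreadUtils.sameThread:

import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal

implicit val sameThread: ExecutionContext = ExecutionContext.fromExecutor(
  new java.util.concurrent.Executor { def execute(task: Runnable): Unit = task.run() })

// Stand-ins for the two RPCs involved.
def askAmForLossReason(id: String): Future[String] =
  Future.failed(new RuntimeException("AM did not answer"))
def askDriver(message: String): Future[Boolean] = Future.successful(true)

askAmForLossReason("exec-1")
  .map(reason => s"RemoveExecutor(exec-1, $reason)")                       // happy path
  .recover { case NonFatal(e) => "RemoveExecutor(exec-1, SlaveLost())" }   // fallback
  .flatMap(askDriver)                                                      // one driver ask either way
  .onFailure { case NonFatal(e) => println(s"driver ask failed: $e") }
)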
override def receive: PartialFunction[Any, Unit] = {
@@ -269,9 +258,13 @@ private[spark] abstract class YarnSchedulerBackend(
case AddWebUIFilter(filterName, filterParams, proxyBase) =>
addWebUIFilter(filterName, filterParams, proxyBase)
- case RemoveExecutor(executorId, reason) =>
+ case r @ RemoveExecutor(executorId, reason) =>
logWarning(reason.toString)
- removeExecutor(executorId, reason)
+ driverEndpoint.ask[Boolean](r).onFailure {
+ case e =>
+ logError("Error requesting driver to remove executor" +
+ s" $executorId for reason $reason", e)
+ }(ThreadUtils.sameThread)
}
@@ -279,13 +272,12 @@ private[spark] abstract class YarnSchedulerBackend(
case r: RequestExecutors =>
amEndpoint match {
case Some(am) =>
- Future {
- context.reply(am.askWithRetry[Boolean](r))
- } onFailure {
- case NonFatal(e) =>
+ am.ask[Boolean](r).andThen {
+ case Success(b) => context.reply(b)
+ case Failure(NonFatal(e)) =>
logError(s"Sending $r to AM was unsuccessful", e)
context.sendFailure(e)
- }
+ }(ThreadUtils.sameThread)
case None =>
logWarning("Attempted to request executors before the AM has registered!")
context.reply(false)
@@ -294,13 +286,12 @@ private[spark] abstract class YarnSchedulerBackend(
case k: KillExecutors =>
amEndpoint match {
case Some(am) =>
- Future {
- context.reply(am.askWithRetry[Boolean](k))
- } onFailure {
- case NonFatal(e) =>
+ am.ask[Boolean](k).andThen {
+ case Success(b) => context.reply(b)
+ case Failure(NonFatal(e)) =>
logError(s"Sending $k to AM was unsuccessful", e)
context.sendFailure(e)
- }
+ }(ThreadUtils.sameThread)
case None =>
logWarning("Attempted to kill executors before the AM has registered!")
context.reply(false)
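(Both the RequestExecutors and KillExecutors handlers now forward the AM's asynchronous answer with andThen, replying through the RPC context when the Future completes rather than parking a pool thread inside askWithRetry. The reply/failure split in isolation — a sketch where reply and sendFailure stand in for the RpcCallContext methods:

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
import scala.util.control.NonFatal

implicit val sameThread: ExecutionContext = ExecutionContext.fromExecutor(
  new java.util.concurrent.Executor { def execute(task: Runnable): Unit = task.run() })

def askAm(): Future[Boolean] = Future.successful(true)   // stand-in for am.ask[Boolean](r)
def reply(b: Boolean): Unit = println(s"reply($b)")
def sendFailure(e: Throwable): Unit = println(s"sendFailure($e)")

askAm().andThen {
  case Success(b)           => reply(b)
  case Failure(NonFatal(e)) => sendFailure(e)   // NonFatal used as an extractor inside Failure
}
)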
@@ -316,10 +307,6 @@ private[spark] abstract class YarnSchedulerBackend(
amEndpoint = None
}
}
-
- override def onStop(): Unit = {
- askAmThreadPool.shutdownNow()
- }
}
}