aboutsummaryrefslogtreecommitdiff
path: root/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
diff options
context:
space:
mode:
Diffstat (limited to 'yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala')
-rw-r--r--yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala18
1 files changed, 11 insertions, 7 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index a8781636f2..6b3c831e60 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -39,9 +39,12 @@ private[spark] abstract class YarnSchedulerBackend(
sc: SparkContext)
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {
- if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
- minRegisteredRatio = 0.8
- }
+ override val minRegisteredRatio =
+ if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
+ 0.8
+ } else {
+ super.minRegisteredRatio
+ }
protected var totalExpectedExecutors = 0
@@ -220,17 +223,15 @@ private[spark] abstract class YarnSchedulerBackend(
val lossReasonRequest = GetExecutorLossReason(executorId)
val future = am.ask[ExecutorLossReason](lossReasonRequest, askTimeout)
future onSuccess {
- case reason: ExecutorLossReason => {
+ case reason: ExecutorLossReason =>
driverEndpoint.askWithRetry[Boolean](RemoveExecutor(executorId, reason))
- }
}
future onFailure {
- case NonFatal(e) => {
+ case NonFatal(e) =>
logWarning(s"Attempted to get executor loss reason" +
s" for executor id ${executorId} at RPC address ${executorRpcAddress}," +
s" but got no response. Marking as slave lost.", e)
driverEndpoint.askWithRetry[Boolean](RemoveExecutor(executorId, SlaveLost()))
- }
case t => throw t
}
case None =>
@@ -292,6 +293,9 @@ private[spark] abstract class YarnSchedulerBackend(
logWarning("Attempted to kill executors before the AM has registered!")
context.reply(false)
}
+
+ case RetrieveLastAllocatedExecutorId =>
+ context.reply(currentExecutorIdCounter)
}
override def onDisconnected(remoteAddress: RpcAddress): Unit = {