diff options
Diffstat (limited to 'yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala')
-rw-r--r-- | yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala | 18 |
1 files changed, 11 insertions, 7 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index a8781636f2..6b3c831e60 100644 --- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -39,9 +39,12 @@ private[spark] abstract class YarnSchedulerBackend( sc: SparkContext) extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) { - if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) { - minRegisteredRatio = 0.8 - } + override val minRegisteredRatio = + if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) { + 0.8 + } else { + super.minRegisteredRatio + } protected var totalExpectedExecutors = 0 @@ -220,17 +223,15 @@ private[spark] abstract class YarnSchedulerBackend( val lossReasonRequest = GetExecutorLossReason(executorId) val future = am.ask[ExecutorLossReason](lossReasonRequest, askTimeout) future onSuccess { - case reason: ExecutorLossReason => { + case reason: ExecutorLossReason => driverEndpoint.askWithRetry[Boolean](RemoveExecutor(executorId, reason)) - } } future onFailure { - case NonFatal(e) => { + case NonFatal(e) => logWarning(s"Attempted to get executor loss reason" + s" for executor id ${executorId} at RPC address ${executorRpcAddress}," + s" but got no response. Marking as slave lost.", e) driverEndpoint.askWithRetry[Boolean](RemoveExecutor(executorId, SlaveLost())) - } case t => throw t } case None => @@ -292,6 +293,9 @@ private[spark] abstract class YarnSchedulerBackend( logWarning("Attempted to kill executors before the AM has registered!") context.reply(false) } + + case RetrieveLastAllocatedExecutorId => + context.reply(currentExecutorIdCounter) } override def onDisconnected(remoteAddress: RpcAddress): Unit = { |