diff options
Diffstat (limited to 'core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala')
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 12 |
1 files changed, 9 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index c6b3fdf439..edc3c19937 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -528,7 +528,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp * @return whether the kill request is acknowledged. If list to kill is empty, it will return * false. */ - final override def killExecutors(executorIds: Seq[String]): Boolean = { + final override def killExecutors(executorIds: Seq[String]): Seq[String] = { killExecutors(executorIds, replace = false, force = false) } @@ -548,7 +548,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp final def killExecutors( executorIds: Seq[String], replace: Boolean, - force: Boolean): Boolean = { + force: Boolean): Seq[String] = { logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}") val response = synchronized { @@ -564,6 +564,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp .filter { id => force || !scheduler.isExecutorBusy(id) } executorsToKill.foreach { id => executorsPendingToRemove(id) = !replace } + logInfo(s"Actual list of executor(s) to be killed is ${executorsToKill.mkString(", ")}") + // If we do not wish to replace the executors we kill, sync the target number of executors // with the cluster manager to avoid allocating new ones. When computing the new target, // take into account executors that are pending to be added or removed. @@ -583,7 +585,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp _ => Future.successful(false) } - adjustTotalExecutors.flatMap(killExecutors)(ThreadUtils.sameThread) + val killResponse = adjustTotalExecutors.flatMap(killExecutors)(ThreadUtils.sameThread) + + killResponse.flatMap(killSuccessful => + Future.successful (if (killSuccessful) executorsToKill else Seq.empty[String]) + )(ThreadUtils.sameThread) } defaultAskTimeout.awaitResult(response) |