aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala12
1 files changed, 9 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index c6b3fdf439..edc3c19937 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -528,7 +528,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
* @return whether the kill request is acknowledged. If list to kill is empty, it will return
* false.
*/
- final override def killExecutors(executorIds: Seq[String]): Boolean = {
+ final override def killExecutors(executorIds: Seq[String]): Seq[String] = {
killExecutors(executorIds, replace = false, force = false)
}
@@ -548,7 +548,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
final def killExecutors(
executorIds: Seq[String],
replace: Boolean,
- force: Boolean): Boolean = {
+ force: Boolean): Seq[String] = {
logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}")
val response = synchronized {
@@ -564,6 +564,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
.filter { id => force || !scheduler.isExecutorBusy(id) }
executorsToKill.foreach { id => executorsPendingToRemove(id) = !replace }
+ logInfo(s"Actual list of executor(s) to be killed is ${executorsToKill.mkString(", ")}")
+
// If we do not wish to replace the executors we kill, sync the target number of executors
// with the cluster manager to avoid allocating new ones. When computing the new target,
// take into account executors that are pending to be added or removed.
@@ -583,7 +585,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
_ => Future.successful(false)
}
- adjustTotalExecutors.flatMap(killExecutors)(ThreadUtils.sameThread)
+ val killResponse = adjustTotalExecutors.flatMap(killExecutors)(ThreadUtils.sameThread)
+
+ killResponse.flatMap(killSuccessful =>
+ Future.successful (if (killSuccessful) executorsToKill else Seq.empty[String])
+ )(ThreadUtils.sameThread)
}
defaultAskTimeout.awaitResult(response)