Diffstat (limited to 'core')
 core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 11 +++++++----
 core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala         | 20 ++++++++++++++++++++
 2 files changed, 27 insertions(+), 4 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 6acf8a9a5e..5730a87f96 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -422,16 +422,19 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       logWarning(s"Executor to kill $id does not exist!")
     }
 
+    // If an executor is already pending to be removed, do not kill it again (SPARK-9795)
+    val executorsToKill = knownExecutors.filter { id => !executorsPendingToRemove.contains(id) }
+    executorsPendingToRemove ++= executorsToKill
+
     // If we do not wish to replace the executors we kill, sync the target number of executors
     // with the cluster manager to avoid allocating new ones. When computing the new target,
     // take into account executors that are pending to be added or removed.
     if (!replace) {
-      doRequestTotalExecutors(numExistingExecutors + numPendingExecutors
-        - executorsPendingToRemove.size - knownExecutors.size)
+      doRequestTotalExecutors(
+        numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)
     }
 
-    executorsPendingToRemove ++= knownExecutors
-    doKillExecutors(knownExecutors)
+    doKillExecutors(executorsToKill)
   }
 
   /**
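
The added comment states the invariant; to make the arithmetic concrete, here is a small self-contained Scala sketch (hypothetical stand-in names and local state, not the Spark classes) that replays both the pre-fix and post-fix target computations when the same executor is killed twice:

// Sketch only: replays the target arithmetic from the hunk above using
// hypothetical local state instead of CoarseGrainedSchedulerBackend.
object KillTwiceSketch {
  def main(args: Array[String]): Unit = {
    val numExistingExecutors = 2
    val numPendingExecutors = 0
    val pendingToRemove = scala.collection.mutable.HashSet.empty[String]

    // Pre-fix: subtracts knownExecutors.size on every call and appends to
    // pendingToRemove unconditionally, so a repeated kill double-counts.
    def oldTarget(known: Seq[String]): Int = {
      val target = numExistingExecutors + numPendingExecutors -
        pendingToRemove.size - known.size
      pendingToRemove ++= known
      target
    }
    println(oldTarget(Seq("exec-1")))  // 1
    println(oldTarget(Seq("exec-1")))  // 0 <- limit wrongly lowered again

    pendingToRemove.clear()

    // Post-fix: ids already pending removal are filtered out first, so a
    // second kill of the same id leaves the target unchanged.
    def newTarget(known: Seq[String]): Int = {
      val toKill = known.filter(id => !pendingToRemove.contains(id))
      pendingToRemove ++= toKill
      numExistingExecutors + numPendingExecutors - pendingToRemove.size
    }
    println(newTarget(Seq("exec-1")))  // 1
    println(newTarget(Seq("exec-1")))  // 1 <- idempotent
  }
}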
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index 08c41a897a..1f2a0f0d30 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -283,6 +283,26 @@ class StandaloneDynamicAllocationSuite
     assert(master.apps.head.getExecutorLimit === 1000)
   }
 
+  test("kill the same executor twice (SPARK-9795)") {
+    sc = new SparkContext(appConf)
+    val appId = sc.applicationId
+    assert(master.apps.size === 1)
+    assert(master.apps.head.id === appId)
+    assert(master.apps.head.executors.size === 2)
+    assert(master.apps.head.getExecutorLimit === Int.MaxValue)
+    // sync executors between the Master and the driver, needed because
+    // the driver refuses to kill executors it does not know about
+    syncExecutors(sc)
+    // kill the same executor twice
+    val executors = getExecutorIds(sc)
+    assert(executors.size === 2)
+    assert(sc.killExecutor(executors.head))
+    assert(sc.killExecutor(executors.head))
+    assert(master.apps.head.executors.size === 1)
+    // The limit should not be lowered twice
+    assert(master.apps.head.getExecutorLimit === 1)
+  }
+
   // ===============================
   // | Utility methods for testing |
   // ===============================
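
The final assertion pins the limit at 1 because, under the pre-fix arithmetic, the second sc.killExecutor call would have requested a total of 0 executors (2 existing - 1 already pending removal - 1 known), lowering the limit twice for a single executor. To run just this regression test in a Spark checkout, an invocation along these lines should work (the exact sbt task spelling is an assumption and varies across Spark/sbt versions):

build/sbt "core/testOnly org.apache.spark.deploy.StandaloneDynamicAllocationSuite"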