aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala35
2 files changed, 37 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 18771f79b4..55a564b5c8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -438,6 +438,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
if (!replace) {
doRequestTotalExecutors(
numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)
+ } else {
+ numPendingExecutors += knownExecutors.size
}
doKillExecutors(executorsToKill)
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index 2e2fa22eb4..d145e78834 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -369,6 +369,41 @@ class StandaloneDynamicAllocationSuite
assert(apps.head.getExecutorLimit === 1)
}
+ test("the pending replacement executors should not be lost (SPARK-10515)") {
+ sc = new SparkContext(appConf)
+ val appId = sc.applicationId
+ eventually(timeout(10.seconds), interval(10.millis)) {
+ val apps = getApplications()
+ assert(apps.size === 1)
+ assert(apps.head.id === appId)
+ assert(apps.head.executors.size === 2)
+ assert(apps.head.getExecutorLimit === Int.MaxValue)
+ }
+ // sync executors between the Master and the driver, needed because
+ // the driver refuses to kill executors it does not know about
+ syncExecutors(sc)
+ val executors = getExecutorIds(sc)
+ assert(executors.size === 2)
+ // kill executor 1, and replace it
+ assert(sc.killAndReplaceExecutor(executors.head))
+ eventually(timeout(10.seconds), interval(10.millis)) {
+ val apps = getApplications()
+ assert(apps.head.executors.size === 2)
+ }
+
+ var apps = getApplications()
+ // kill executor 1
+ assert(sc.killExecutor(executors.head))
+ apps = getApplications()
+ assert(apps.head.executors.size === 2)
+ assert(apps.head.getExecutorLimit === 2)
+ // kill executor 2
+ assert(sc.killExecutor(executors(1)))
+ apps = getApplications()
+ assert(apps.head.executors.size === 1)
+ assert(apps.head.getExecutorLimit === 1)
+ }
+
// ===============================
// | Utility methods for testing |
// ===============================