aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorKaiXinXiaoLei <huleilei1@huawei.com>2015-10-15 14:48:01 -0700
committerAndrew Or <andrew@databricks.com>2015-10-15 14:48:01 -0700
commit2d000124b72d0ff9e3ecefa03923405642516c4c (patch)
tree0fc0e761fb31e48284307e0bff4ba4b736cfc645 /core
parent723aa75a9d566c698aa49597f4f655396fef77bd (diff)
downloadspark-2d000124b72d0ff9e3ecefa03923405642516c4c.tar.gz
spark-2d000124b72d0ff9e3ecefa03923405642516c4c.tar.bz2
spark-2d000124b72d0ff9e3ecefa03923405642516c4c.zip
[SPARK-10515] When killing executor, the pending replacement executors should not be lost
If the heartbeat receiver kills executors (and new ones are not registered to replace them), the idle timeout for the old executors will be lost (and this then changes the total number of executors requested by the Driver), so new ones will not be asked to replace them. For example, executorsPendingToRemove=Set(1), and executor 2 hits its idle timeout before a new executor is asked to replace executor 1. The driver then kills executor 2 and sends RequestExecutors to the AM. But executorsPendingToRemove=Set(1,2), so the AM doesn't allocate an executor to replace executor 1. see: https://github.com/apache/spark/pull/8668 Author: KaiXinXiaoLei <huleilei1@huawei.com> Author: huleilei <huleilei1@huawei.com> Closes #8945 from KaiXinXiaoLei/pendingexecutor.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala35
2 files changed, 37 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 18771f79b4..55a564b5c8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -438,6 +438,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
if (!replace) {
doRequestTotalExecutors(
numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)
+ } else {
+ numPendingExecutors += knownExecutors.size
}
doKillExecutors(executorsToKill)
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index 2e2fa22eb4..d145e78834 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -369,6 +369,41 @@ class StandaloneDynamicAllocationSuite
assert(apps.head.getExecutorLimit === 1)
}
+ test("the pending replacement executors should not be lost (SPARK-10515)") {
+ sc = new SparkContext(appConf)
+ val appId = sc.applicationId
+ eventually(timeout(10.seconds), interval(10.millis)) {
+ val apps = getApplications()
+ assert(apps.size === 1)
+ assert(apps.head.id === appId)
+ assert(apps.head.executors.size === 2)
+ assert(apps.head.getExecutorLimit === Int.MaxValue)
+ }
+ // sync executors between the Master and the driver, needed because
+ // the driver refuses to kill executors it does not know about
+ syncExecutors(sc)
+ val executors = getExecutorIds(sc)
+ assert(executors.size === 2)
+ // kill executor 1, and replace it
+ assert(sc.killAndReplaceExecutor(executors.head))
+ eventually(timeout(10.seconds), interval(10.millis)) {
+ val apps = getApplications()
+ assert(apps.head.executors.size === 2)
+ }
+
+ var apps = getApplications()
+ // kill executor 1
+ assert(sc.killExecutor(executors.head))
+ apps = getApplications()
+ assert(apps.head.executors.size === 2)
+ assert(apps.head.getExecutorLimit === 2)
+ // kill executor 2
+ assert(sc.killExecutor(executors(1)))
+ apps = getApplications()
+ assert(apps.head.executors.size === 1)
+ assert(apps.head.getExecutorLimit === 1)
+ }
+
// ===============================
// | Utility methods for testing |
// ===============================