aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorAndrew Or <andrew@databricks.com>2015-08-12 09:24:50 -0700
committerAndrew Or <andrew@databricks.com>2015-08-12 09:24:50 -0700
commitbe5d1912076c2ffd21ec88611e53d3b3c59b7ecc (patch)
treec1e540052ff82c04c275cbd5ea4227162ee17671 /core
parent2e680668f7b6fc158aa068aedd19c1878ecf759e (diff)
downloadspark-be5d1912076c2ffd21ec88611e53d3b3c59b7ecc.tar.gz
spark-be5d1912076c2ffd21ec88611e53d3b3c59b7ecc.tar.bz2
spark-be5d1912076c2ffd21ec88611e53d3b3c59b7ecc.zip
[SPARK-9795] Dynamic allocation: avoid double counting when killing same executor twice
This is based on KaiXinXiaoLei's changes in #7716. The issue is that when someone calls `sc.killExecutor("1")` on the same executor twice quickly, then the executor target will be adjusted downwards by 2 instead of 1 even though we're only actually killing one executor. In certain cases where we don't adjust the target back upwards quickly, we'll end up with jobs hanging. This is a common danger because there are many places where this is called: - `HeartbeatReceiver` kills an executor that has not been sending heartbeats - `ExecutorAllocationManager` kills an executor that has been idle - The user code might call this, which may interfere with the previous callers While it's not clear whether this fixes SPARK-9745, fixing this potential race condition seems like a strict improvement. I've added a regression test to illustrate the issue. Author: Andrew Or <andrew@databricks.com> Closes #8078 from andrewor14/da-double-kill.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala11
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala20
2 files changed, 27 insertions, 4 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 6acf8a9a5e..5730a87f96 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -422,16 +422,19 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
logWarning(s"Executor to kill $id does not exist!")
}
+ // If an executor is already pending to be removed, do not kill it again (SPARK-9795)
+ val executorsToKill = knownExecutors.filter { id => !executorsPendingToRemove.contains(id) }
+ executorsPendingToRemove ++= executorsToKill
+
// If we do not wish to replace the executors we kill, sync the target number of executors
// with the cluster manager to avoid allocating new ones. When computing the new target,
// take into account executors that are pending to be added or removed.
if (!replace) {
- doRequestTotalExecutors(numExistingExecutors + numPendingExecutors
- - executorsPendingToRemove.size - knownExecutors.size)
+ doRequestTotalExecutors(
+ numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)
}
- executorsPendingToRemove ++= knownExecutors
- doKillExecutors(knownExecutors)
+ doKillExecutors(executorsToKill)
}
/**
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index 08c41a897a..1f2a0f0d30 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -283,6 +283,26 @@ class StandaloneDynamicAllocationSuite
assert(master.apps.head.getExecutorLimit === 1000)
}
+ test("kill the same executor twice (SPARK-9795)") {
+ sc = new SparkContext(appConf)
+ val appId = sc.applicationId
+ assert(master.apps.size === 1)
+ assert(master.apps.head.id === appId)
+ assert(master.apps.head.executors.size === 2)
+ assert(master.apps.head.getExecutorLimit === Int.MaxValue)
+ // sync executors between the Master and the driver, needed because
+ // the driver refuses to kill executors it does not know about
+ syncExecutors(sc)
+ // kill the same executor twice
+ val executors = getExecutorIds(sc)
+ assert(executors.size === 2)
+ assert(sc.killExecutor(executors.head))
+ assert(sc.killExecutor(executors.head))
+ assert(master.apps.head.executors.size === 1)
+ // The limit should not be lowered twice
+ assert(master.apps.head.getExecutorLimit === 1)
+ }
+
// ===============================
// | Utility methods for testing |
// ===============================