aboutsummaryrefslogtreecommitdiff
path: root/core/src
diff options
context:
space:
mode:
authorJimmy Xiang <jxiang@apache.org>2017-03-09 10:52:18 -0800
committerMarcelo Vanzin <vanzin@cloudera.com>2017-03-09 10:52:18 -0800
commitb60b9fc10a1ee52c3c021a4a5faf10f92f83e3c9 (patch)
treeb9d7b7d2a0e7916d436c6f5f471972d118c0d443 /core/src
parent206030bd12405623c00c1ff334663984b9250adb (diff)
downloadspark-b60b9fc10a1ee52c3c021a4a5faf10f92f83e3c9.tar.gz
spark-b60b9fc10a1ee52c3c021a4a5faf10f92f83e3c9.tar.bz2
spark-b60b9fc10a1ee52c3c021a4a5faf10f92f83e3c9.zip
[SPARK-19757][CORE] DriverEndpoint#makeOffers race against CoarseGrainedSchedulerBackend#killExecutors
## What changes were proposed in this pull request? While some executors are being killed due to idleness, if some new tasks come in, the driver could assign them to some executors that are being killed. These tasks will fail later when the executors are lost. This patch is to make sure CoarseGrainedSchedulerBackend#killExecutors and DriverEndpoint#makeOffers are properly synchronized. ## How was this patch tested? Manual tests. Author: Jimmy Xiang <jxiang@apache.org> Closes #17091 from jxiang/spark-19757.
Diffstat (limited to 'core/src')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala38
1 file changed, 26 insertions, 12 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 94abe30bb1..7e2cfaccfc 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -222,12 +222,18 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Make fake resource offers on all executors
private def makeOffers() {
- // Filter out executors under killing
- val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
- val workOffers = activeExecutors.map { case (id, executorData) =>
- new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
- }.toIndexedSeq
- launchTasks(scheduler.resourceOffers(workOffers))
+ // Make sure no executor is killed while some task is launching on it
+ val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
+ // Filter out executors under killing
+ val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
+ val workOffers = activeExecutors.map { case (id, executorData) =>
+ new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
+ }.toIndexedSeq
+ scheduler.resourceOffers(workOffers)
+ }
+ if (!taskDescs.isEmpty) {
+ launchTasks(taskDescs)
+ }
}
override def onDisconnected(remoteAddress: RpcAddress): Unit = {
@@ -240,12 +246,20 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Make fake resource offers on just one executor
private def makeOffers(executorId: String) {
- // Filter out executors under killing
- if (executorIsAlive(executorId)) {
- val executorData = executorDataMap(executorId)
- val workOffers = IndexedSeq(
- new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores))
- launchTasks(scheduler.resourceOffers(workOffers))
+ // Make sure no executor is killed while some task is launching on it
+ val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
+ // Filter out executors under killing
+ if (executorIsAlive(executorId)) {
+ val executorData = executorDataMap(executorId)
+ val workOffers = IndexedSeq(
+ new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores))
+ scheduler.resourceOffers(workOffers)
+ } else {
+ Seq.empty
+ }
+ }
+ if (!taskDescs.isEmpty) {
+ launchTasks(taskDescs)
}
}