diff options
author | Jimmy Xiang <jxiang@apache.org> | 2017-03-09 10:52:18 -0800 |
---|---|---|
committer | Marcelo Vanzin <vanzin@cloudera.com> | 2017-03-09 10:52:18 -0800 |
commit | b60b9fc10a1ee52c3c021a4a5faf10f92f83e3c9 (patch) | |
tree | b9d7b7d2a0e7916d436c6f5f471972d118c0d443 /core/src/main | |
parent | 206030bd12405623c00c1ff334663984b9250adb (diff) | |
download | spark-b60b9fc10a1ee52c3c021a4a5faf10f92f83e3c9.tar.gz spark-b60b9fc10a1ee52c3c021a4a5faf10f92f83e3c9.tar.bz2 spark-b60b9fc10a1ee52c3c021a4a5faf10f92f83e3c9.zip |
[SPARK-19757][CORE] DriverEndpoint#makeOffers race against CoarseGrainedSchedulerBackend#killExecutors
## What changes were proposed in this pull request?
While some executors are being killed due to idleness, if some new tasks come in, the driver could assign them to some executors that are being killed. These tasks will fail later when the executors are lost. This patch is to make sure CoarseGrainedSchedulerBackend#killExecutors and DriverEndpoint#makeOffers are properly synchronized.
## How was this patch tested?
manual tests
Author: Jimmy Xiang <jxiang@apache.org>
Closes #17091 from jxiang/spark-19757.
Diffstat (limited to 'core/src/main')
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 38 |
1 file changed, 26 insertions, 12 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 94abe30bb1..7e2cfaccfc 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -222,12 +222,18 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // Make fake resource offers on all executors private def makeOffers() { - // Filter out executors under killing - val activeExecutors = executorDataMap.filterKeys(executorIsAlive) - val workOffers = activeExecutors.map { case (id, executorData) => - new WorkerOffer(id, executorData.executorHost, executorData.freeCores) - }.toIndexedSeq - launchTasks(scheduler.resourceOffers(workOffers)) + // Make sure no executor is killed while some task is launching on it + val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized { + // Filter out executors under killing + val activeExecutors = executorDataMap.filterKeys(executorIsAlive) + val workOffers = activeExecutors.map { case (id, executorData) => + new WorkerOffer(id, executorData.executorHost, executorData.freeCores) + }.toIndexedSeq + scheduler.resourceOffers(workOffers) + } + if (!taskDescs.isEmpty) { + launchTasks(taskDescs) + } } override def onDisconnected(remoteAddress: RpcAddress): Unit = { @@ -240,12 +246,20 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // Make fake resource offers on just one executor private def makeOffers(executorId: String) { - // Filter out executors under killing - if (executorIsAlive(executorId)) { - val executorData = executorDataMap(executorId) - val workOffers = IndexedSeq( - new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores)) - launchTasks(scheduler.resourceOffers(workOffers)) + // Make sure 
no executor is killed while some task is launching on it + val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized { + // Filter out executors under killing + if (executorIsAlive(executorId)) { + val executorData = executorDataMap(executorId) + val workOffers = IndexedSeq( + new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores)) + scheduler.resourceOffers(workOffers) + } else { + Seq.empty + } + } + if (!taskDescs.isEmpty) { + launchTasks(taskDescs) } } |