-rw-r--r-- core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 38
1 file changed, 26 insertions(+), 12 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 94abe30bb1..7e2cfaccfc 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -222,12 +222,18 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Make fake resource offers on all executors
private def makeOffers() {
- // Filter out executors under killing
- val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
- val workOffers = activeExecutors.map { case (id, executorData) =>
- new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
- }.toIndexedSeq
- launchTasks(scheduler.resourceOffers(workOffers))
+ // Make sure no executor is killed while some task is launching on it
+ val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
+ // Filter out executors under killing
+ val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
+ val workOffers = activeExecutors.map { case (id, executorData) =>
+ new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
+ }.toIndexedSeq
+ scheduler.resourceOffers(workOffers)
+ }
+ if (!taskDescs.isEmpty) {
+ launchTasks(taskDescs)
+ }
}
override def onDisconnected(remoteAddress: RpcAddress): Unit = {
@@ -240,12 +246,20 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Make fake resource offers on just one executor
private def makeOffers(executorId: String) {
- // Filter out executors under killing
- if (executorIsAlive(executorId)) {
- val executorData = executorDataMap(executorId)
- val workOffers = IndexedSeq(
- new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores))
- launchTasks(scheduler.resourceOffers(workOffers))
+ // Make sure no executor is killed while some task is launching on it
+ val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
+ // Filter out executors under killing
+ if (executorIsAlive(executorId)) {
+ val executorData = executorDataMap(executorId)
+ val workOffers = IndexedSeq(
+ new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores))
+ scheduler.resourceOffers(workOffers)
+ } else {
+ Seq.empty
+ }
+ }
+ if (!taskDescs.isEmpty) {
+ launchTasks(taskDescs)
}
}
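
Note on the pattern above: the patch builds the offers and runs scheduler.resourceOffers while holding the backend lock, so an executor cannot be killed between being offered and having tasks assigned to it, and only then calls launchTasks outside the lock. Below is a minimal, self-contained sketch of that lock-then-launch idea; all names here (OfferUnderLockSketch, Offer, TaskDesc, the resourceOffers stub) are hypothetical stand-ins for illustration, not Spark's actual classes or API.

// Sketch only: compute task assignments under a lock, launch outside it.
object OfferUnderLockSketch {

  final case class Offer(executorId: String, freeCores: Int)
  final case class TaskDesc(executorId: String, taskId: Int)

  private val lock = new Object
  // Guarded by `lock`: executor id -> free cores. A real backend would
  // mutate this map as executors register or are killed.
  private var executors = Map("exec-1" -> 4, "exec-2" -> 2)
  private var nextTaskId = 0

  // Stand-in scheduler: assign one task per executor that has a free core.
  private def resourceOffers(offers: Seq[Offer]): Seq[TaskDesc] =
    offers.filter(_.freeCores > 0).map { o =>
      nextTaskId += 1
      TaskDesc(o.executorId, nextTaskId)
    }

  def makeOffers(): Unit = {
    // Step 1: decide what to launch while holding the lock, so a concurrent
    // kill-executor path cannot remove an executor we are about to use.
    val taskDescs = lock.synchronized {
      val offers = executors.map { case (id, cores) => Offer(id, cores) }.toIndexedSeq
      resourceOffers(offers)
    }
    // Step 2: launch outside the lock, and only if there is work to do.
    if (taskDescs.nonEmpty) {
      taskDescs.foreach(t => println(s"launching task ${t.taskId} on ${t.executorId}"))
    }
  }

  def main(args: Array[String]): Unit = makeOffers()
}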