diff options
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 17 |
1 files changed, 9 insertions, 8 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 8259923ce3..2db3a3bb81 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -406,14 +406,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2)) } - // Called by subclasses when notified of a lost worker - def removeExecutor(executorId: String, reason: ExecutorLossReason) { - try { - driverEndpoint.askWithRetry[Boolean](RemoveExecutor(executorId, reason)) - } catch { - case e: Exception => - throw new SparkException("Error notifying standalone scheduler's driver endpoint", e) - } + /** + * Called by subclasses when notified of a lost worker. It just fires the message and returns + * at once. + */ + protected def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = { + // Only log the failure since we don't care about the result. + driverEndpoint.ask(RemoveExecutor(executorId, reason)).onFailure { case t => + logError(t.getMessage, t) + }(ThreadUtils.sameThread) } def sufficientResourcesRegistered(): Boolean = true |