author | jerryshao <sshao@hortonworks.com> | 2016-03-28 17:03:21 -0700 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2016-03-28 17:03:21 -0700 |
commit | 2bc7c96d61a51bd458ba04e9d318640ddada559d (patch) | |
tree | a7eaaa680b23cbc18fa39aef89d4c6974aae4aa0 /core/src/main | |
parent | ad9e3d50f71b096872806a86d89c03a208b1cf8b (diff) | |
[SPARK-13447][YARN][CORE] Clean the stale states for AM failure and restart situation
## What changes were proposed in this pull request?
This is a follow-up fix to #9963. In #9963 we handled this stale-state clean-up only for the dynamic-allocation-enabled scenario. Here we should also clean the state in `CoarseGrainedSchedulerBackend` for the dynamic-allocation-disabled scenario.
Please review, CC andrewor14 lianhuiwang, thanks a lot.
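As a rough illustration of the change, here is a toy sketch (hypothetical names such as `ToySchedulerBackend`, `requestRemoval`, and `removalRequests` are stand-ins, not Spark's real class) of the `reset()` behavior after this patch: the state is cleared unconditionally, no longer guarded by `Utils.isDynamicAllocationEnabled(conf)`:

```scala
import scala.collection.mutable

// Toy sketch (not Spark's actual CoarseGrainedSchedulerBackend) of the
// unconditional reset() behavior this patch introduces.
class ToySchedulerBackend {
  var numPendingExecutors: Int = 3
  val executorsPendingToRemove = mutable.HashSet("exec-9")
  val executorDataMap =
    mutable.HashMap("exec-1" -> "host-a", "exec-2" -> "host-b")
  val removalRequests = mutable.ArrayBuffer.empty[String]

  // Stand-in for driverEndpoint.askWithRetry[Boolean](RemoveExecutor(...)).
  private def requestRemoval(eid: String): Boolean = {
    removalRequests += eid
    true
  }

  def reset(): Unit = synchronized {
    // No dynamic-allocation check anymore: always clear pending state.
    numPendingExecutors = 0
    executorsPendingToRemove.clear()
    // Snapshot with toMap so removal handlers can mutate executorDataMap.
    executorDataMap.toMap.foreach { case (eid, _) => requestRemoval(eid) }
  }
}

val backend = new ToySchedulerBackend
backend.reset()
println(backend.numPendingExecutors)                  // 0
println(backend.removalRequests.sorted.mkString(",")) // exec-1,exec-2
```

Note the `executorDataMap.toMap` snapshot in the real patch: iterating over a copy keeps the loop safe even if a removal callback mutates the live map.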
## How was this patch tested?
Ran the unit tests locally, and also verified manually with an integration test.
Author: jerryshao <sshao@hortonworks.com>
Closes #11366 from jerryshao/SPARK-13447.
Diffstat (limited to 'core/src/main')
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 21 |
1 file changed, 9 insertions, 12 deletions
```diff
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index b7919efc4b..eb4f5331d6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -356,20 +356,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   /**
    * Reset the state of CoarseGrainedSchedulerBackend to the initial state. Currently it will only
-   * be called in the yarn-client mode when AM re-registers after a failure, also dynamic
-   * allocation is enabled.
+   * be called in the yarn-client mode when AM re-registers after a failure.
    *
    */
   protected def reset(): Unit = synchronized {
-    if (Utils.isDynamicAllocationEnabled(conf)) {
-      numPendingExecutors = 0
-      executorsPendingToRemove.clear()
-
-      // Remove all the lingering executors that should be removed but not yet. The reason might be
-      // because (1) disconnected event is not yet received; (2) executors die silently.
-      executorDataMap.toMap.foreach { case (eid, _) =>
-        driverEndpoint.askWithRetry[Boolean](
-          RemoveExecutor(eid, SlaveLost("Stale executor after cluster manager re-registered.")))
-      }
+    numPendingExecutors = 0
+    executorsPendingToRemove.clear()
+
+    // Remove all the lingering executors that should be removed but not yet. The reason might be
+    // because (1) disconnected event is not yet received; (2) executors die silently.
+    executorDataMap.toMap.foreach { case (eid, _) =>
+      driverEndpoint.askWithRetry[Boolean](
+        RemoveExecutor(eid, SlaveLost("Stale executor after cluster manager re-registered.")))
     }
   }
```