 core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala                       | 18
 core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 19
 core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala                  | 84
 yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala          | 23
 4 files changed, 142 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 34c32ce312..6176e25898 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -89,6 +89,8 @@ private[spark] class ExecutorAllocationManager(
private val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0)
private val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors",
Integer.MAX_VALUE)
+ private val initialNumExecutors = conf.getInt("spark.dynamicAllocation.initialExecutors",
+ minNumExecutors)
// How long there must be backlogged tasks for before an addition is triggered (seconds)
private val schedulerBacklogTimeoutS = conf.getTimeAsSeconds(
@@ -121,8 +123,7 @@ private[spark] class ExecutorAllocationManager(
// The desired number of executors at this moment in time. If all our executors were to die, this
// is the number of executors we would immediately want from the cluster manager.
- private var numExecutorsTarget =
- conf.getInt("spark.dynamicAllocation.initialExecutors", minNumExecutors)
+ private var numExecutorsTarget = initialNumExecutors
// Executors that have been requested to be removed but have not been killed yet
private val executorsPendingToRemove = new mutable.HashSet[String]
@@ -241,6 +242,19 @@ private[spark] class ExecutorAllocationManager(
}
/**
+ * Reset the allocation manager to the initial state. Currently this will only be called in
+ * yarn-client mode when the AM re-registers after a failure.
+ */
+ def reset(): Unit = synchronized {
+ initializing = true
+ numExecutorsTarget = initialNumExecutors
+ numExecutorsToAdd = 1
+
+ executorsPendingToRemove.clear()
+ removeTimes.clear()
+ }
+
+ /**
* The maximum number of executors we would need under the current load to satisfy all running
* and pending tasks, rounded up.
*/
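
For context on the hunk above: reset() simply puts the allocation manager back where it starts, and the new initialNumExecutors field captures the existing fallback from spark.dynamicAllocation.initialExecutors to minExecutors. A minimal standalone sketch of that fallback (illustrative values, not part of this patch):

    import org.apache.spark.SparkConf

    // initialExecutors falls back to minExecutors when it is not set explicitly.
    val conf = new SparkConf().set("spark.dynamicAllocation.minExecutors", "2")
    val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0)
    val initialNumExecutors = conf.getInt("spark.dynamicAllocation.initialExecutors",
      minNumExecutors)
    assert(initialNumExecutors == 2)  // the key is unset, so the minimum is used
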
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 505c161141..7efe16749e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -341,6 +341,25 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}
}
+ /**
+ * Reset the state of CoarseGrainedSchedulerBackend to the initial state. Currently this will
+ * only be called in yarn-client mode when the AM re-registers after a failure and dynamic
+ * allocation is enabled.
+ */
+ protected def reset(): Unit = synchronized {
+ if (Utils.isDynamicAllocationEnabled(conf)) {
+ numPendingExecutors = 0
+ executorsPendingToRemove.clear()
+
+ // Remove all the lingering executors that should have been removed but are still around. This
+ // can happen because (1) the disconnected event has not yet been received, or (2) the
+ // executors died silently.
+ executorDataMap.toMap.foreach { case (eid, _) =>
+ driverEndpoint.askWithRetry[Boolean](
+ RemoveExecutor(eid, SlaveLost("Stale executor after cluster manager re-registered.")))
+ }
+ }
+ }
+
override def reviveOffers() {
driverEndpoint.send(ReviveOffers)
}
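
One detail in reset() above: it iterates over executorDataMap.toMap rather than the live map, because the RemoveExecutor handler removes the entry from executorDataMap while the loop runs. A rough sketch of that snapshot-then-mutate pattern with plain collections (stand-in names, no Spark RPC involved):

    import scala.collection.mutable

    // Snapshot the map before removing entries, so the iteration is not affected by
    // the removals themselves (remove() stands in for the RemoveExecutor ask).
    val executorDataMap = mutable.HashMap("exec-1" -> "data", "exec-2" -> "data")
    executorDataMap.toMap.foreach { case (executorId, _) =>
      executorDataMap.remove(executorId)
    }
    assert(executorDataMap.isEmpty)
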
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 116f027a0f..fedfbd547b 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -805,6 +805,90 @@ class ExecutorAllocationManagerSuite
assert(maxNumExecutorsNeeded(manager) === 1)
}
+ test("reset the state of allocation manager") {
+ sc = createSparkContext()
+ val manager = sc.executorAllocationManager.get
+ assert(numExecutorsTarget(manager) === 1)
+ assert(numExecutorsToAdd(manager) === 1)
+
+ // Reset the allocation manager after requests to add executors have been sent, but before any
+ // executor-added events have been reported back.
+ sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 10)))
+
+ assert(addExecutors(manager) === 1)
+ assert(numExecutorsTarget(manager) === 2)
+ assert(addExecutors(manager) === 2)
+ assert(numExecutorsTarget(manager) === 4)
+ assert(addExecutors(manager) === 1)
+ assert(numExecutorsTarget(manager) === 5)
+
+ manager.reset()
+ assert(numExecutorsTarget(manager) === 1)
+ assert(numExecutorsToAdd(manager) === 1)
+ assert(executorIds(manager) === Set.empty)
+
+ // Reset the allocation manager after executors have been added.
+ sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 10)))
+
+ addExecutors(manager)
+ addExecutors(manager)
+ addExecutors(manager)
+ assert(numExecutorsTarget(manager) === 5)
+
+ onExecutorAdded(manager, "first")
+ onExecutorAdded(manager, "second")
+ onExecutorAdded(manager, "third")
+ onExecutorAdded(manager, "fourth")
+ onExecutorAdded(manager, "fifth")
+ assert(executorIds(manager) === Set("first", "second", "third", "fourth", "fifth"))
+
+ // Losing the cluster manager causes all live executors to be lost, so simulate that here
+ onExecutorRemoved(manager, "first")
+ onExecutorRemoved(manager, "second")
+ onExecutorRemoved(manager, "third")
+ onExecutorRemoved(manager, "fourth")
+ onExecutorRemoved(manager, "fifth")
+
+ manager.reset()
+ assert(numExecutorsTarget(manager) === 1)
+ assert(numExecutorsToAdd(manager) === 1)
+ assert(executorIds(manager) === Set.empty)
+ assert(removeTimes(manager) === Map.empty)
+
+ // Reset the allocation manager while some executors are still pending removal
+ addExecutors(manager)
+ addExecutors(manager)
+ addExecutors(manager)
+ assert(numExecutorsTarget(manager) === 5)
+
+ onExecutorAdded(manager, "first")
+ onExecutorAdded(manager, "second")
+ onExecutorAdded(manager, "third")
+ onExecutorAdded(manager, "fourth")
+ onExecutorAdded(manager, "fifth")
+ assert(executorIds(manager) === Set("first", "second", "third", "fourth", "fifth"))
+
+ removeExecutor(manager, "first")
+ removeExecutor(manager, "second")
+ assert(executorsPendingToRemove(manager) === Set("first", "second"))
+ assert(executorIds(manager) === Set("first", "second", "third", "fourth", "fifth"))
+
+ // Losing the cluster manager causes all live executors to be lost, so simulate that here
+ onExecutorRemoved(manager, "first")
+ onExecutorRemoved(manager, "second")
+ onExecutorRemoved(manager, "third")
+ onExecutorRemoved(manager, "fourth")
+ onExecutorRemoved(manager, "fifth")
+
+ manager.reset()
+
+ assert(numExecutorsTarget(manager) === 1)
+ assert(numExecutorsToAdd(manager) === 1)
+ assert(executorsPendingToRemove(manager) === Set.empty)
+ assert(removeTimes(manager) === Map.empty)
+ }
+
private def createSparkContext(
minExecutors: Int = 1,
maxExecutors: Int = 5,
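
The add/target sequence asserted in the new test (1, 2 and 1 executors added, target moving 2 -> 4 -> 5) follows from the existing exponential ramp-up: each round requests numExecutorsToAdd more executors and doubles that count when the request is fully granted, capped at the maximum. A small sketch of that arithmetic, assuming maxExecutors = 5 as in createSparkContext above:

    // Sketch of the ramp-up arithmetic the assertions encode (maxExecutors assumed to be 5).
    var target = 1
    var toAdd = 1
    val deltas = (1 to 3).map { _ =>
      val delta = math.min(target + toAdd, 5) - target
      target += delta
      toAdd = if (delta == toAdd) toAdd * 2 else 1  // double only when fully granted
      delta
    }
    assert(deltas == Seq(1, 2, 1) && target == 5)
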
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index e3dd87798f..1431bceb25 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -60,6 +60,9 @@ private[spark] abstract class YarnSchedulerBackend(
/** Scheduler extension services. */
private val services: SchedulerExtensionServices = new SchedulerExtensionServices()
+ // Whether an AM has already registered. Used to decide whether this scheduler backend should
+ // be reset when a new AM registers after a failure.
+ private var shouldResetOnAmRegister = false
+
/**
* Bind to YARN. This *must* be done before calling [[start()]].
*
@@ -156,6 +159,16 @@ private[spark] abstract class YarnSchedulerBackend(
}
/**
+ * Reset the state of the SchedulerBackend to the initial state. This happens when the AM fails
+ * and re-registers itself with the driver after a failure. The stale state in the driver
+ * should then be cleaned up.
+ */
+ override protected def reset(): Unit = {
+ super.reset()
+ sc.executorAllocationManager.foreach(_.reset())
+ }
+
+ /**
* Override the DriverEndpoint to add extra logic for the case when an executor is disconnected.
* This endpoint communicates with the executors and queries the AM for an executor's exit
* status when the executor is disconnected.
@@ -218,6 +231,8 @@ private[spark] abstract class YarnSchedulerBackend(
case None =>
logWarning("Attempted to check for an executor loss reason" +
" before the AM has registered!")
+ driverEndpoint.askWithRetry[Boolean](
+ RemoveExecutor(executorId, SlaveLost("AM is not yet registered.")))
}
}
@@ -225,6 +240,13 @@ private[spark] abstract class YarnSchedulerBackend(
case RegisterClusterManager(am) =>
logInfo(s"ApplicationMaster registered as $am")
amEndpoint = Option(am)
+ if (!shouldResetOnAmRegister) {
+ shouldResetOnAmRegister = true
+ } else {
+ // An AM has already registered before, which potentially means that the AM failed and
+ // a new one registered after the failure. This will only happen in yarn-client mode.
+ reset()
+ }
case AddWebUIFilter(filterName, filterParams, proxyBase) =>
addWebUIFilter(filterName, filterParams, proxyBase)
@@ -270,6 +292,7 @@ private[spark] abstract class YarnSchedulerBackend(
override def onDisconnected(remoteAddress: RpcAddress): Unit = {
if (amEndpoint.exists(_.address == remoteAddress)) {
logWarning(s"ApplicationMaster has disassociated: $remoteAddress")
+ amEndpoint = None
}
}
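
The shouldResetOnAmRegister handling above amounts to a small state machine: the first RegisterClusterManager only records that an AM exists, and any later registration is treated as an AM restart that triggers reset(). A minimal sketch with assumed names, outside Spark's RPC machinery:

    // Minimal sketch (assumed names, no RPC): the first registration is recorded, any
    // subsequent one is treated as an AM restart and fires the reset callback.
    class AmRegistrationTracker(onReRegister: () => Unit) {
      private var amHasRegistered = false
      def registerAm(): Unit = {
        if (!amHasRegistered) {
          amHasRegistered = true
        } else {
          onReRegister()  // a new AM after a failure: clear stale driver-side state
        }
      }
    }

    val tracker = new AmRegistrationTracker(() => println("clearing stale state"))
    tracker.registerAm()  // first AM registers: nothing to reset
    tracker.registerAm()  // AM re-registered after a failure: reset fires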