Diffstat (limited to 'yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala')
-rw-r--r--  yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala  23
1 files changed, 23 insertions, 0 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index e3dd87798f..1431bceb25 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -60,6 +60,9 @@ private[spark] abstract class YarnSchedulerBackend(
/** Scheduler extension services. */
private val services: SchedulerExtensionServices = new SchedulerExtensionServices()
+ // Flag to specify whether this schedulerBackend should be reset.
+ private var shouldResetOnAmRegister = false
+
/**
* Bind to YARN. This *must* be done before calling [[start()]].
*
@@ -156,6 +159,16 @@ private[spark] abstract class YarnSchedulerBackend(
}
/**
+ * Reset the state of the SchedulerBackend to its initial state. This happens when the AM fails
+ * and re-registers itself with the driver after a failure; the stale state held by the driver
+ * should be cleaned up.
+ */
+ override protected def reset(): Unit = {
+ super.reset()
+ sc.executorAllocationManager.foreach(_.reset())
+ }
+
+ /**
* Override the DriverEndpoint to add extra logic for the case when an executor is disconnected.
* This endpoint communicates with the executors and queries the AM for an executor's exit
* status when the executor is disconnected.
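The reset() override added above simply delegates to super.reset() and, when dynamic allocation is enabled, to the ExecutorAllocationManager obtained from the SparkContext. As a rough, standalone illustration of what such a reset usually amounts to, dropping per-executor bookkeeping and returning to the initial executor target, consider the sketch below; the class and field names are hypothetical and this is not Spark's actual implementation:

// Illustrative sketch only; IllustrativeBackend, knownExecutors and executorTarget are hypothetical names.
import scala.collection.mutable

class IllustrativeBackend(initialExecutors: Int) {
  private val knownExecutors = mutable.Set.empty[String]  // executors the driver still believes are alive
  private var executorTarget = initialExecutors           // how many executors are currently requested

  /** Forget stale per-executor state and fall back to the initial request. */
  def reset(): Unit = synchronized {
    knownExecutors.clear()
    executorTarget = initialExecutors
  }
}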
@@ -218,6 +231,8 @@ private[spark] abstract class YarnSchedulerBackend(
case None =>
logWarning("Attempted to check for an executor loss reason" +
" before the AM has registered!")
+ driverEndpoint.askWithRetry[Boolean](
+ RemoveExecutor(executorId, SlaveLost("AM is not yet registered.")))
}
}
@@ -225,6 +240,13 @@ private[spark] abstract class YarnSchedulerBackend(
case RegisterClusterManager(am) =>
logInfo(s"ApplicationMaster registered as $am")
amEndpoint = Option(am)
+ if (!shouldResetOnAmRegister) {
+ shouldResetOnAmRegister = true
+ } else {
+ // The AM has already registered before, which potentially means that it failed and
+ // a new one registered after the failure. This will only happen in yarn-client mode.
+ reset()
+ }
case AddWebUIFilter(filterName, filterParams, proxyBase) =>
addWebUIFilter(filterName, filterParams, proxyBase)
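The RegisterClusterManager handler uses shouldResetOnAmRegister as a one-way latch: the first registration only arms the flag, and any later registration is taken to mean a replacement AM came up after a failure, so reset() is called. The same pattern in isolation looks roughly like the following standalone sketch; AmRegistrationTracker and onReRegister are hypothetical names, not part of Spark:

// Standalone sketch of the first-registration vs. re-registration pattern; not Spark code.
class AmRegistrationTracker(onReRegister: () => Unit) {
  private var seenFirstRegistration = false

  def registered(): Unit = synchronized {
    if (!seenFirstRegistration) {
      seenFirstRegistration = true  // first AM: nothing stale to clean up
    } else {
      onReRegister()                // a new AM after a failure: clear stale driver-side state
    }
  }
}

A single tracker instance would have registered() called once per RegisterClusterManager message, with the callback wired to the backend's reset().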
@@ -270,6 +292,7 @@ private[spark] abstract class YarnSchedulerBackend(
override def onDisconnected(remoteAddress: RpcAddress): Unit = {
if (amEndpoint.exists(_.address == remoteAddress)) {
logWarning(s"ApplicationMaster has disassociated: $remoteAddress")
+ amEndpoint = None
}
}
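Clearing amEndpoint when the AM disassociates means that, until a replacement AM registers, lookups of the AM endpoint fall into the case None branch shown in the earlier hunk, so executors that are lost in the meantime are proactively removed with SlaveLost. A compressed sketch of that interplay, simplified and standalone rather than Spark's actual types or messages:

// Simplified, standalone sketch; the types and strings here are illustrative only.
object AmLossHandling {
  private var amEndpoint: Option[String] = None

  def onAmRegistered(am: String): Unit = { amEndpoint = Some(am) }
  def onAmDisconnected(): Unit = { amEndpoint = None }

  def handleExecutorLoss(executorId: String): String = amEndpoint match {
    case Some(am) => s"ask $am for the exit status of executor $executorId"
    case None     => s"no AM registered: remove executor $executorId as SlaveLost"
  }
}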