aboutsummaryrefslogtreecommitdiff
path: root/yarn/src
diff options
context:
space:
mode:
authorjerryshao <sshao@hortonworks.com>2015-12-09 09:50:43 -0800
committerMarcelo Vanzin <vanzin@cloudera.com>2015-12-09 09:52:03 -0800
commit6900f0173790ad2fa4c79a426bd2dec2d149daa2 (patch)
tree3011fee8abe041652866101ec88cc0bd3c1da168 /yarn/src
parent22b9a8740d51289434553d19b6b1ac34aecdc09a (diff)
downloadspark-6900f0173790ad2fa4c79a426bd2dec2d149daa2.tar.gz
spark-6900f0173790ad2fa4c79a426bd2dec2d149daa2.tar.bz2
spark-6900f0173790ad2fa4c79a426bd2dec2d149daa2.zip
[SPARK-10582][YARN][CORE] Fix AM failure situation for dynamic allocation
Because of AM failure, the target executor number between driver and AM will be different, which will lead to unexpected behavior in dynamic allocation. So when AM is re-registered with driver, state in `ExecutorAllocationManager` and `CoarseGrainedSchedulerBacked` should be reset. This issue is originally addressed in #8737 , here re-opened again. Thanks a lot KaiXinXiaoLei for finding this issue. andrewor14 and vanzin would you please help to review this, thanks a lot. Author: jerryshao <sshao@hortonworks.com> Closes #9963 from jerryshao/SPARK-10582.
Diffstat (limited to 'yarn/src')
-rw-r--r--yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala23
1 files changed, 23 insertions, 0 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index e3dd87798f..1431bceb25 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -60,6 +60,9 @@ private[spark] abstract class YarnSchedulerBackend(
/** Scheduler extension services. */
private val services: SchedulerExtensionServices = new SchedulerExtensionServices()
+ // Flag to specify whether this schedulerBackend should be reset.
+ private var shouldResetOnAmRegister = false
+
/**
* Bind to YARN. This *must* be done before calling [[start()]].
*
@@ -156,6 +159,16 @@ private[spark] abstract class YarnSchedulerBackend(
}
/**
+ * Reset the state of SchedulerBackend to the initial state. This is happened when AM is failed
+ * and re-registered itself to driver after a failure. The stale state in driver should be
+ * cleaned.
+ */
+ override protected def reset(): Unit = {
+ super.reset()
+ sc.executorAllocationManager.foreach(_.reset())
+ }
+
+ /**
* Override the DriverEndpoint to add extra logic for the case when an executor is disconnected.
* This endpoint communicates with the executors and queries the AM for an executor's exit
* status when the executor is disconnected.
@@ -218,6 +231,8 @@ private[spark] abstract class YarnSchedulerBackend(
case None =>
logWarning("Attempted to check for an executor loss reason" +
" before the AM has registered!")
+ driverEndpoint.askWithRetry[Boolean](
+ RemoveExecutor(executorId, SlaveLost("AM is not yet registered.")))
}
}
@@ -225,6 +240,13 @@ private[spark] abstract class YarnSchedulerBackend(
case RegisterClusterManager(am) =>
logInfo(s"ApplicationMaster registered as $am")
amEndpoint = Option(am)
+ if (!shouldResetOnAmRegister) {
+ shouldResetOnAmRegister = true
+ } else {
+ // AM is already registered before, this potentially means that AM failed and
+ // a new one registered after the failure. This will only happen in yarn-client mode.
+ reset()
+ }
case AddWebUIFilter(filterName, filterParams, proxyBase) =>
addWebUIFilter(filterName, filterParams, proxyBase)
@@ -270,6 +292,7 @@ private[spark] abstract class YarnSchedulerBackend(
override def onDisconnected(remoteAddress: RpcAddress): Unit = {
if (amEndpoint.exists(_.address == remoteAddress)) {
logWarning(s"ApplicationMaster has disassociated: $remoteAddress")
+ amEndpoint = None
}
}