author    Marcelo Vanzin <vanzin@cloudera.com>    2015-11-04 09:07:22 -0800
committer Marcelo Vanzin <vanzin@cloudera.com>    2015-11-04 09:07:22 -0800
commit    8790ee6d69e50ca84eb849742be48f2476743b5b (patch)
tree      d99e02c1bc02a92135c7da2f5745e57fd04663af /yarn
parent    9b214cea896056e7d0a69ae9d3c282e1f027d5b9 (diff)
[SPARK-10622][CORE][YARN] Differentiate dead from "mostly dead" executors.
In YARN mode, when preemption is enabled, we may leave executors in a zombie state while we wait to retrieve the reason for which the executor exited. This is so that we don't account for failed tasks that were running on a preempted executor.

The issue is that while we wait for this information, the scheduler might decide to schedule tasks on the executor, which will never be able to run them. Other side effects include the block manager still considering the executor available to cache blocks, for example.

So, when we know that an executor went down but we don't know why, stop everything related to the executor, except its running tasks. Only when we know the reason for the exit (or give up waiting for it) do we update the running tasks.

This is achieved by a new `disableExecutor()` method in the `Schedulable` interface. For managers that do not behave like this (i.e. every one but YARN), the existing `executorLost()` method will behave the same way it did before.

On top of that change, a few minor changes made debugging easier and fixed some other minor issues:
- The cluster-mode AM was printing a misleading log message every time an executor disconnected from the driver (because the akka actor system was shared between driver and AM).
- Avoid sending unnecessary requests for an executor's exit reason when we already know it was explicitly disabled / killed. This avoids both multiple requests and unnecessary requests that would just cause warning messages on the AM (in the explicit kill case).
- Tone down a log message about the executor being lost when it exited normally (e.g. preemption).
- Wake up the AM monitor thread when requests for executor loss reasons arrive too, so that we can more quickly remove executors from this zombie state.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #8887 from vanzin/SPARK-10622.
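A minimal sketch of the disable-then-resolve flow described above. The actual `disableExecutor()` change lives outside the yarn/ module shown in this diff, so the class and method names below (ZombieExecutorTracker, the commented-out scheduler/block manager calls) are assumptions for illustration only, not Spark APIs:

// Hypothetical illustration of the "mostly dead" executor handling described in the
// commit message; identifiers below are invented for the sketch and are not Spark APIs.
import scala.collection.mutable

class ZombieExecutorTracker {
  // Executors that have disconnected but whose exit reason is still unknown.
  private val disabled = mutable.Set[String]()

  // Step 1: stop scheduling on the executor and stop treating it as a cache target,
  // but keep its running tasks accounted for until we know why it went down.
  def disableExecutor(executorId: String): Unit = synchronized {
    disabled += executorId
    // scheduler.stopSchedulingOn(executorId)
    // blockManagerMaster.removeExecutor(executorId)
  }

  // Step 2: once the loss reason arrives (or we give up waiting),
  // fail or resubmit the tasks that were running on the executor.
  def executorLost(executorId: String, causedByApp: Boolean): Unit = synchronized {
    if (disabled.remove(executorId)) {
      // taskScheduler.handleLostTasks(executorId, countTowardsFailures = causedByApp)
    }
  }
}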
Diffstat (limited to 'yarn')
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala  79
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala        5
2 files changed, 50 insertions, 34 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 12ae350e4c..50ae7ffeec 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -87,8 +87,27 @@ private[spark] class ApplicationMaster(
@volatile private var reporterThread: Thread = _
@volatile private var allocator: YarnAllocator = _
+
+ // Lock for controlling the allocator (heartbeat) thread.
private val allocatorLock = new Object()
+ // Steady state heartbeat interval. We want to be reasonably responsive without causing too many
+ // requests to RM.
+ private val heartbeatInterval = {
+ // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
+ val expiryInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
+ math.max(0, math.min(expiryInterval / 2,
+ sparkConf.getTimeAsMs("spark.yarn.scheduler.heartbeat.interval-ms", "3s")))
+ }
+
+ // Initial wait interval before allocator poll, to allow for quicker ramp up when executors are
+ // being requested.
+ private val initialAllocationInterval = math.min(heartbeatInterval,
+ sparkConf.getTimeAsMs("spark.yarn.scheduler.initial-allocation.interval", "200ms"))
+
+ // Next wait interval before allocator poll.
+ private var nextAllocationInterval = initialAllocationInterval
+
// Fields used in client mode.
private var rpcEnv: RpcEnv = null
private var amEndpoint: RpcEndpointRef = _
@@ -332,19 +351,6 @@ private[spark] class ApplicationMaster(
}
private def launchReporterThread(): Thread = {
- // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
- val expiryInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
-
- // we want to be reasonably responsive without causing too many requests to RM.
- val heartbeatInterval = math.max(0, math.min(expiryInterval / 2,
- sparkConf.getTimeAsMs("spark.yarn.scheduler.heartbeat.interval-ms", "3s")))
-
- // we want to check more frequently for pending containers
- val initialAllocationInterval = math.min(heartbeatInterval,
- sparkConf.getTimeAsMs("spark.yarn.scheduler.initial-allocation.interval", "200ms"))
-
- var nextAllocationInterval = initialAllocationInterval
-
// The number of failures in a row until Reporter thread give up
val reporterMaxFailures = sparkConf.getInt("spark.yarn.scheduler.reporterThread.maxFailures", 5)
@@ -377,19 +383,19 @@ private[spark] class ApplicationMaster(
}
try {
val numPendingAllocate = allocator.getPendingAllocate.size
- val sleepInterval =
- if (numPendingAllocate > 0) {
- val currentAllocationInterval =
- math.min(heartbeatInterval, nextAllocationInterval)
- nextAllocationInterval = currentAllocationInterval * 2 // avoid overflow
- currentAllocationInterval
- } else {
- nextAllocationInterval = initialAllocationInterval
- heartbeatInterval
- }
- logDebug(s"Number of pending allocations is $numPendingAllocate. " +
- s"Sleeping for $sleepInterval.")
allocatorLock.synchronized {
+ val sleepInterval =
+ if (numPendingAllocate > 0 || allocator.getNumPendingLossReasonRequests > 0) {
+ val currentAllocationInterval =
+ math.min(heartbeatInterval, nextAllocationInterval)
+ nextAllocationInterval = currentAllocationInterval * 2 // avoid overflow
+ currentAllocationInterval
+ } else {
+ nextAllocationInterval = initialAllocationInterval
+ heartbeatInterval
+ }
+ logDebug(s"Number of pending allocations is $numPendingAllocate. " +
+ s"Sleeping for $sleepInterval.")
allocatorLock.wait(sleepInterval)
}
} catch {
@@ -560,6 +566,11 @@ private[spark] class ApplicationMaster(
userThread
}
+ private def resetAllocatorInterval(): Unit = allocatorLock.synchronized {
+ nextAllocationInterval = initialAllocationInterval
+ allocatorLock.notifyAll()
+ }
+
/**
* An [[RpcEndpoint]] that communicates with the driver's scheduler backend.
*/
@@ -581,11 +592,9 @@ private[spark] class ApplicationMaster(
case RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount) =>
Option(allocator) match {
case Some(a) =>
- allocatorLock.synchronized {
- if (a.requestTotalExecutorsWithPreferredLocalities(requestedTotal,
- localityAwareTasks, hostToLocalTaskCount)) {
- allocatorLock.notifyAll()
- }
+ if (a.requestTotalExecutorsWithPreferredLocalities(requestedTotal,
+ localityAwareTasks, hostToLocalTaskCount)) {
+ resetAllocatorInterval()
}
case None =>
@@ -603,17 +612,19 @@ private[spark] class ApplicationMaster(
case GetExecutorLossReason(eid) =>
Option(allocator) match {
- case Some(a) => a.enqueueGetLossReasonRequest(eid, context)
- case None => logWarning(s"Container allocator is not ready to find" +
- s" executor loss reasons yet.")
+ case Some(a) =>
+ a.enqueueGetLossReasonRequest(eid, context)
+ resetAllocatorInterval()
+ case None =>
+ logWarning("Container allocator is not ready to find executor loss reasons yet.")
}
}
override def onDisconnected(remoteAddress: RpcAddress): Unit = {
- logInfo(s"Driver terminated or disconnected! Shutting down. $remoteAddress")
// In cluster mode, do not rely on the disassociated event to exit
// This avoids potentially reporting incorrect exit codes if the driver fails
if (!isClusterMode) {
+ logInfo(s"Driver terminated or disconnected! Shutting down. $remoteAddress")
finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
}
}
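The core of the ApplicationMaster change above is moving the sleep-interval computation inside allocatorLock and letting RPC handlers wake the reporter thread early via resetAllocatorInterval(). A standalone sketch of that backoff-and-wakeup pattern, with a hypothetical pendingWork() standing in for the pending-allocation and pending-loss-reason checks and hard-coded intervals in place of the configuration lookups:

// Simplified sketch of the reporter thread's exponential backoff with early wake-up.
// pendingWork() is a placeholder for the allocator queries in the real code.
object BackoffLoopSketch {
  private val lock = new Object()
  private val heartbeatInterval = 3000L        // steady-state sleep between RM heartbeats (ms)
  private val initialAllocationInterval = 200L // fast poll while work is pending (ms)
  private var nextAllocationInterval = initialAllocationInterval

  def pendingWork(): Boolean = false // placeholder

  // Called by RPC handlers to reset the backoff and wake the loop immediately.
  def resetInterval(): Unit = lock.synchronized {
    nextAllocationInterval = initialAllocationInterval
    lock.notifyAll()
  }

  // One iteration of the reporter loop.
  def runOnce(): Unit = lock.synchronized {
    val sleepInterval =
      if (pendingWork()) {
        // Poll quickly, doubling the interval each round, capped at the heartbeat interval.
        val current = math.min(heartbeatInterval, nextAllocationInterval)
        nextAllocationInterval = current * 2
        current
      } else {
        nextAllocationInterval = initialAllocationInterval
        heartbeatInterval
      }
    lock.wait(sleepInterval)
  }
}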
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index a0cf1b4aa4..4d9e777cb4 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -550,6 +550,10 @@ private[yarn] class YarnAllocator(
private[yarn] def getNumUnexpectedContainerRelease = numUnexpectedContainerRelease
+ private[yarn] def getNumPendingLossReasonRequests: Int = synchronized {
+ pendingLossReasonRequests.size
+ }
+
/**
* Split the pending container requests into 3 groups based on current localities of pending
* tasks.
@@ -582,6 +586,7 @@ private[yarn] class YarnAllocator(
(localityMatched.toSeq, localityUnMatched.toSeq, localityFree.toSeq)
}
+
}
private object YarnAllocator {
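The new getNumPendingLossReasonRequests accessor above exposes how many executors are currently waiting in the zombie state. A hedged sketch of the bookkeeping such an accessor could sit on top of; the per-executor queue of callers is an assumption inferred from the enqueueGetLossReasonRequest call in the ApplicationMaster diff, and Callback is a stand-in for Spark's RpcCallContext, which is not part of this diff:

// Hypothetical bookkeeping for pending executor-loss-reason requests.
import scala.collection.mutable

class LossReasonRequests[Callback] {
  // executorId -> callers waiting to learn why that executor exited
  private val pendingLossReasonRequests =
    mutable.HashMap[String, mutable.Buffer[Callback]]()

  // Remember a caller until YARN tells us the container's exit status.
  def enqueue(executorId: String, context: Callback): Unit = synchronized {
    pendingLossReasonRequests.getOrElseUpdate(executorId, mutable.ArrayBuffer[Callback]()) += context
  }

  // Mirrors the accessor added in the diff: lets the AM poll the RM quickly
  // instead of sleeping for the full heartbeat interval while requests are pending.
  def numPending: Int = synchronized {
    pendingLossReasonRequests.size
  }

  // Once the exit status is known, reply to every waiting caller and drop the entry.
  def complete(executorId: String)(reply: Callback => Unit): Unit = synchronized {
    pendingLossReasonRequests.remove(executorId).foreach(_.foreach(reply))
  }
}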