Diffstat (limited to 'yarn')
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala  79
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala  5
2 files changed, 50 insertions, 34 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 12ae350e4c..50ae7ffeec 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -87,8 +87,27 @@ private[spark] class ApplicationMaster(
@volatile private var reporterThread: Thread = _
@volatile private var allocator: YarnAllocator = _
+
+ // Lock for controlling the allocator (heartbeat) thread.
private val allocatorLock = new Object()
+ // Steady state heartbeat interval. We want to be reasonably responsive without causing too many
+ // requests to RM.
+ private val heartbeatInterval = {
+ // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
+ val expiryInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
+ math.max(0, math.min(expiryInterval / 2,
+ sparkConf.getTimeAsMs("spark.yarn.scheduler.heartbeat.interval-ms", "3s")))
+ }
+
+ // Initial wait interval before allocator poll, to allow for quicker ramp up when executors are
+ // being requested.
+ private val initialAllocationInterval = math.min(heartbeatInterval,
+ sparkConf.getTimeAsMs("spark.yarn.scheduler.initial-allocation.interval", "200ms"))
+
+ // Next wait interval before allocator poll.
+ private var nextAllocationInterval = initialAllocationInterval
+
// Fields used in client mode.
private var rpcEnv: RpcEnv = null
private var amEndpoint: RpcEndpointRef = _
@@ -332,19 +351,6 @@ private[spark] class ApplicationMaster(
}
private def launchReporterThread(): Thread = {
- // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
- val expiryInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
-
- // we want to be reasonably responsive without causing too many requests to RM.
- val heartbeatInterval = math.max(0, math.min(expiryInterval / 2,
- sparkConf.getTimeAsMs("spark.yarn.scheduler.heartbeat.interval-ms", "3s")))
-
- // we want to check more frequently for pending containers
- val initialAllocationInterval = math.min(heartbeatInterval,
- sparkConf.getTimeAsMs("spark.yarn.scheduler.initial-allocation.interval", "200ms"))
-
- var nextAllocationInterval = initialAllocationInterval
-
- // The number of failures in a row until the Reporter thread gives up
val reporterMaxFailures = sparkConf.getInt("spark.yarn.scheduler.reporterThread.maxFailures", 5)
@@ -377,19 +383,19 @@ private[spark] class ApplicationMaster(
}
try {
val numPendingAllocate = allocator.getPendingAllocate.size
- val sleepInterval =
- if (numPendingAllocate > 0) {
- val currentAllocationInterval =
- math.min(heartbeatInterval, nextAllocationInterval)
- nextAllocationInterval = currentAllocationInterval * 2 // avoid overflow
- currentAllocationInterval
- } else {
- nextAllocationInterval = initialAllocationInterval
- heartbeatInterval
- }
- logDebug(s"Number of pending allocations is $numPendingAllocate. " +
- s"Sleeping for $sleepInterval.")
allocatorLock.synchronized {
+ val sleepInterval =
+ if (numPendingAllocate > 0 || allocator.getNumPendingLossReasonRequests > 0) {
+ val currentAllocationInterval =
+ math.min(heartbeatInterval, nextAllocationInterval)
+ nextAllocationInterval = currentAllocationInterval * 2 // avoid overflow
+ currentAllocationInterval
+ } else {
+ nextAllocationInterval = initialAllocationInterval
+ heartbeatInterval
+ }
+ logDebug(s"Number of pending allocations is $numPendingAllocate. " +
+ s"Sleeping for $sleepInterval.")
allocatorLock.wait(sleepInterval)
}
} catch {
@@ -560,6 +566,11 @@ private[spark] class ApplicationMaster(
userThread
}
+ private def resetAllocatorInterval(): Unit = allocatorLock.synchronized {
+ nextAllocationInterval = initialAllocationInterval
+ allocatorLock.notifyAll()
+ }
+
/**
* An [[RpcEndpoint]] that communicates with the driver's scheduler backend.
*/
@@ -581,11 +592,9 @@ private[spark] class ApplicationMaster(
case RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount) =>
Option(allocator) match {
case Some(a) =>
- allocatorLock.synchronized {
- if (a.requestTotalExecutorsWithPreferredLocalities(requestedTotal,
- localityAwareTasks, hostToLocalTaskCount)) {
- allocatorLock.notifyAll()
- }
+ if (a.requestTotalExecutorsWithPreferredLocalities(requestedTotal,
+ localityAwareTasks, hostToLocalTaskCount)) {
+ resetAllocatorInterval()
}
case None =>
@@ -603,17 +612,19 @@ private[spark] class ApplicationMaster(
case GetExecutorLossReason(eid) =>
Option(allocator) match {
- case Some(a) => a.enqueueGetLossReasonRequest(eid, context)
- case None => logWarning(s"Container allocator is not ready to find" +
- s" executor loss reasons yet.")
+ case Some(a) =>
+ a.enqueueGetLossReasonRequest(eid, context)
+ resetAllocatorInterval()
+ case None =>
+ logWarning("Container allocator is not ready to find executor loss reasons yet.")
}
}
override def onDisconnected(remoteAddress: RpcAddress): Unit = {
- logInfo(s"Driver terminated or disconnected! Shutting down. $remoteAddress")
// In cluster mode, do not rely on the disassociated event to exit
// This avoids potentially reporting incorrect exit codes if the driver fails
if (!isClusterMode) {
+ logInfo(s"Driver terminated or disconnected! Shutting down. $remoteAddress")
finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
}
}
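The control flow above can be read in isolation: the reporter thread polls quickly while container allocations or executor-loss-reason requests are pending, doubles its wait each iteration up to the steady-state heartbeat, and the new resetAllocatorInterval() restarts the fast ramp-up and wakes the thread. Below is a minimal, self-contained Scala sketch of that wait/notify backoff pattern; the object name, field names, and constants are illustrative stand-ins, not Spark's actual code.

object BackoffHeartbeatSketch {
  // Illustrative constants; in the patch these are derived from yarnConf and sparkConf.
  private val heartbeatIntervalMs = 3000L
  private val initialAllocationIntervalMs = 200L

  private val lock = new Object()
  private var nextAllocationIntervalMs = initialAllocationIntervalMs

  // Stand-in for allocator.getPendingAllocate.size + allocator.getNumPendingLossReasonRequests.
  @volatile var pendingWork: Int = 0

  // One iteration of the reporter thread's sleep logic.
  def sleepOnce(): Unit = lock.synchronized {
    val sleepInterval =
      if (pendingWork > 0) {
        val current = math.min(heartbeatIntervalMs, nextAllocationIntervalMs)
        nextAllocationIntervalMs = current * 2 // back off; the min() above keeps it bounded
        current
      } else {
        nextAllocationIntervalMs = initialAllocationIntervalMs
        heartbeatIntervalMs
      }
    lock.wait(sleepInterval)
  }

  // Counterpart of resetAllocatorInterval(): restart the fast ramp-up and wake the waiting thread.
  def reset(): Unit = lock.synchronized {
    nextAllocationIntervalMs = initialAllocationIntervalMs
    lock.notifyAll()
  }
}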
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index a0cf1b4aa4..4d9e777cb4 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -550,6 +550,10 @@ private[yarn] class YarnAllocator(
private[yarn] def getNumUnexpectedContainerRelease = numUnexpectedContainerRelease
+ private[yarn] def getNumPendingLossReasonRequests: Int = synchronized {
+ pendingLossReasonRequests.size
+ }
+
/**
* Split the pending container requests into 3 groups based on current localities of pending
* tasks.
@@ -582,6 +586,7 @@ private[yarn] class YarnAllocator(
(localityMatched.toSeq, localityUnMatched.toSeq, localityFree.toSeq)
}
+
}
private object YarnAllocator {
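For reference, the intervals the ApplicationMaster reads above are ordinary Spark properties and can be tuned per application. A hypothetical SparkConf snippet using the property names and default values that appear in the diff:

import org.apache.spark.SparkConf

object HeartbeatTuningSketch {
  // The values here are just the defaults shown in the diff; smaller values ramp up
  // executor allocation faster at the cost of more frequent ResourceManager requests.
  val conf: SparkConf = new SparkConf()
    .set("spark.yarn.scheduler.heartbeat.interval-ms", "3s")
    .set("spark.yarn.scheduler.initial-allocation.interval", "200ms")
    .set("spark.yarn.scheduler.reporterThread.maxFailures", "5")
}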