diff options
Diffstat (limited to 'yarn')
-rw-r--r-- | yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 79 | ||||
-rw-r--r-- | yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala | 5 |
2 files changed, 50 insertions, 34 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 12ae350e4c..50ae7ffeec 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -87,8 +87,27 @@ private[spark] class ApplicationMaster( @volatile private var reporterThread: Thread = _ @volatile private var allocator: YarnAllocator = _ + + // Lock for controlling the allocator (heartbeat) thread. private val allocatorLock = new Object() + // Steady state heartbeat interval. We want to be reasonably responsive without causing too many + // requests to RM. + private val heartbeatInterval = { + // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses. + val expiryInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000) + math.max(0, math.min(expiryInterval / 2, + sparkConf.getTimeAsMs("spark.yarn.scheduler.heartbeat.interval-ms", "3s"))) + } + + // Initial wait interval before allocator poll, to allow for quicker ramp up when executors are + // being requested. + private val initialAllocationInterval = math.min(heartbeatInterval, + sparkConf.getTimeAsMs("spark.yarn.scheduler.initial-allocation.interval", "200ms")) + + // Next wait interval before allocator poll. + private var nextAllocationInterval = initialAllocationInterval + // Fields used in client mode. private var rpcEnv: RpcEnv = null private var amEndpoint: RpcEndpointRef = _ @@ -332,19 +351,6 @@ private[spark] class ApplicationMaster( } private def launchReporterThread(): Thread = { - // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses. - val expiryInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000) - - // we want to be reasonably responsive without causing too many requests to RM. - val heartbeatInterval = math.max(0, math.min(expiryInterval / 2, - sparkConf.getTimeAsMs("spark.yarn.scheduler.heartbeat.interval-ms", "3s"))) - - // we want to check more frequently for pending containers - val initialAllocationInterval = math.min(heartbeatInterval, - sparkConf.getTimeAsMs("spark.yarn.scheduler.initial-allocation.interval", "200ms")) - - var nextAllocationInterval = initialAllocationInterval - // The number of failures in a row until Reporter thread give up val reporterMaxFailures = sparkConf.getInt("spark.yarn.scheduler.reporterThread.maxFailures", 5) @@ -377,19 +383,19 @@ private[spark] class ApplicationMaster( } try { val numPendingAllocate = allocator.getPendingAllocate.size - val sleepInterval = - if (numPendingAllocate > 0) { - val currentAllocationInterval = - math.min(heartbeatInterval, nextAllocationInterval) - nextAllocationInterval = currentAllocationInterval * 2 // avoid overflow - currentAllocationInterval - } else { - nextAllocationInterval = initialAllocationInterval - heartbeatInterval - } - logDebug(s"Number of pending allocations is $numPendingAllocate. " + - s"Sleeping for $sleepInterval.") allocatorLock.synchronized { + val sleepInterval = + if (numPendingAllocate > 0 || allocator.getNumPendingLossReasonRequests > 0) { + val currentAllocationInterval = + math.min(heartbeatInterval, nextAllocationInterval) + nextAllocationInterval = currentAllocationInterval * 2 // avoid overflow + currentAllocationInterval + } else { + nextAllocationInterval = initialAllocationInterval + heartbeatInterval + } + logDebug(s"Number of pending allocations is $numPendingAllocate. " + + s"Sleeping for $sleepInterval.") allocatorLock.wait(sleepInterval) } } catch { @@ -560,6 +566,11 @@ private[spark] class ApplicationMaster( userThread } + private def resetAllocatorInterval(): Unit = allocatorLock.synchronized { + nextAllocationInterval = initialAllocationInterval + allocatorLock.notifyAll() + } + /** * An [[RpcEndpoint]] that communicates with the driver's scheduler backend. */ @@ -581,11 +592,9 @@ private[spark] class ApplicationMaster( case RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount) => Option(allocator) match { case Some(a) => - allocatorLock.synchronized { - if (a.requestTotalExecutorsWithPreferredLocalities(requestedTotal, - localityAwareTasks, hostToLocalTaskCount)) { - allocatorLock.notifyAll() - } + if (a.requestTotalExecutorsWithPreferredLocalities(requestedTotal, + localityAwareTasks, hostToLocalTaskCount)) { + resetAllocatorInterval() } case None => @@ -603,17 +612,19 @@ private[spark] class ApplicationMaster( case GetExecutorLossReason(eid) => Option(allocator) match { - case Some(a) => a.enqueueGetLossReasonRequest(eid, context) - case None => logWarning(s"Container allocator is not ready to find" + - s" executor loss reasons yet.") + case Some(a) => + a.enqueueGetLossReasonRequest(eid, context) + resetAllocatorInterval() + case None => + logWarning("Container allocator is not ready to find executor loss reasons yet.") } } override def onDisconnected(remoteAddress: RpcAddress): Unit = { - logInfo(s"Driver terminated or disconnected! Shutting down. $remoteAddress") // In cluster mode, do not rely on the disassociated event to exit // This avoids potentially reporting incorrect exit codes if the driver fails if (!isClusterMode) { + logInfo(s"Driver terminated or disconnected! Shutting down. $remoteAddress") finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS) } } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index a0cf1b4aa4..4d9e777cb4 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -550,6 +550,10 @@ private[yarn] class YarnAllocator( private[yarn] def getNumUnexpectedContainerRelease = numUnexpectedContainerRelease + private[yarn] def getNumPendingLossReasonRequests: Int = synchronized { + pendingLossReasonRequests.size + } + /** * Split the pending container requests into 3 groups based on current localities of pending * tasks. @@ -582,6 +586,7 @@ private[yarn] class YarnAllocator( (localityMatched.toSeq, localityUnMatched.toSeq, localityFree.toSeq) } + } private object YarnAllocator { |