aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala25
1 files changed, 22 insertions, 3 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 847d1de50f..b6f45dd634 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -457,8 +457,10 @@ private[spark] class ApplicationMaster(
}
try {
val numPendingAllocate = allocator.getPendingAllocate.size
+ var sleepStart = 0L
+ var sleepInterval = 200L // ms
allocatorLock.synchronized {
- val sleepInterval =
+ sleepInterval =
if (numPendingAllocate > 0 || allocator.getNumPendingLossReasonRequests > 0) {
val currentAllocationInterval =
math.min(heartbeatInterval, nextAllocationInterval)
@@ -468,10 +470,27 @@ private[spark] class ApplicationMaster(
nextAllocationInterval = initialAllocationInterval
heartbeatInterval
}
- logDebug(s"Number of pending allocations is $numPendingAllocate. " +
- s"Sleeping for $sleepInterval.")
+ sleepStart = System.currentTimeMillis()
allocatorLock.wait(sleepInterval)
}
+ val sleepDuration = System.currentTimeMillis() - sleepStart
+ if (sleepDuration < sleepInterval) {
+ // log when sleep is interrupted
+ logDebug(s"Number of pending allocations is $numPendingAllocate. " +
+ s"Slept for $sleepDuration/$sleepInterval ms.")
+ // if sleep was less than the minimum interval, sleep for the rest of it
+ val toSleep = math.max(0, initialAllocationInterval - sleepDuration)
+ if (toSleep > 0) {
+ logDebug(s"Going back to sleep for $toSleep ms")
+ // use Thread.sleep instead of allocatorLock.wait. there is no need to be woken up
+ // by the methods that signal allocatorLock because this is just finishing the min
+ // sleep interval, which should happen even if this is signalled again.
+ Thread.sleep(toSleep)
+ }
+ } else {
+ logDebug(s"Number of pending allocations is $numPendingAllocate. " +
+ s"Slept for $sleepDuration/$sleepInterval.")
+ }
} catch {
case e: InterruptedException =>
}