diff options
-rw-r--r-- | yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 25 |
1 files changed, 22 insertions, 3 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 847d1de50f..b6f45dd634 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -457,8 +457,10 @@ private[spark] class ApplicationMaster( } try { val numPendingAllocate = allocator.getPendingAllocate.size + var sleepStart = 0L + var sleepInterval = 200L // ms allocatorLock.synchronized { - val sleepInterval = + sleepInterval = if (numPendingAllocate > 0 || allocator.getNumPendingLossReasonRequests > 0) { val currentAllocationInterval = math.min(heartbeatInterval, nextAllocationInterval) @@ -468,10 +470,27 @@ private[spark] class ApplicationMaster( nextAllocationInterval = initialAllocationInterval heartbeatInterval } - logDebug(s"Number of pending allocations is $numPendingAllocate. " + - s"Sleeping for $sleepInterval.") + sleepStart = System.currentTimeMillis() allocatorLock.wait(sleepInterval) } + val sleepDuration = System.currentTimeMillis() - sleepStart + if (sleepDuration < sleepInterval) { + // log when sleep is interrupted + logDebug(s"Number of pending allocations is $numPendingAllocate. " + + s"Slept for $sleepDuration/$sleepInterval ms.") + // if sleep was less than the minimum interval, sleep for the rest of it + val toSleep = math.max(0, initialAllocationInterval - sleepDuration) + if (toSleep > 0) { + logDebug(s"Going back to sleep for $toSleep ms") + // use Thread.sleep instead of allocatorLock.wait. there is no need to be woken up + // by the methods that signal allocatorLock because this is just finishing the min + // sleep interval, which should happen even if this is signalled again. + Thread.sleep(toSleep) + } + } else { + logDebug(s"Number of pending allocations is $numPendingAllocate. " + + s"Slept for $sleepDuration/$sleepInterval.") + } } catch { case e: InterruptedException => } |