aboutsummaryrefslogtreecommitdiff
path: root/yarn
diff options
context:
space:
mode:
authorRyan Blue <blue@apache.org>2016-06-23 13:54:37 -0500
committerTom Graves <tgraves@yahoo-inc.com>2016-06-23 13:54:37 -0500
commita410814c87b120cb5cfbf095b1bd94b1de862844 (patch)
tree4266071a84a7f04273d3b2f28b07d27f5b7166ff /yarn
parent10396d9505c752cc18b6424f415d4ff0f460ad65 (diff)
downloadspark-a410814c87b120cb5cfbf095b1bd94b1de862844.tar.gz
spark-a410814c87b120cb5cfbf095b1bd94b1de862844.tar.bz2
spark-a410814c87b120cb5cfbf095b1bd94b1de862844.zip
[SPARK-15725][YARN] Ensure ApplicationMaster sleeps for the min interval.
## What changes were proposed in this pull request? Update `ApplicationMaster` to sleep for at least the minimum allocation interval before calling `allocateResources`. This prevents overloading the `YarnAllocator` that is happening because the thread is triggered when an executor is killed and its connections die. In YARN, this prevents the app from overloading the allocator and becoming unstable. ## How was this patch tested? Tested that this allows the an app to recover instead of hanging. It is still possible for the YarnAllocator to be overwhelmed by requests, but this prevents the issue for the most common cause. Author: Ryan Blue <blue@apache.org> Closes #13482 from rdblue/SPARK-15725-am-sleep-work-around.
Diffstat (limited to 'yarn')
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala25
1 files changed, 22 insertions, 3 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 847d1de50f..b6f45dd634 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -457,8 +457,10 @@ private[spark] class ApplicationMaster(
}
try {
val numPendingAllocate = allocator.getPendingAllocate.size
+ var sleepStart = 0L
+ var sleepInterval = 200L // ms
allocatorLock.synchronized {
- val sleepInterval =
+ sleepInterval =
if (numPendingAllocate > 0 || allocator.getNumPendingLossReasonRequests > 0) {
val currentAllocationInterval =
math.min(heartbeatInterval, nextAllocationInterval)
@@ -468,10 +470,27 @@ private[spark] class ApplicationMaster(
nextAllocationInterval = initialAllocationInterval
heartbeatInterval
}
- logDebug(s"Number of pending allocations is $numPendingAllocate. " +
- s"Sleeping for $sleepInterval.")
+ sleepStart = System.currentTimeMillis()
allocatorLock.wait(sleepInterval)
}
+ val sleepDuration = System.currentTimeMillis() - sleepStart
+ if (sleepDuration < sleepInterval) {
+ // log when sleep is interrupted
+ logDebug(s"Number of pending allocations is $numPendingAllocate. " +
+ s"Slept for $sleepDuration/$sleepInterval ms.")
+ // if sleep was less than the minimum interval, sleep for the rest of it
+ val toSleep = math.max(0, initialAllocationInterval - sleepDuration)
+ if (toSleep > 0) {
+ logDebug(s"Going back to sleep for $toSleep ms")
+ // use Thread.sleep instead of allocatorLock.wait. there is no need to be woken up
+ // by the methods that signal allocatorLock because this is just finishing the min
+ // sleep interval, which should happen even if this is signalled again.
+ Thread.sleep(toSleep)
+ }
+ } else {
+ logDebug(s"Number of pending allocations is $numPendingAllocate. " +
+ s"Slept for $sleepDuration/$sleepInterval.")
+ }
} catch {
case e: InterruptedException =>
}