aboutsummaryrefslogtreecommitdiff
path: root/yarn
diff options
context:
space:
mode:
author ehnalis <zoltan.zvara@gmail.com> 2015-05-20 08:27:39 -0500
committer Thomas Graves <tgraves@apache.org> 2015-05-20 08:27:39 -0500
commit 3ddf051ee7256f642f8a17768d161c7b5f55c7e1 (patch)
tree 36d9986102035f54b43ca545cc4c78b2d48b4b34 /yarn
parent 09265ad7c85c6de6b568ec329daad632d4a79fa3 (diff)
download spark-3ddf051ee7256f642f8a17768d161c7b5f55c7e1.tar.gz
spark-3ddf051ee7256f642f8a17768d161c7b5f55c7e1.tar.bz2
spark-3ddf051ee7256f642f8a17768d161c7b5f55c7e1.zip
[SPARK-7533] [YARN] Decrease spacing between AM-RM heartbeats.
Added faster RM-heartbeats on pending container allocations with multiplicative back-off. Also updated the related documentation. Author: ehnalis <zoltan.zvara@gmail.com> Closes #6082 from ehnalis/yarn and squashes the following commits: a1d2101 [ehnalis] MIss-spell fixed. 90f8ba4 [ehnalis] Changed default HB values. 6120295 [ehnalis] Removed the bug, when allocation heartbeat would not start from initial value. 08bac63 [ehnalis] Refined style, grammar, removed duplicated code. 073d283 [ehnalis] [SPARK-7533] [YARN] Decrease spacing between AM-RM heartbeats. d4408c9 [ehnalis] [SPARK-7533] [YARN] Decrease spacing between AM-RM heartbeats.
Diffstat (limited to 'yarn')
-rw-r--r-- yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala 34
1 files changed, 25 insertions, 9 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 29752969e6..63a6f2e947 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -300,11 +300,14 @@ private[spark] class ApplicationMaster(
val expiryInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
// we want to be reasonably responsive without causing too many requests to RM.
- val schedulerInterval =
- sparkConf.getTimeAsMs("spark.yarn.scheduler.heartbeat.interval-ms", "5s")
+ val heartbeatInterval = math.max(0, math.min(expiryInterval / 2,
+ sparkConf.getTimeAsMs("spark.yarn.scheduler.heartbeat.interval-ms", "3s")))
- // must be <= expiryInterval / 2.
- val interval = math.max(0, math.min(expiryInterval / 2, schedulerInterval))
+ // we want to check more frequently for pending containers
+ val initialAllocationInterval = math.min(heartbeatInterval,
+ sparkConf.getTimeAsMs("spark.yarn.scheduler.initial-allocation.interval", "200ms"))
+
+ var nextAllocationInterval = initialAllocationInterval
// The number of failures in a row until Reporter thread give up
val reporterMaxFailures = sparkConf.getInt("spark.yarn.scheduler.reporterThread.maxFailures", 5)
@@ -330,15 +333,27 @@ private[spark] class ApplicationMaster(
if (!NonFatal(e) || failureCount >= reporterMaxFailures) {
finish(FinalApplicationStatus.FAILED,
ApplicationMaster.EXIT_REPORTER_FAILURE, "Exception was thrown " +
- s"${failureCount} time(s) from Reporter thread.")
-
+ s"$failureCount time(s) from Reporter thread.")
} else {
- logWarning(s"Reporter thread fails ${failureCount} time(s) in a row.", e)
+ logWarning(s"Reporter thread fails $failureCount time(s) in a row.", e)
}
}
}
try {
- Thread.sleep(interval)
+ val numPendingAllocate = allocator.getNumPendingAllocate
+ val sleepInterval =
+ if (numPendingAllocate > 0) {
+ val currentAllocationInterval =
+ math.min(heartbeatInterval, nextAllocationInterval)
+ nextAllocationInterval *= 2
+ currentAllocationInterval
+ } else {
+ nextAllocationInterval = initialAllocationInterval
+ heartbeatInterval
+ }
+ logDebug(s"Number of pending allocations is $numPendingAllocate. " +
+ s"Sleeping for $sleepInterval.")
+ Thread.sleep(sleepInterval)
} catch {
case e: InterruptedException =>
}
@@ -349,7 +364,8 @@ private[spark] class ApplicationMaster(
t.setDaemon(true)
t.setName("Reporter")
t.start()
- logInfo("Started progress reporter thread - sleep time : " + interval)
+ logInfo(s"Started progress reporter thread with (heartbeat : $heartbeatInterval, " +
+ s"initial allocation : $initialAllocationInterval) intervals")
t
}