aboutsummaryrefslogtreecommitdiff
path: root/yarn/src
diff options
context:
space:
mode:
authorSandy Ryza <sandy@cloudera.com>2014-12-18 12:19:07 -0600
committerThomas Graves <tgraves@apache.org>2014-12-18 12:19:07 -0600
commit253b72b56fe908bbab5d621eae8a5f359c639dfd (patch)
tree5713cfd547a20cb88a35f9fc708e9019b9fbdd17 /yarn/src
parent3b764699ffc9c74b9597c855a0e8c04ac24fa3b7 (diff)
downloadspark-253b72b56fe908bbab5d621eae8a5f359c639dfd.tar.gz
spark-253b72b56fe908bbab5d621eae8a5f359c639dfd.tar.bz2
spark-253b72b56fe908bbab5d621eae8a5f359c639dfd.zip
SPARK-3779. yarn spark.yarn.applicationMaster.waitTries config should be...
... changed to a time period Author: Sandy Ryza <sandy@cloudera.com> Closes #3471 from sryza/sandy-spark-3779 and squashes the following commits: 20b9887 [Sandy Ryza] Deprecate old property 42b5df7 [Sandy Ryza] Review feedback 9a959a1 [Sandy Ryza] SPARK-3779. yarn spark.yarn.applicationMaster.waitTries config should be changed to a time period
Diffstat (limited to 'yarn/src')
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala48
1 files changed, 24 insertions, 24 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 987b3373fb..dc7a078446 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -329,43 +329,43 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
private def waitForSparkContextInitialized(): SparkContext = {
logInfo("Waiting for spark context initialization")
- try {
- sparkContextRef.synchronized {
- var count = 0
- val waitTime = 10000L
- val numTries = sparkConf.getInt("spark.yarn.applicationMaster.waitTries", 10)
- while (sparkContextRef.get() == null && count < numTries && !finished) {
- logInfo("Waiting for spark context initialization ... " + count)
- count = count + 1
- sparkContextRef.wait(waitTime)
- }
+ sparkContextRef.synchronized {
+ val waitTries = sparkConf.getOption("spark.yarn.applicationMaster.waitTries")
+ .map(_.toLong * 10000L)
+ if (waitTries.isDefined) {
+ logWarning(
+ "spark.yarn.applicationMaster.waitTries is deprecated, use spark.yarn.am.waitTime")
+ }
+ val totalWaitTime = sparkConf.getLong("spark.yarn.am.waitTime", waitTries.getOrElse(100000L))
+ val deadline = System.currentTimeMillis() + totalWaitTime
- val sparkContext = sparkContextRef.get()
- if (sparkContext == null) {
- logError(("SparkContext did not initialize after waiting for %d ms. Please check earlier"
- + " log output for errors. Failing the application.").format(numTries * waitTime))
- }
- sparkContext
+ while (sparkContextRef.get() == null && System.currentTimeMillis < deadline && !finished) {
+ logInfo("Waiting for spark context initialization ... ")
+ sparkContextRef.wait(10000L)
+ }
+
+ val sparkContext = sparkContextRef.get()
+ if (sparkContext == null) {
+ logError(("SparkContext did not initialize after waiting for %d ms. Please check earlier"
+ + " log output for errors. Failing the application.").format(totalWaitTime))
}
+ sparkContext
}
}
private def waitForSparkDriver(): ActorRef = {
logInfo("Waiting for Spark driver to be reachable.")
var driverUp = false
- var count = 0
val hostport = args.userArgs(0)
val (driverHost, driverPort) = Utils.parseHostPort(hostport)
- // spark driver should already be up since it launched us, but we don't want to
+ // Spark driver should already be up since it launched us, but we don't want to
// wait forever, so wait 100 seconds max to match the cluster mode setting.
- // Leave this config unpublished for now. SPARK-3779 to investigating changing
- // this config to be time based.
- val numTries = sparkConf.getInt("spark.yarn.applicationMaster.waitTries", 1000)
+ val totalWaitTime = sparkConf.getLong("spark.yarn.am.waitTime", 100000L)
+ val deadline = System.currentTimeMillis + totalWaitTime
- while (!driverUp && !finished && count < numTries) {
+ while (!driverUp && !finished && System.currentTimeMillis < deadline) {
try {
- count = count + 1
val socket = new Socket(driverHost, driverPort)
socket.close()
logInfo("Driver now available: %s:%s".format(driverHost, driverPort))
@@ -374,7 +374,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
case e: Exception =>
logError("Failed to connect to driver at %s:%s, retrying ...".
format(driverHost, driverPort))
- Thread.sleep(100)
+ Thread.sleep(100L)
}
}