diff options
author | Sandy Ryza <sandy@cloudera.com> | 2014-12-18 12:19:07 -0600 |
---|---|---|
committer | Thomas Graves <tgraves@apache.org> | 2014-12-18 12:19:07 -0600 |
commit | 253b72b56fe908bbab5d621eae8a5f359c639dfd (patch) | |
tree | 5713cfd547a20cb88a35f9fc708e9019b9fbdd17 /yarn/src | |
parent | 3b764699ffc9c74b9597c855a0e8c04ac24fa3b7 (diff) | |
download | spark-253b72b56fe908bbab5d621eae8a5f359c639dfd.tar.gz spark-253b72b56fe908bbab5d621eae8a5f359c639dfd.tar.bz2 spark-253b72b56fe908bbab5d621eae8a5f359c639dfd.zip |
SPARK-3779. yarn spark.yarn.applicationMaster.waitTries config should be...
... changed to a time period
Author: Sandy Ryza <sandy@cloudera.com>
Closes #3471 from sryza/sandy-spark-3779 and squashes the following commits:
20b9887 [Sandy Ryza] Deprecate old property
42b5df7 [Sandy Ryza] Review feedback
9a959a1 [Sandy Ryza] SPARK-3779. yarn spark.yarn.applicationMaster.waitTries config should be changed to a time period
Diffstat (limited to 'yarn/src')
-rw-r--r-- | yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 48 |
1 files changed, 24 insertions, 24 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 987b3373fb..dc7a078446 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -329,43 +329,43 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, private def waitForSparkContextInitialized(): SparkContext = { logInfo("Waiting for spark context initialization") - try { - sparkContextRef.synchronized { - var count = 0 - val waitTime = 10000L - val numTries = sparkConf.getInt("spark.yarn.applicationMaster.waitTries", 10) - while (sparkContextRef.get() == null && count < numTries && !finished) { - logInfo("Waiting for spark context initialization ... " + count) - count = count + 1 - sparkContextRef.wait(waitTime) - } + sparkContextRef.synchronized { + val waitTries = sparkConf.getOption("spark.yarn.applicationMaster.waitTries") + .map(_.toLong * 10000L) + if (waitTries.isDefined) { + logWarning( + "spark.yarn.applicationMaster.waitTries is deprecated, use spark.yarn.am.waitTime") + } + val totalWaitTime = sparkConf.getLong("spark.yarn.am.waitTime", waitTries.getOrElse(100000L)) + val deadline = System.currentTimeMillis() + totalWaitTime - val sparkContext = sparkContextRef.get() - if (sparkContext == null) { - logError(("SparkContext did not initialize after waiting for %d ms. Please check earlier" - + " log output for errors. Failing the application.").format(numTries * waitTime)) - } - sparkContext + while (sparkContextRef.get() == null && System.currentTimeMillis < deadline && !finished) { + logInfo("Waiting for spark context initialization ... ") + sparkContextRef.wait(10000L) + } + + val sparkContext = sparkContextRef.get() + if (sparkContext == null) { + logError(("SparkContext did not initialize after waiting for %d ms. Please check earlier" + + " log output for errors. Failing the application.").format(totalWaitTime)) } + sparkContext } } private def waitForSparkDriver(): ActorRef = { logInfo("Waiting for Spark driver to be reachable.") var driverUp = false - var count = 0 val hostport = args.userArgs(0) val (driverHost, driverPort) = Utils.parseHostPort(hostport) - // spark driver should already be up since it launched us, but we don't want to + // Spark driver should already be up since it launched us, but we don't want to // wait forever, so wait 100 seconds max to match the cluster mode setting. - // Leave this config unpublished for now. SPARK-3779 to investigating changing - // this config to be time based. - val numTries = sparkConf.getInt("spark.yarn.applicationMaster.waitTries", 1000) + val totalWaitTime = sparkConf.getLong("spark.yarn.am.waitTime", 100000L) + val deadline = System.currentTimeMillis + totalWaitTime - while (!driverUp && !finished && count < numTries) { + while (!driverUp && !finished && System.currentTimeMillis < deadline) { try { - count = count + 1 val socket = new Socket(driverHost, driverPort) socket.close() logInfo("Driver now available: %s:%s".format(driverHost, driverPort)) @@ -374,7 +374,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, case e: Exception => logError("Failed to connect to driver at %s:%s, retrying ...". format(driverHost, driverPort)) - Thread.sleep(100) + Thread.sleep(100L) } } |