-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala        | 38
-rw-r--r--  docs/running-on-yarn.md                                                   |  3
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala |  1
3 files changed, 37 insertions(+), 5 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index c2568eb4b6..cfaebf9ea5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -24,11 +24,10 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.fs.FileSystem.Statistics
import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
-import org.apache.hadoop.security.Credentials
-import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.mapreduce.JobContext
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}

-import org.apache.spark.{Logging, SparkContext, SparkConf, SparkException}
+import org.apache.spark.{Logging, SparkConf, SparkException}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.Utils

@@ -201,6 +200,37 @@ class SparkHadoopUtil extends Logging {
    val baseStatus = fs.getFileStatus(basePath)
    if (baseStatus.isDir) recurse(basePath) else Array(baseStatus)
  }
+
+  private val HADOOP_CONF_PATTERN = "(\\$\\{hadoopconf-[^\\}\\$\\s]+\\})".r.unanchored
+
+  /**
+   * Substitute variables by looking them up in Hadoop configs. Only variables that match the
+   * ${hadoopconf- .. } pattern are substituted.
+   */
+  def substituteHadoopVariables(text: String, hadoopConf: Configuration): String = {
+    text match {
+      case HADOOP_CONF_PATTERN(matched) => {
+        logDebug(text + " matched " + HADOOP_CONF_PATTERN)
+        val key = matched.substring(13, matched.length() - 1) // strip the "${hadoopconf-" prefix and trailing "}"
+        val eval = Option[String](hadoopConf.get(key))
+          .map { value =>
+            logDebug("Substituted " + matched + " with " + value)
+            text.replace(matched, value)
+          }
+        if (eval.isEmpty) {
+          // The variable was not found in Hadoop configs, so return text as is.
+          text
+        } else {
+          // Continue to substitute more variables.
+          substituteHadoopVariables(eval.get, hadoopConf)
+        }
+      }
+      case _ => {
+        logDebug(text + " didn't match " + HADOOP_CONF_PATTERN)
+        text
+      }
+    }
+  }
}

object SparkHadoopUtil {
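
A minimal sketch of how the new helper behaves, assuming the patched SparkHadoopUtil is on the classpath; the hostname value and the missing key below are invented for illustration:

import org.apache.hadoop.conf.Configuration
import org.apache.spark.deploy.SparkHadoopUtil

object SubstituteSketch {
  def main(args: Array[String]): Unit = {
    val hadoopConf = new Configuration(false)
    hadoopConf.set("yarn.resourcemanager.hostname", "rm.example.com") // assumed value

    // A ${hadoopconf-...} token is replaced by the value looked up in the
    // Hadoop Configuration: prints "rm.example.com:18080".
    println(SparkHadoopUtil.get.substituteHadoopVariables(
      "${hadoopconf-yarn.resourcemanager.hostname}:18080", hadoopConf))

    // A key absent from the Configuration leaves the text untouched:
    // prints "${hadoopconf-no.such.key}:18080".
    println(SparkHadoopUtil.get.substituteHadoopVariables(
      "${hadoopconf-no.such.key}:18080", hadoopConf))
  }
}

Note that the method recurses after each successful substitution, so a substituted value may itself contain further ${hadoopconf-...} tokens.
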
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index b7e68d4f71..ed5bb263a5 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -87,7 +87,8 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
  <td><code>spark.yarn.historyServer.address</code></td>
  <td>(none)</td>
  <td>
-    The address of the Spark history server (i.e. host.com:18080). The address should not contain a scheme (http://). Defaults to not being set since the history server is an optional service. This address is given to the YARN ResourceManager when the Spark application finishes to link the application from the ResourceManager UI to the Spark history server UI.
+    The address of the Spark history server (e.g. host.com:18080). The address should not contain a scheme (http://). Defaults to not being set since the history server is an optional service. This address is given to the YARN ResourceManager when the Spark application finishes to link the application from the ResourceManager UI to the Spark history server UI.
+    For this property, YARN configuration properties can be used as variables, and these are substituted by Spark at runtime. For example, if the Spark history server runs on the same node as the YARN ResourceManager, it can be set to `${hadoopconf-yarn.resourcemanager.hostname}:18080`.
  </td>
</tr>
<tr>
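
For instance (hypothetical cluster, default history server port of 18080), the property can be set once in spark-defaults.conf:

spark.yarn.historyServer.address  ${hadoopconf-yarn.resourcemanager.hostname}:18080

If it is passed with spark-submit --conf instead, single-quote the value so the shell does not expand ${...} itself before Spark sees it.
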
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 32bc4e5663..26259cee77 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -223,6 +223,7 @@ private[spark] class ApplicationMaster(
    val appId = client.getAttemptId().getApplicationId().toString()
    val historyAddress =
      sparkConf.getOption("spark.yarn.historyServer.address")
+        .map { text => SparkHadoopUtil.get.substituteHadoopVariables(text, yarnConf) }
        .map { address => s"${address}${HistoryServer.UI_PATH_PREFIX}/${appId}" }
        .getOrElse("")
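
Putting the pieces together, a hypothetical walk-through of the patched chain; the hostname and application id are invented, while HistoryServer.UI_PATH_PREFIX is "/history":

// Given (hypothetical values):
//   spark.yarn.historyServer.address = "${hadoopconf-yarn.resourcemanager.hostname}:18080"
//   yarn.resourcemanager.hostname    = "rm.example.com"   (from yarnConf)
//   appId                            = "application_1234567890123_0001"
//
// substituteHadoopVariables first rewrites the address to "rm.example.com:18080";
// the second map then appends the history server UI path and the app id, so the
// ResourceManager is handed:
//   rm.example.com:18080/history/application_1234567890123_0001
// If spark.yarn.historyServer.address is unset, getOrElse("") yields an empty
// history address, exactly as before the change.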