diff options
author | Cheolsoo Park <cheolsoop@netflix.com> | 2015-04-13 13:45:10 -0500 |
---|---|---|
committer | Thomas Graves <tgraves@apache.org> | 2015-04-13 13:45:10 -0500 |
commit | 6cc5b3ed3c0c729f97956fa017d8eb7d6b43f90f (patch) | |
tree | 0c1c1fb8b27573a8edfe3c2afe4a111948bd88dc /core | |
parent | c5b0b296b842926b5c07531a5affe8984bc799c5 (diff) | |
download | spark-6cc5b3ed3c0c729f97956fa017d8eb7d6b43f90f.tar.gz spark-6cc5b3ed3c0c729f97956fa017d8eb7d6b43f90f.tar.bz2 spark-6cc5b3ed3c0c729f97956fa017d8eb7d6b43f90f.zip |
[SPARK-6662][YARN] Allow variable substitution in spark.yarn.historyServer.address
In Spark on YARN, explicit hostname and port number need to be set for "spark.yarn.historyServer.address" in SparkConf to make the HISTORY link. If the history server address is known and static, this is usually not a problem.
But in cloud, that is usually not true. Particularly in EMR, the history server always runs on the same node as with RM. So I could simply set it to ${yarn.resourcemanager.hostname}:18080 if variable substitution is allowed.
In fact, Hadoop configuration already implements variable substitution, so if this property is read via YarnConf, this can be easily achievable.
Author: Cheolsoo Park <cheolsoop@netflix.com>
Closes #5321 from piaozhexiu/SPARK-6662 and squashes the following commits:
e37de75 [Cheolsoo Park] Preserve the space between the Hadoop and Spark imports
79757c6 [Cheolsoo Park] Incorporate review comments
10e2917 [Cheolsoo Park] Add helper function that substitutes hadoop vars to SparkHadoopUtil
589b52c [Cheolsoo Park] Revert "Allow variable substitution for spark.yarn. properties"
ff9c35d [Cheolsoo Park] Allow variable substitution for spark.yarn. properties
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala | 38 |
1 files changed, 34 insertions, 4 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index c2568eb4b6..cfaebf9ea5 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -24,11 +24,10 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} import org.apache.hadoop.fs.FileSystem.Statistics import org.apache.hadoop.mapred.JobConf -import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext} -import org.apache.hadoop.security.Credentials -import org.apache.hadoop.security.UserGroupInformation +import org.apache.hadoop.mapreduce.JobContext +import org.apache.hadoop.security.{Credentials, UserGroupInformation} -import org.apache.spark.{Logging, SparkContext, SparkConf, SparkException} +import org.apache.spark.{Logging, SparkConf, SparkException} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.util.Utils @@ -201,6 +200,37 @@ class SparkHadoopUtil extends Logging { val baseStatus = fs.getFileStatus(basePath) if (baseStatus.isDir) recurse(basePath) else Array(baseStatus) } + + private val HADOOP_CONF_PATTERN = "(\\$\\{hadoopconf-[^\\}\\$\\s]+\\})".r.unanchored + + /** + * Substitute variables by looking them up in Hadoop configs. Only variables that match the + * ${hadoopconf- .. } pattern are substituted. + */ + def substituteHadoopVariables(text: String, hadoopConf: Configuration): String = { + text match { + case HADOOP_CONF_PATTERN(matched) => { + logDebug(text + " matched " + HADOOP_CONF_PATTERN) + val key = matched.substring(13, matched.length() - 1) // remove ${hadoopconf- .. } + val eval = Option[String](hadoopConf.get(key)) + .map { value => + logDebug("Substituted " + matched + " with " + value) + text.replace(matched, value) + } + if (eval.isEmpty) { + // The variable was not found in Hadoop configs, so return text as is. + text + } else { + // Continue to substitute more variables. + substituteHadoopVariables(eval.get, hadoopConf) + } + } + case _ => { + logDebug(text + " didn't match " + HADOOP_CONF_PATTERN) + text + } + } + } } object SparkHadoopUtil { |