author    Marcelo Vanzin <vanzin@cloudera.com>    2016-07-20 18:24:35 -0700
committer Marcelo Vanzin <vanzin@cloudera.com>    2016-07-20 18:24:35 -0700
commit    75a06aa256aa256c112555609a93c1e1dbb1cb4b (patch)
tree      1cc8d46270741d5650a5441b957f79940280b30f /core/src/main
parent    e651900bd562cc29a3eb13e92a5147979e347f61 (diff)
[SPARK-16272][CORE] Allow config values to reference conf, env, system props.
This allows configuration to be more flexible, for example, when the cluster does not have a homogeneous configuration (e.g. packages are installed on different paths in different nodes). By allowing one to reference the environment from the conf, it becomes possible to work around those in certain cases.

As part of the implementation, ConfigEntry now keeps track of all "known" configs (i.e. those created through the use of ConfigBuilder), since that list is used by the resolution code. This duplicates some code in SQLConf, which could potentially be merged with this now. It will also make it simpler to implement some missing features such as filtering which configs show up in the UI or in event logs - which are not part of this change.

Another change is in the way ConfigEntry reads config data; it now takes a string map and a function that reads env variables, so that it can be called both from SparkConf and SQLConf. This makes it so both places follow the same read path, instead of having to replicate certain logic in SQLConf. There are still a couple of methods in SQLConf that peek into fields of ConfigEntry directly, though.

Tested via unit tests, and by using the new variable expansion functionality in a shell session with a custom spark.sql.hive.metastore.jars value.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #14022 from vanzin/SPARK-16272.
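
As a rough illustration of what the change enables (a sketch only; the keys and paths below are examples, and expansion happens when a value is read through a known ConfigEntry, not when it is set):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      // Point a config at a per-node path via an environment variable reference.
      .set("spark.sql.hive.metastore.jars", "${env:HIVE_HOME}/lib/*")
      // References to JVM system properties use the "system" prefix.
      .set("spark.driver.extraJavaOptions", "-Dmy.tmp.dir=${system:java.io.tmpdir}")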
Diffstat (limited to 'core/src/main')
-rw-r--r-- core/src/main/scala/org/apache/spark/SparkConf.scala                     |   2
-rw-r--r-- core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala |  19
-rw-r--r-- core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala   | 132
3 files changed, 136 insertions(+), 17 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 33ed0d5493..f6af9ccc41 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -248,7 +248,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
* - This will throw an exception if the config is not optional and the value is not set.
*/
private[spark] def get[T](entry: ConfigEntry[T]): T = {
- entry.readFrom(this)
+ entry.readFrom(settings, getenv)
}
/**
diff --git a/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala b/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
index 5d50e3851a..0f5c8a9e02 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
@@ -116,11 +116,17 @@ private[spark] class TypedConfigBuilder[T](
/** Creates a [[ConfigEntry]] that has a default value. */
def createWithDefault(default: T): ConfigEntry[T] = {
- val transformedDefault = converter(stringConverter(default))
- val entry = new ConfigEntryWithDefault[T](parent.key, transformedDefault, converter,
- stringConverter, parent._doc, parent._public)
- parent._onCreate.foreach(_(entry))
- entry
+ // Treat "String" as a special case, so that both createWithDefault and createWithDefaultString
+ // behave the same w.r.t. variable expansion of default values.
+ if (default.isInstanceOf[String]) {
+ createWithDefaultString(default.asInstanceOf[String])
+ } else {
+ val transformedDefault = converter(stringConverter(default))
+ val entry = new ConfigEntryWithDefault[T](parent.key, transformedDefault, converter,
+ stringConverter, parent._doc, parent._public)
+ parent._onCreate.foreach(_(entry))
+ entry
+ }
}
/**
@@ -128,8 +134,7 @@ private[spark] class TypedConfigBuilder[T](
* [[String]] and must be a valid value for the entry.
*/
def createWithDefaultString(default: String): ConfigEntry[T] = {
- val typedDefault = converter(default)
- val entry = new ConfigEntryWithDefault[T](parent.key, typedDefault, converter, stringConverter,
+ val entry = new ConfigEntryWithDefaultString[T](parent.key, default, converter, stringConverter,
parent._doc, parent._public)
parent._onCreate.foreach(_(entry))
entry
diff --git a/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala b/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala
index f7296b487c..e2e23b3c3c 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala
@@ -17,11 +17,35 @@
package org.apache.spark.internal.config
+import java.util.{Map => JMap}
+
+import scala.util.matching.Regex
+
import org.apache.spark.SparkConf
/**
* An entry contains all meta information for a configuration.
*
+ * Config options created using this feature support variable expansion. If the config value
+ * contains variable references of the form "${prefix:variableName}", the reference will be replaced
+ * with the value of the variable depending on the prefix. The prefix can be one of:
+ *
+ * - no prefix: if the config key starts with "spark", looks for the value in the Spark config
+ * - system: looks for the value in the system properties
+ * - env: looks for the value in the environment
+ *
+ * So referencing "${spark.master}" will look for the value of "spark.master" in the Spark
+ * configuration, while referencing "${env:MASTER}" will read the value from the "MASTER"
+ * environment variable.
+ *
+ * For known Spark configuration keys (i.e. those created using `ConfigBuilder`), references
+ * will also consider the default value when it exists.
+ *
+ * If the reference cannot be resolved, the original string will be retained.
+ *
+ * Variable expansion is also applied to the default values of config entries that have a default
+ * value declared as a string.
+ *
* @param key the key for the configuration
* @param defaultValue the default value for the configuration
* @param valueConverter how to convert a string to the value. It should throw an exception if the
@@ -42,17 +66,27 @@ private[spark] abstract class ConfigEntry[T] (
val doc: String,
val isPublic: Boolean) {
+ import ConfigEntry._
+
+ registerEntry(this)
+
def defaultValueString: String
- def readFrom(conf: SparkConf): T
+ def readFrom(conf: JMap[String, String], getenv: String => String): T
- // This is used by SQLConf, since it doesn't use SparkConf to store settings and thus cannot
- // use readFrom().
def defaultValue: Option[T] = None
override def toString: String = {
s"ConfigEntry(key=$key, defaultValue=$defaultValueString, doc=$doc, public=$isPublic)"
}
+
+ protected def readAndExpand(
+ conf: JMap[String, String],
+ getenv: String => String,
+ usedRefs: Set[String] = Set()): Option[String] = {
+ Option(conf.get(key)).map(expand(_, conf, getenv, usedRefs))
+ }
+
}
private class ConfigEntryWithDefault[T] (
@@ -68,12 +102,36 @@ private class ConfigEntryWithDefault[T] (
override def defaultValueString: String = stringConverter(_defaultValue)
- override def readFrom(conf: SparkConf): T = {
- conf.getOption(key).map(valueConverter).getOrElse(_defaultValue)
+ def readFrom(conf: JMap[String, String], getenv: String => String): T = {
+ readAndExpand(conf, getenv).map(valueConverter).getOrElse(_defaultValue)
}
}
+private class ConfigEntryWithDefaultString[T] (
+ key: String,
+ _defaultValue: String,
+ valueConverter: String => T,
+ stringConverter: T => String,
+ doc: String,
+ isPublic: Boolean)
+ extends ConfigEntry(key, valueConverter, stringConverter, doc, isPublic) {
+
+ override def defaultValue: Option[T] = Some(valueConverter(_defaultValue))
+
+ override def defaultValueString: String = _defaultValue
+
+ def readFrom(conf: JMap[String, String], getenv: String => String): T = {
+ Option(conf.get(key))
+ .orElse(Some(_defaultValue))
+ .map(ConfigEntry.expand(_, conf, getenv, Set()))
+ .map(valueConverter)
+ .get
+ }
+
+}
+
+
/**
* A config entry that does not have a default value.
*/
@@ -88,7 +146,9 @@ private[spark] class OptionalConfigEntry[T](
override def defaultValueString: String = "<undefined>"
- override def readFrom(conf: SparkConf): Option[T] = conf.getOption(key).map(rawValueConverter)
+ override def readFrom(conf: JMap[String, String], getenv: String => String): Option[T] = {
+ readAndExpand(conf, getenv).map(rawValueConverter)
+ }
}
@@ -99,13 +159,67 @@ private class FallbackConfigEntry[T] (
key: String,
doc: String,
isPublic: Boolean,
- private val fallback: ConfigEntry[T])
+ private[config] val fallback: ConfigEntry[T])
extends ConfigEntry[T](key, fallback.valueConverter, fallback.stringConverter, doc, isPublic) {
override def defaultValueString: String = s"<value of ${fallback.key}>"
- override def readFrom(conf: SparkConf): T = {
- conf.getOption(key).map(valueConverter).getOrElse(fallback.readFrom(conf))
+ override def readFrom(conf: JMap[String, String], getenv: String => String): T = {
+ Option(conf.get(key)).map(valueConverter).getOrElse(fallback.readFrom(conf, getenv))
+ }
+
+}
+
+private object ConfigEntry {
+
+ private val knownConfigs = new java.util.concurrent.ConcurrentHashMap[String, ConfigEntry[_]]()
+
+ private val REF_RE = "\\$\\{(?:(\\w+?):)?(\\S+?)\\}".r
+
+ def registerEntry(entry: ConfigEntry[_]): Unit = {
+ val existing = knownConfigs.putIfAbsent(entry.key, entry)
+ require(existing == null, s"Config entry ${entry.key} already registered!")
+ }
+
+ def findEntry(key: String): ConfigEntry[_] = knownConfigs.get(key)
+
+ /**
+ * Expand the `value` according to the rules explained in ConfigEntry.
+ */
+ def expand(
+ value: String,
+ conf: JMap[String, String],
+ getenv: String => String,
+ usedRefs: Set[String]): String = {
+ REF_RE.replaceAllIn(value, { m =>
+ val prefix = m.group(1)
+ val name = m.group(2)
+ val replacement = prefix match {
+ case null =>
+ require(!usedRefs.contains(name), s"Circular reference in $value: $name")
+ if (name.startsWith("spark.")) {
+ Option(findEntry(name))
+ .flatMap(_.readAndExpand(conf, getenv, usedRefs = usedRefs + name))
+ .orElse(Option(conf.get(name)))
+ .orElse(defaultValueString(name))
+ } else {
+ None
+ }
+ case "system" => sys.props.get(name)
+ case "env" => Option(getenv(name))
+ case _ => None
+ }
+ Regex.quoteReplacement(replacement.getOrElse(m.matched))
+ })
+ }
+
+ private def defaultValueString(key: String): Option[String] = {
+ findEntry(key) match {
+ case e: ConfigEntryWithDefault[_] => Some(e.defaultValueString)
+ case e: ConfigEntryWithDefaultString[_] => Some(e.defaultValueString)
+ case e: FallbackConfigEntry[_] => defaultValueString(e.fallback.key)
+ case _ => None
+ }
}
}
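
For reference, a minimal standalone sketch of the expansion rules documented in the ConfigEntry Scaladoc above. This is not Spark's private ConfigEntry.expand; it is an illustration with made-up keys and values, and it leaves out the known-config default lookup and the circular-reference check that the real code performs:

    import scala.util.matching.Regex

    object ExpandSketch {
      // Same reference pattern as the patch: "${name}" or "${prefix:name}".
      private val RefRe: Regex = "\\$\\{(?:(\\w+?):)?(\\S+?)\\}".r

      def expand(value: String, conf: Map[String, String], getenv: String => String): String = {
        RefRe.replaceAllIn(value, m => {
          val replacement = m.group(1) match {
            // No prefix: only "spark."-prefixed names are looked up, here in a plain map.
            case null if m.group(2).startsWith("spark.") => conf.get(m.group(2))
            // "system" prefix: JVM system properties.
            case "system" => sys.props.get(m.group(2))
            // "env" prefix: environment variables, via the supplied lookup function.
            case "env" => Option(getenv(m.group(2)))
            case _ => None
          }
          // An unresolved reference is kept verbatim, as the Scaladoc above describes.
          Regex.quoteReplacement(replacement.getOrElse(m.matched))
        })
      }

      def main(args: Array[String]): Unit = {
        val conf = Map("spark.master" -> "local[4]")  // made-up value
        val getenv: String => String = name => sys.env.getOrElse(name, null)
        println(expand("master is ${spark.master}", conf, getenv))           // master is local[4]
        println(expand("tmp dir is ${system:java.io.tmpdir}", conf, getenv))
        println(expand("kept as-is: ${not.a.known.ref}", conf, getenv))
      }
    }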