-rw-r--r--  core/src/main/scala/org/apache/spark/SparkConf.scala                                  |   9
-rw-r--r--  core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala                |  92
-rw-r--r--  core/src/main/scala/org/apache/spark/internal/config/ConfigProvider.scala             |  74
-rw-r--r--  core/src/main/scala/org/apache/spark/internal/config/ConfigReader.scala               | 106
-rw-r--r--  core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala           |  78
-rw-r--r--  core/src/test/scala/org/apache/spark/internal/config/ConfigReaderSuite.scala          |  62
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala                   |   9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/VariableSubstitution.scala      |  92
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/internal/VariableSubstitutionSuite.scala |  18
9 files changed, 312 insertions(+), 228 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index b6d244b1a0..31b41d9524 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -25,7 +25,7 @@ import scala.collection.mutable.LinkedHashSet
import org.apache.avro.{Schema, SchemaNormalization}
import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.{ConfigEntry, OptionalConfigEntry}
+import org.apache.spark.internal.config._
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.util.Utils
@@ -56,6 +56,11 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
private val settings = new ConcurrentHashMap[String, String]()
+ private val reader = new ConfigReader(new SparkConfigProvider(settings))
+ reader.bindEnv(new ConfigProvider {
+ override def get(key: String): Option[String] = Option(getenv(key))
+ })
+
if (loadDefaults) {
loadFromSystemProperties(false)
}
@@ -248,7 +253,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
* - This will throw an exception if the config is not optional and the value is not set.
*/
private[spark] def get[T](entry: ConfigEntry[T]): T = {
- entry.readFrom(settings, getenv)
+ entry.readFrom(reader)
}
/**
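
The net effect in SparkConf is that entry reads now go through a ConfigReader whose "env" prefix is
backed by getenv, so "${env:...}" references in config values resolve against the process
environment. A minimal sketch of that behavior; the entry and variable names below are hypothetical
and not part of this patch, and conf.get(entry) is private[spark], so the sketch assumes code inside
an org.apache.spark package:

    // Hypothetical entry, registered through ConfigBuilder.
    val logDir = ConfigBuilder("spark.test.logDir").stringConf.createWithDefault("/tmp")

    val conf = new SparkConf(false)
    conf.set("spark.test.logDir", "${env:SPARK_LOG_DIR}/app")
    // conf.get(logDir) expands the reference through the bound env provider;
    // if SPARK_LOG_DIR is unset, the literal "${env:SPARK_LOG_DIR}" is kept.
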
diff --git a/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala b/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala
index e2e23b3c3c..113037d1ab 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala
@@ -26,22 +26,9 @@ import org.apache.spark.SparkConf
/**
* An entry contains all meta information for a configuration.
*
- * Config options created using this feature support variable expansion. If the config value
- * contains variable references of the form "${prefix:variableName}", the reference will be replaced
- * with the value of the variable depending on the prefix. The prefix can be one of:
- *
- * - no prefix: if the config key starts with "spark", looks for the value in the Spark config
- * - system: looks for the value in the system properties
- * - env: looks for the value in the environment
- *
- * So referencing "${spark.master}" will look for the value of "spark.master" in the Spark
- * configuration, while referencing "${env:MASTER}" will read the value from the "MASTER"
- * environment variable.
- *
- * For known Spark configuration keys (i.e. those created using `ConfigBuilder`), references
- * will also consider the default value when it exists.
- *
- * If the reference cannot be resolved, the original string will be retained.
+ * When applying variable substitution to config values, only references starting with "spark." are
+ * considered in the default namespace. For known Spark configuration keys (i.e. those created using
+ * `ConfigBuilder`), references will also consider the default value when it exists.
*
* Variable expansion is also applied to the default values of config entries that have a default
* value declared as a string.
@@ -72,7 +59,7 @@ private[spark] abstract class ConfigEntry[T] (
def defaultValueString: String
- def readFrom(conf: JMap[String, String], getenv: String => String): T
+ def readFrom(reader: ConfigReader): T
def defaultValue: Option[T] = None
@@ -80,13 +67,6 @@ private[spark] abstract class ConfigEntry[T] (
s"ConfigEntry(key=$key, defaultValue=$defaultValueString, doc=$doc, public=$isPublic)"
}
- protected def readAndExpand(
- conf: JMap[String, String],
- getenv: String => String,
- usedRefs: Set[String] = Set()): Option[String] = {
- Option(conf.get(key)).map(expand(_, conf, getenv, usedRefs))
- }
-
}
private class ConfigEntryWithDefault[T] (
@@ -102,8 +82,8 @@ private class ConfigEntryWithDefault[T] (
override def defaultValueString: String = stringConverter(_defaultValue)
- def readFrom(conf: JMap[String, String], getenv: String => String): T = {
- readAndExpand(conf, getenv).map(valueConverter).getOrElse(_defaultValue)
+ def readFrom(reader: ConfigReader): T = {
+ reader.get(key).map(valueConverter).getOrElse(_defaultValue)
}
}
@@ -121,12 +101,9 @@ private class ConfigEntryWithDefaultString[T] (
override def defaultValueString: String = _defaultValue
- def readFrom(conf: JMap[String, String], getenv: String => String): T = {
- Option(conf.get(key))
- .orElse(Some(_defaultValue))
- .map(ConfigEntry.expand(_, conf, getenv, Set()))
- .map(valueConverter)
- .get
+ def readFrom(reader: ConfigReader): T = {
+ val value = reader.get(key).getOrElse(reader.substitute(_defaultValue))
+ valueConverter(value)
}
}
@@ -146,8 +123,8 @@ private[spark] class OptionalConfigEntry[T](
override def defaultValueString: String = "<undefined>"
- override def readFrom(conf: JMap[String, String], getenv: String => String): Option[T] = {
- readAndExpand(conf, getenv).map(rawValueConverter)
+ override def readFrom(reader: ConfigReader): Option[T] = {
+ reader.get(key).map(rawValueConverter)
}
}
@@ -164,18 +141,16 @@ private class FallbackConfigEntry[T] (
override def defaultValueString: String = s"<value of ${fallback.key}>"
- override def readFrom(conf: JMap[String, String], getenv: String => String): T = {
- Option(conf.get(key)).map(valueConverter).getOrElse(fallback.readFrom(conf, getenv))
+ override def readFrom(reader: ConfigReader): T = {
+ reader.get(key).map(valueConverter).getOrElse(fallback.readFrom(reader))
}
}
-private object ConfigEntry {
+private[spark] object ConfigEntry {
private val knownConfigs = new java.util.concurrent.ConcurrentHashMap[String, ConfigEntry[_]]()
- private val REF_RE = "\\$\\{(?:(\\w+?):)?(\\S+?)\\}".r
-
def registerEntry(entry: ConfigEntry[_]): Unit = {
val existing = knownConfigs.putIfAbsent(entry.key, entry)
require(existing == null, s"Config entry ${entry.key} already registered!")
@@ -183,43 +158,4 @@ private object ConfigEntry {
def findEntry(key: String): ConfigEntry[_] = knownConfigs.get(key)
- /**
- * Expand the `value` according to the rules explained in ConfigEntry.
- */
- def expand(
- value: String,
- conf: JMap[String, String],
- getenv: String => String,
- usedRefs: Set[String]): String = {
- REF_RE.replaceAllIn(value, { m =>
- val prefix = m.group(1)
- val name = m.group(2)
- val replacement = prefix match {
- case null =>
- require(!usedRefs.contains(name), s"Circular reference in $value: $name")
- if (name.startsWith("spark.")) {
- Option(findEntry(name))
- .flatMap(_.readAndExpand(conf, getenv, usedRefs = usedRefs + name))
- .orElse(Option(conf.get(name)))
- .orElse(defaultValueString(name))
- } else {
- None
- }
- case "system" => sys.props.get(name)
- case "env" => Option(getenv(name))
- case _ => None
- }
- Regex.quoteReplacement(replacement.getOrElse(m.matched))
- })
- }
-
- private def defaultValueString(key: String): Option[String] = {
- findEntry(key) match {
- case e: ConfigEntryWithDefault[_] => Some(e.defaultValueString)
- case e: ConfigEntryWithDefaultString[_] => Some(e.defaultValueString)
- case e: FallbackConfigEntry[_] => defaultValueString(e.fallback.key)
- case _ => None
- }
- }
-
}
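
With expansion moved out of ConfigEntry, every entry type now follows the same pattern: ask the
reader for the (already substituted) raw string, then run the value converter. A rough sketch of
the resolution rules the new doc comment describes, using made-up keys:

    val settings = new java.util.HashMap[String, String]()
    settings.put("spark.demo.target", "resolved")
    settings.put("spark.demo.ref", "${spark.demo.target}")

    val reader = new ConfigReader(new SparkConfigProvider(settings))
    reader.get("spark.demo.ref")          // Some("resolved"): "spark." references expand
    reader.get("spark.demo.missing")      // None: unknown key with no registered default
    reader.substitute("${notspark.key}")  // "${notspark.key}": non-"spark." keys are filtered out
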
diff --git a/core/src/main/scala/org/apache/spark/internal/config/ConfigProvider.scala b/core/src/main/scala/org/apache/spark/internal/config/ConfigProvider.scala
new file mode 100644
index 0000000000..4b546c847a
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/internal/config/ConfigProvider.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.config
+
+import java.util.{Map => JMap}
+
+/**
+ * A source of configuration values.
+ */
+private[spark] trait ConfigProvider {
+
+ def get(key: String): Option[String]
+
+}
+
+private[spark] class EnvProvider extends ConfigProvider {
+
+ override def get(key: String): Option[String] = sys.env.get(key)
+
+}
+
+private[spark] class SystemProvider extends ConfigProvider {
+
+ override def get(key: String): Option[String] = sys.props.get(key)
+
+}
+
+private[spark] class MapProvider(conf: JMap[String, String]) extends ConfigProvider {
+
+ override def get(key: String): Option[String] = Option(conf.get(key))
+
+}
+
+/**
+ * A config provider that only reads Spark config keys, and considers default values for known
+ * configs when fetching configuration values.
+ */
+private[spark] class SparkConfigProvider(conf: JMap[String, String]) extends ConfigProvider {
+
+ import ConfigEntry._
+
+ override def get(key: String): Option[String] = {
+ if (key.startsWith("spark.")) {
+ Option(conf.get(key)).orElse(defaultValueString(key))
+ } else {
+ None
+ }
+ }
+
+ private def defaultValueString(key: String): Option[String] = {
+ findEntry(key) match {
+ case e: ConfigEntryWithDefault[_] => Option(e.defaultValueString)
+ case e: ConfigEntryWithDefaultString[_] => Option(e.defaultValueString)
+ case e: FallbackConfigEntry[_] => defaultValueString(e.fallback.key)
+ case _ => None
+ }
+ }
+
+}
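
Because ConfigProvider is a one-method trait, additional sources can be plugged in without touching
the reader. A hedged sketch of a custom provider; the "file" prefix, class name, and path are
invented for illustration and are not part of this patch:

    import java.io.FileInputStream
    import java.util.Properties

    class PropertiesFileProvider(path: String) extends ConfigProvider {
      // Loaded eagerly for simplicity; real code would handle I/O errors and close the stream.
      private val props = new Properties()
      props.load(new FileInputStream(path))
      override def get(key: String): Option[String] = Option(props.getProperty(key))
    }

    // Bound under a custom prefix, "${file:some.key}" would then resolve from that file:
    // new ConfigReader(conf).bind("file", new PropertiesFileProvider("/etc/spark/extra.properties"))
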
diff --git a/core/src/main/scala/org/apache/spark/internal/config/ConfigReader.scala b/core/src/main/scala/org/apache/spark/internal/config/ConfigReader.scala
new file mode 100644
index 0000000000..bb1a3bb5fc
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/internal/config/ConfigReader.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.config
+
+import java.util.{Map => JMap}
+import java.util.regex.Pattern
+
+import scala.collection.mutable.HashMap
+import scala.util.matching.Regex
+
+private object ConfigReader {
+
+ private val REF_RE = "\\$\\{(?:(\\w+?):)?(\\S+?)\\}".r
+
+}
+
+/**
+ * A helper class for reading config entries and performing variable substitution.
+ *
+ * If a config value contains variable references of the form "${prefix:variableName}", the
+ * reference will be replaced with the value of the variable depending on the prefix. By default,
+ * the following prefixes are handled:
+ *
+ * - no prefix: use the default config provider
+ * - system: looks for the value in the system properties
+ * - env: looks for the value in the environment
+ *
+ * Different prefixes can be bound to a `ConfigProvider`, which is used to read configuration
+ * values from the data source for the prefix, and both the system and env providers can be
+ * overridden.
+ *
+ * If the reference cannot be resolved, the original string will be retained.
+ *
+ * @param conf The config provider for the default namespace (no prefix).
+ */
+private[spark] class ConfigReader(conf: ConfigProvider) {
+
+ def this(conf: JMap[String, String]) = this(new MapProvider(conf))
+
+ private val bindings = new HashMap[String, ConfigProvider]()
+ bind(null, conf)
+ bindEnv(new EnvProvider())
+ bindSystem(new SystemProvider())
+
+ /**
+ * Binds a prefix to a provider. This method is not thread-safe and should be called
+ * before the instance is used to expand values.
+ */
+ def bind(prefix: String, provider: ConfigProvider): ConfigReader = {
+ bindings(prefix) = provider
+ this
+ }
+
+ def bind(prefix: String, values: JMap[String, String]): ConfigReader = {
+ bind(prefix, new MapProvider(values))
+ }
+
+ def bindEnv(provider: ConfigProvider): ConfigReader = bind("env", provider)
+
+ def bindSystem(provider: ConfigProvider): ConfigReader = bind("system", provider)
+
+ /**
+   * Reads a configuration key from the default provider, and applies variable substitution.
+ */
+ def get(key: String): Option[String] = conf.get(key).map(substitute)
+
+ /**
+ * Perform variable substitution on the given input string.
+ */
+ def substitute(input: String): String = substitute(input, Set())
+
+ private def substitute(input: String, usedRefs: Set[String]): String = {
+ if (input != null) {
+ ConfigReader.REF_RE.replaceAllIn(input, { m =>
+ val prefix = m.group(1)
+ val name = m.group(2)
+ val ref = if (prefix == null) name else s"$prefix:$name"
+ require(!usedRefs.contains(ref), s"Circular reference in $input: $ref")
+
+ val replacement = bindings.get(prefix)
+ .flatMap(_.get(name))
+ .map { v => substitute(v, usedRefs + ref) }
+ .getOrElse(m.matched)
+ Regex.quoteReplacement(replacement)
+ })
+ } else {
+ input
+ }
+ }
+
+}
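
A quick usage sketch of the reader; the keys and values are illustrative only:

    import scala.collection.JavaConverters._

    val values = Map("host" -> "example.com", "port" -> "8080", "url" -> "${host}:${port}")
    val reader = new ConfigReader(values.asJava)

    reader.get("url")                         // Some("example.com:8080"): chained refs expand recursively
    reader.substitute("${system:user.name}")  // resolved by the default SystemProvider binding
    reader.substitute("${missing}")           // "${missing}": unresolved references are left as-is
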
diff --git a/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala b/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala
index ebdb69f31e..91a96bdda6 100644
--- a/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala
+++ b/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala
@@ -24,6 +24,7 @@ import scala.collection.mutable.HashMap
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.network.util.ByteUnit
+import org.apache.spark.util.SparkConfWithEnv
class ConfigEntrySuite extends SparkFunSuite {
@@ -161,25 +162,9 @@ class ConfigEntrySuite extends SparkFunSuite {
assert(conf.get(stringConf) === null)
}
- test("variable expansion") {
+ test("variable expansion of spark config entries") {
val env = Map("ENV1" -> "env1")
- val conf = HashMap("spark.value1" -> "value1", "spark.value2" -> "value2")
-
- def getenv(key: String): String = env.getOrElse(key, null)
-
- def expand(value: String): String = ConfigEntry.expand(value, conf.asJava, getenv, Set())
-
- assert(expand("${spark.value1}") === "value1")
- assert(expand("spark.value1 is: ${spark.value1}") === "spark.value1 is: value1")
- assert(expand("${spark.value1} ${spark.value2}") === "value1 value2")
- assert(expand("${spark.value3}") === "${spark.value3}")
-
- // Make sure anything that is not in the "spark." namespace is ignored.
- conf("notspark.key") = "value"
- assert(expand("${notspark.key}") === "${notspark.key}")
-
- assert(expand("${env:ENV1}") === "env1")
- assert(expand("${system:user.name}") === sys.props("user.name"))
+ val conf = new SparkConfWithEnv(env)
val stringConf = ConfigBuilder(testKey("stringForExpansion"))
.stringConf
@@ -193,45 +178,44 @@ class ConfigEntrySuite extends SparkFunSuite {
val fallbackConf = ConfigBuilder(testKey("fallbackForExpansion"))
.fallbackConf(intConf)
- assert(expand("${" + stringConf.key + "}") === "string1")
- assert(expand("${" + optionalConf.key + "}") === "${" + optionalConf.key + "}")
- assert(expand("${" + intConf.key + "}") === "42")
- assert(expand("${" + fallbackConf.key + "}") === "42")
-
- conf(optionalConf.key) = "string2"
- assert(expand("${" + optionalConf.key + "}") === "string2")
+ val refConf = ConfigBuilder(testKey("configReferenceTest"))
+ .stringConf
+ .createWithDefault(null)
- conf(fallbackConf.key) = "84"
- assert(expand("${" + fallbackConf.key + "}") === "84")
+ def ref(entry: ConfigEntry[_]): String = "${" + entry.key + "}"
- assert(expand("${spark.value1") === "${spark.value1")
+ def testEntryRef(entry: ConfigEntry[_], expected: String): Unit = {
+ conf.set(refConf, ref(entry))
+ assert(conf.get(refConf) === expected)
+ }
- // Unknown prefixes.
- assert(expand("${unknown:value}") === "${unknown:value}")
+ testEntryRef(stringConf, "string1")
+ testEntryRef(intConf, "42")
+ testEntryRef(fallbackConf, "42")
- // Chained references.
- val conf1 = ConfigBuilder(testKey("conf1"))
- .stringConf
- .createWithDefault("value1")
- val conf2 = ConfigBuilder(testKey("conf2"))
- .stringConf
- .createWithDefault("value2")
+ testEntryRef(optionalConf, ref(optionalConf))
- conf(conf2.key) = "${" + conf1.key + "}"
- assert(expand("${" + conf2.key + "}") === conf1.defaultValueString)
+ conf.set(optionalConf, ref(stringConf))
+ testEntryRef(optionalConf, "string1")
- // Circular references.
- conf(conf1.key) = "${" + conf2.key + "}"
- val e = intercept[IllegalArgumentException] {
- expand("${" + conf2.key + "}")
- }
- assert(e.getMessage().contains("Circular"))
+ conf.set(optionalConf, ref(fallbackConf))
+ testEntryRef(optionalConf, "42")
// Default string values with variable references.
val parameterizedStringConf = ConfigBuilder(testKey("stringWithParams"))
.stringConf
- .createWithDefault("${spark.value1}")
- assert(parameterizedStringConf.readFrom(conf.asJava, getenv) === conf("spark.value1"))
+ .createWithDefault(ref(stringConf))
+ assert(conf.get(parameterizedStringConf) === conf.get(stringConf))
+
+ // Make sure SparkConf's env override works.
+ conf.set(refConf, "${env:ENV1}")
+ assert(conf.get(refConf) === env("ENV1"))
+
+ // Conf with null default value is not expanded.
+ val nullConf = ConfigBuilder(testKey("nullString"))
+ .stringConf
+ .createWithDefault(null)
+ testEntryRef(nullConf, ref(nullConf))
}
}
diff --git a/core/src/test/scala/org/apache/spark/internal/config/ConfigReaderSuite.scala b/core/src/test/scala/org/apache/spark/internal/config/ConfigReaderSuite.scala
new file mode 100644
index 0000000000..be57cc34e4
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/internal/config/ConfigReaderSuite.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.config
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkFunSuite
+
+class ConfigReaderSuite extends SparkFunSuite {
+
+ test("variable expansion") {
+ val env = Map("ENV1" -> "env1")
+ val conf = Map("key1" -> "value1", "key2" -> "value2")
+
+ val reader = new ConfigReader(conf.asJava)
+ reader.bindEnv(new MapProvider(env.asJava))
+
+ assert(reader.substitute(null) === null)
+ assert(reader.substitute("${key1}") === "value1")
+ assert(reader.substitute("key1 is: ${key1}") === "key1 is: value1")
+ assert(reader.substitute("${key1} ${key2}") === "value1 value2")
+ assert(reader.substitute("${key3}") === "${key3}")
+ assert(reader.substitute("${env:ENV1}") === "env1")
+ assert(reader.substitute("${system:user.name}") === sys.props("user.name"))
+ assert(reader.substitute("${key1") === "${key1")
+
+ // Unknown prefixes.
+ assert(reader.substitute("${unknown:value}") === "${unknown:value}")
+ }
+
+ test("circular references") {
+ val conf = Map("key1" -> "${key2}", "key2" -> "${key1}")
+ val reader = new ConfigReader(conf.asJava)
+ val e = intercept[IllegalArgumentException] {
+ reader.substitute("${key1}")
+ }
+ assert(e.getMessage().contains("Circular"))
+ }
+
+ test("spark conf provider filters config keys") {
+ val conf = Map("nonspark.key" -> "value", "spark.key" -> "value")
+ val reader = new ConfigReader(new SparkConfigProvider(conf.asJava))
+ assert(reader.get("nonspark.key") === None)
+ assert(reader.get("spark.key") === Some("value"))
+ }
+
+}
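
One behavior the suite exercises only indirectly (through ConfigEntrySuite) is that
SparkConfigProvider also falls back to the registered default of a known entry. A hedged sketch,
assuming a hypothetical key created, and therefore registered, through ConfigBuilder:

    val entry = ConfigBuilder("spark.demo.withDefault").stringConf.createWithDefault("fallback")
    val reader = new ConfigReader(new SparkConfigProvider(new java.util.HashMap[String, String]()))

    reader.get("spark.demo.withDefault")            // Some("fallback"), even though the map is empty
    reader.substitute("${spark.demo.withDefault}")  // "fallback"
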
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index b867a6551f..f2b1afd71a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -496,7 +496,8 @@ object SQLConf {
val VARIABLE_SUBSTITUTE_DEPTH =
SQLConfigBuilder("spark.sql.variable.substitute.depth")
- .doc("The maximum replacements the substitution engine will do.")
+ .internal()
+ .doc("Deprecated: The maximum replacements the substitution engine will do.")
.intConf
.createWithDefault(40)
@@ -565,6 +566,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
@transient protected[spark] val settings = java.util.Collections.synchronizedMap(
new java.util.HashMap[String, String]())
+ @transient private val reader = new ConfigReader(settings)
+
/** ************************ Spark SQL Params/Hints ******************* */
def optimizerMaxIterations: Int = getConf(OPTIMIZER_MAX_ITERATIONS)
@@ -739,7 +742,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
*/
def getConf[T](entry: ConfigEntry[T]): T = {
require(sqlConfEntries.get(entry.key) == entry, s"$entry is not registered")
- entry.readFrom(settings, System.getenv)
+ entry.readFrom(reader)
}
/**
@@ -748,7 +751,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
*/
def getConf[T](entry: OptionalConfigEntry[T]): Option[T] = {
require(sqlConfEntries.get(entry.key) == entry, s"$entry is not registered")
- entry.readFrom(settings, System.getenv)
+ entry.readFrom(reader)
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/VariableSubstitution.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/VariableSubstitution.scala
index 0982f1d687..50725a09c4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/VariableSubstitution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/VariableSubstitution.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.internal
import java.util.regex.Pattern
+import org.apache.spark.internal.config._
import org.apache.spark.sql.AnalysisException
/**
@@ -29,93 +30,24 @@ import org.apache.spark.sql.AnalysisException
*/
class VariableSubstitution(conf: SQLConf) {
- private val pattern = Pattern.compile("\\$\\{[^\\}\\$ ]+\\}")
+ private val provider = new ConfigProvider {
+ override def get(key: String): Option[String] = Option(conf.getConfString(key, ""))
+ }
+
+ private val reader = new ConfigReader(provider)
+ .bind("spark", provider)
+ .bind("sparkconf", provider)
+ .bind("hiveconf", provider)
/**
* Given a query, does variable substitution and returns the result.
*/
def substitute(input: String): String = {
- // Note that this function is mostly copied from Hive's SystemVariables, so the style is
- // very Java/Hive like.
- if (input eq null) {
- return null
- }
-
- if (!conf.variableSubstituteEnabled) {
- return input
- }
-
- var eval = input
- val depth = conf.variableSubstituteDepth
- val builder = new StringBuilder
- val m = pattern.matcher("")
-
- var s = 0
- while (s <= depth) {
- m.reset(eval)
- builder.setLength(0)
-
- var prev = 0
- var found = false
- while (m.find(prev)) {
- val group = m.group()
- var substitute = substituteVariable(group.substring(2, group.length - 1))
- if (substitute.isEmpty) {
- substitute = group
- } else {
- found = true
- }
- builder.append(eval.substring(prev, m.start())).append(substitute)
- prev = m.end()
- }
-
- if (!found) {
- return eval
- }
-
- builder.append(eval.substring(prev))
- eval = builder.toString
- s += 1
- }
-
- if (s > depth) {
- throw new AnalysisException(
- "Variable substitution depth is deeper than " + depth + " for input " + input)
+ if (conf.variableSubstituteEnabled) {
+ reader.substitute(input)
} else {
- return eval
+ input
}
}
- /**
- * Given a variable, replaces with the substitute value (default to "").
- */
- private def substituteVariable(variable: String): String = {
- var value: String = null
-
- if (variable.startsWith("system:")) {
- value = System.getProperty(variable.substring("system:".length()))
- }
-
- if (value == null && variable.startsWith("env:")) {
- value = System.getenv(variable.substring("env:".length()))
- }
-
- if (value == null && conf != null && variable.startsWith("hiveconf:")) {
- value = conf.getConfString(variable.substring("hiveconf:".length()), "")
- }
-
- if (value == null && conf != null && variable.startsWith("sparkconf:")) {
- value = conf.getConfString(variable.substring("sparkconf:".length()), "")
- }
-
- if (value == null && conf != null && variable.startsWith("spark:")) {
- value = conf.getConfString(variable.substring("spark:".length()), "")
- }
-
- if (value == null && conf != null) {
- value = conf.getConfString(variable, "")
- }
-
- value
- }
}
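
After the rewrite, all of the recognized prefixes read from the same SQLConf-backed provider, while
"system" and "env" keep ConfigReader's default bindings. A rough sketch of the observable behavior;
the variable names are made up, and SQLConf is only instantiable from Spark SQL's internal packages,
as the suite below does:

    val conf = new SQLConf
    val sub = new VariableSubstitution(conf)

    conf.setConfString("tbl", "events")
    sub.substitute("select * from ${tbl}")            // "select * from events"
    sub.substitute("select * from ${hiveconf:tbl}")   // same result: spark/sparkconf/hiveconf all read SQLConf
    sub.substitute("-- run by ${system:user.name}")   // resolved from system properties
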
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/VariableSubstitutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/VariableSubstitutionSuite.scala
index deac95918b..d5a946aeaa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/VariableSubstitutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/VariableSubstitutionSuite.scala
@@ -57,22 +57,4 @@ class VariableSubstitutionSuite extends SparkFunSuite {
assert(sub.substitute(q) == "select 1 1 this is great")
}
- test("depth limit") {
- val q = "select ${bar} ${foo} ${doo}"
- conf.setConfString(SQLConf.VARIABLE_SUBSTITUTE_DEPTH.key, "2")
-
- // This should be OK since it is not nested.
- conf.setConfString("bar", "1")
- conf.setConfString("foo", "2")
- conf.setConfString("doo", "3")
- assert(sub.substitute(q) == "select 1 2 3")
-
- // This should not be OK since it is nested in 3 levels.
- conf.setConfString("bar", "1")
- conf.setConfString("foo", "${bar}")
- conf.setConfString("doo", "${foo}")
- intercept[AnalysisException] {
- sub.substitute(q)
- }
- }
}
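
The depth-limit test disappears because the new reader has no iteration cap: nested references are
expanded recursively, and only genuine cycles are rejected, with ConfigReader's
IllegalArgumentException rather than an AnalysisException. A hedged sketch of the new behavior,
reusing the removed test's variable names against the suite's conf and sub:

    conf.setConfString("bar", "1")
    conf.setConfString("foo", "${bar}")
    conf.setConfString("doo", "${foo}")
    sub.substitute("select ${bar} ${foo} ${doo}")   // "select 1 1 1": nesting depth no longer matters

    conf.setConfString("bar", "${doo}")             // bar -> doo -> foo -> bar
    // sub.substitute("select ${bar}") now fails with "Circular reference ..."
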