9 files changed, 312 insertions, 228 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index b6d244b1a0..31b41d9524 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -25,7 +25,7 @@ import scala.collection.mutable.LinkedHashSet
 
 import org.apache.avro.{Schema, SchemaNormalization}
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.{ConfigEntry, OptionalConfigEntry}
+import org.apache.spark.internal.config._
 import org.apache.spark.serializer.KryoSerializer
 import org.apache.spark.util.Utils
@@ -56,6 +56,11 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
 
   private val settings = new ConcurrentHashMap[String, String]()
 
+  private val reader = new ConfigReader(new SparkConfigProvider(settings))
+  reader.bindEnv(new ConfigProvider {
+    override def get(key: String): Option[String] = Option(getenv(key))
+  })
+
   if (loadDefaults) {
     loadFromSystemProperties(false)
   }
@@ -248,7 +253,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
    * - This will throw an exception if the config is not optional and the value is not set.
    */
   private[spark] def get[T](entry: ConfigEntry[T]): T = {
-    entry.readFrom(settings, getenv)
+    entry.readFrom(reader)
   }
 
   /**
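The SparkConf hunk above is the wiring point: the default namespace is backed by a SparkConfigProvider over the conf's own settings map, and the "env" prefix is rebound to SparkConf's getenv so tests can fake the environment. A minimal sketch of the resulting behavior, assuming code inside the org.apache.spark package (ConfigBuilder and SparkConf.get(entry) are private[spark]); the spark.test.* keys are made up for illustration:

    import org.apache.spark.SparkConf
    import org.apache.spark.internal.config.ConfigBuilder

    // Hypothetical entry; expansion happens when a value is read through a
    // typed ConfigEntry, not through the plain string getter.
    val greeting = ConfigBuilder("spark.test.greeting")
      .stringConf
      .createWithDefault("hi")

    val conf = new SparkConf()
      .set("spark.test.name", "world")
      .set(greeting.key, "hello ${spark.test.name}, user=${env:USER}")

    // "${spark.test.name}" resolves against the conf itself; "${env:USER}"
    // goes through the env provider bound in the constructor above.
    assert(conf.get(greeting).startsWith("hello world"))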
diff --git a/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala b/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala
index e2e23b3c3c..113037d1ab 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala
@@ -26,22 +26,9 @@ import org.apache.spark.SparkConf
 /**
  * An entry contains all meta information for a configuration.
  *
- * Config options created using this feature support variable expansion. If the config value
- * contains variable references of the form "${prefix:variableName}", the reference will be replaced
- * with the value of the variable depending on the prefix. The prefix can be one of:
- *
- * - no prefix: if the config key starts with "spark", looks for the value in the Spark config
- * - system: looks for the value in the system properties
- * - env: looks for the value in the environment
- *
- * So referencing "${spark.master}" will look for the value of "spark.master" in the Spark
- * configuration, while referencing "${env:MASTER}" will read the value from the "MASTER"
- * environment variable.
- *
- * For known Spark configuration keys (i.e. those created using `ConfigBuilder`), references
- * will also consider the default value when it exists.
- *
- * If the reference cannot be resolved, the original string will be retained.
+ * When applying variable substitution to config values, only references starting with "spark." are
+ * considered in the default namespace. For known Spark configuration keys (i.e. those created using
+ * `ConfigBuilder`), references will also consider the default value when it exists.
  *
  * Variable expansion is also applied to the default values of config entries that have a default
  * value declared as a string.
@@ -72,7 +59,7 @@ private[spark] abstract class ConfigEntry[T] (
 
   def defaultValueString: String
 
-  def readFrom(conf: JMap[String, String], getenv: String => String): T
+  def readFrom(reader: ConfigReader): T
 
   def defaultValue: Option[T] = None
 
@@ -80,13 +67,6 @@ private[spark] abstract class ConfigEntry[T] (
     s"ConfigEntry(key=$key, defaultValue=$defaultValueString, doc=$doc, public=$isPublic)"
   }
 
-  protected def readAndExpand(
-      conf: JMap[String, String],
-      getenv: String => String,
-      usedRefs: Set[String] = Set()): Option[String] = {
-    Option(conf.get(key)).map(expand(_, conf, getenv, usedRefs))
-  }
-
 }
 
 private class ConfigEntryWithDefault[T] (
@@ -102,8 +82,8 @@ private class ConfigEntryWithDefault[T] (
 
   override def defaultValueString: String = stringConverter(_defaultValue)
 
-  def readFrom(conf: JMap[String, String], getenv: String => String): T = {
-    readAndExpand(conf, getenv).map(valueConverter).getOrElse(_defaultValue)
+  def readFrom(reader: ConfigReader): T = {
+    reader.get(key).map(valueConverter).getOrElse(_defaultValue)
   }
 
 }
@@ -121,12 +101,9 @@ private class ConfigEntryWithDefaultString[T] (
 
   override def defaultValueString: String = _defaultValue
 
-  def readFrom(conf: JMap[String, String], getenv: String => String): T = {
-    Option(conf.get(key))
-      .orElse(Some(_defaultValue))
-      .map(ConfigEntry.expand(_, conf, getenv, Set()))
-      .map(valueConverter)
-      .get
+  def readFrom(reader: ConfigReader): T = {
+    val value = reader.get(key).getOrElse(reader.substitute(_defaultValue))
+    valueConverter(value)
   }
 
 }
@@ -146,8 +123,8 @@ private[spark] class OptionalConfigEntry[T](
 
   override def defaultValueString: String = "<undefined>"
 
-  override def readFrom(conf: JMap[String, String], getenv: String => String): Option[T] = {
-    readAndExpand(conf, getenv).map(rawValueConverter)
+  override def readFrom(reader: ConfigReader): Option[T] = {
+    reader.get(key).map(rawValueConverter)
   }
 
 }
@@ -164,18 +141,16 @@ private class FallbackConfigEntry[T] (
 
   override def defaultValueString: String = s"<value of ${fallback.key}>"
 
-  override def readFrom(conf: JMap[String, String], getenv: String => String): T = {
-    Option(conf.get(key)).map(valueConverter).getOrElse(fallback.readFrom(conf, getenv))
+  override def readFrom(reader: ConfigReader): T = {
+    reader.get(key).map(valueConverter).getOrElse(fallback.readFrom(reader))
   }
 
 }
 
-private object ConfigEntry {
+private[spark] object ConfigEntry {
 
   private val knownConfigs = new java.util.concurrent.ConcurrentHashMap[String, ConfigEntry[_]]()
 
-  private val REF_RE = "\\$\\{(?:(\\w+?):)?(\\S+?)\\}".r
-
   def registerEntry(entry: ConfigEntry[_]): Unit = {
     val existing = knownConfigs.putIfAbsent(entry.key, entry)
     require(existing == null, s"Config entry ${entry.key} already registered!")
@@ -183,43 +158,4 @@ private object ConfigEntry {
 
   def findEntry(key: String): ConfigEntry[_] = knownConfigs.get(key)
 
-  /**
-   * Expand the `value` according to the rules explained in ConfigEntry.
-   */
-  def expand(
-      value: String,
-      conf: JMap[String, String],
-      getenv: String => String,
-      usedRefs: Set[String]): String = {
-    REF_RE.replaceAllIn(value, { m =>
-      val prefix = m.group(1)
-      val name = m.group(2)
-      val replacement = prefix match {
-        case null =>
-          require(!usedRefs.contains(name), s"Circular reference in $value: $name")
-          if (name.startsWith("spark.")) {
-            Option(findEntry(name))
-              .flatMap(_.readAndExpand(conf, getenv, usedRefs = usedRefs + name))
-              .orElse(Option(conf.get(name)))
-              .orElse(defaultValueString(name))
-          } else {
-            None
-          }
-        case "system" => sys.props.get(name)
-        case "env" => Option(getenv(name))
-        case _ => None
-      }
-      Regex.quoteReplacement(replacement.getOrElse(m.matched))
-    })
-  }
-
-  private def defaultValueString(key: String): Option[String] = {
-    findEntry(key) match {
-      case e: ConfigEntryWithDefault[_] => Some(e.defaultValueString)
-      case e: ConfigEntryWithDefaultString[_] => Some(e.defaultValueString)
-      case e: FallbackConfigEntry[_] => defaultValueString(e.fallback.key)
-      case _ => None
-    }
-  }
-
 }
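Each readFrom variant keeps its old semantics while delegating lookup and expansion to the reader; notably, ConfigEntryWithDefaultString still substitutes references inside a string default via reader.substitute. A sketch of that path, assuming ConfigBuilder's createWithDefaultString and a hypothetical key:

    import org.apache.spark.internal.config._

    // Hypothetical entry whose *default* contains a reference; no value is set.
    val tmpDir = ConfigBuilder("spark.test.tmpDir")
      .stringConf
      .createWithDefaultString("${system:java.io.tmpdir}")

    // Reader over an empty map: the key is absent, so the default string is
    // run through substitute() and the "system" provider resolves it.
    val reader = new ConfigReader(new java.util.HashMap[String, String]())
    assert(tmpDir.readFrom(reader) == sys.props("java.io.tmpdir"))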
diff --git a/core/src/main/scala/org/apache/spark/internal/config/ConfigProvider.scala b/core/src/main/scala/org/apache/spark/internal/config/ConfigProvider.scala
new file mode 100644
index 0000000000..4b546c847a
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/internal/config/ConfigProvider.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.config
+
+import java.util.{Map => JMap}
+
+/**
+ * A source of configuration values.
+ */
+private[spark] trait ConfigProvider {
+
+  def get(key: String): Option[String]
+
+}
+
+private[spark] class EnvProvider extends ConfigProvider {
+
+  override def get(key: String): Option[String] = sys.env.get(key)
+
+}
+
+private[spark] class SystemProvider extends ConfigProvider {
+
+  override def get(key: String): Option[String] = sys.props.get(key)
+
+}
+
+private[spark] class MapProvider(conf: JMap[String, String]) extends ConfigProvider {
+
+  override def get(key: String): Option[String] = Option(conf.get(key))
+
+}
+
+/**
+ * A config provider that only reads Spark config keys, and considers default values for known
+ * configs when fetching configuration values.
+ */
+private[spark] class SparkConfigProvider(conf: JMap[String, String]) extends ConfigProvider {
+
+  import ConfigEntry._
+
+  override def get(key: String): Option[String] = {
+    if (key.startsWith("spark.")) {
+      Option(conf.get(key)).orElse(defaultValueString(key))
+    } else {
+      None
+    }
+  }
+
+  private def defaultValueString(key: String): Option[String] = {
+    findEntry(key) match {
+      case e: ConfigEntryWithDefault[_] => Option(e.defaultValueString)
+      case e: ConfigEntryWithDefaultString[_] => Option(e.defaultValueString)
+      case e: FallbackConfigEntry[_] => defaultValueString(e.fallback.key)
+      case _ => None
+    }
+  }
+
+}
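SparkConfigProvider is what enforces the rule from the ConfigEntry scaladoc: in the default namespace, only "spark."-prefixed references resolve; anything else returns None and the reference is left verbatim. For example (made-up keys):

    import scala.collection.JavaConverters._
    import org.apache.spark.internal.config._

    val settings = Map("spark.app.name" -> "demo", "other.key" -> "x").asJava
    val provider = new SparkConfigProvider(settings)

    assert(provider.get("spark.app.name") == Some("demo"))
    // Present in the map, but filtered out for lack of the "spark." prefix.
    assert(provider.get("other.key") == None)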
diff --git a/core/src/main/scala/org/apache/spark/internal/config/ConfigReader.scala b/core/src/main/scala/org/apache/spark/internal/config/ConfigReader.scala
new file mode 100644
index 0000000000..bb1a3bb5fc
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/internal/config/ConfigReader.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.config
+
+import java.util.{Map => JMap}
+import java.util.regex.Pattern
+
+import scala.collection.mutable.HashMap
+import scala.util.matching.Regex
+
+private object ConfigReader {
+
+  private val REF_RE = "\\$\\{(?:(\\w+?):)?(\\S+?)\\}".r
+
+}
+
+/**
+ * A helper class for reading config entries and performing variable substitution.
+ *
+ * If a config value contains variable references of the form "${prefix:variableName}", the
+ * reference will be replaced with the value of the variable depending on the prefix. By default,
+ * the following prefixes are handled:
+ *
+ * - no prefix: use the default config provider
+ * - system: looks for the value in the system properties
+ * - env: looks for the value in the environment
+ *
+ * Different prefixes can be bound to a `ConfigProvider`, which is used to read configuration
+ * values from the data source for the prefix, and both the system and env providers can be
+ * overridden.
+ *
+ * If the reference cannot be resolved, the original string will be retained.
+ *
+ * @param conf The config provider for the default namespace (no prefix).
+ */
+private[spark] class ConfigReader(conf: ConfigProvider) {
+
+  def this(conf: JMap[String, String]) = this(new MapProvider(conf))
+
+  private val bindings = new HashMap[String, ConfigProvider]()
+  bind(null, conf)
+  bindEnv(new EnvProvider())
+  bindSystem(new SystemProvider())
+
+  /**
+   * Binds a prefix to a provider. This method is not thread-safe and should be called
+   * before the instance is used to expand values.
+   */
+  def bind(prefix: String, provider: ConfigProvider): ConfigReader = {
+    bindings(prefix) = provider
+    this
+  }
+
+  def bind(prefix: String, values: JMap[String, String]): ConfigReader = {
+    bind(prefix, new MapProvider(values))
+  }
+
+  def bindEnv(provider: ConfigProvider): ConfigReader = bind("env", provider)
+
+  def bindSystem(provider: ConfigProvider): ConfigReader = bind("system", provider)
+
+  /**
+   * Reads a configuration key from the default provider, and applies variable substitution.
+   */
+  def get(key: String): Option[String] = conf.get(key).map(substitute)
+
+  /**
+   * Performs variable substitution on the given input string.
+   */
+  def substitute(input: String): String = substitute(input, Set())
+
+  private def substitute(input: String, usedRefs: Set[String]): String = {
+    if (input != null) {
+      ConfigReader.REF_RE.replaceAllIn(input, { m =>
+        val prefix = m.group(1)
+        val name = m.group(2)
+        val ref = if (prefix == null) name else s"$prefix:$name"
+        require(!usedRefs.contains(ref), s"Circular reference in $input: $ref")
+
+        val replacement = bindings.get(prefix)
+          .flatMap(_.get(name))
+          .map { v => substitute(v, usedRefs + ref) }
+          .getOrElse(m.matched)
+        Regex.quoteReplacement(replacement)
+      })
+    } else {
+      input
+    }
+  }
+
+}
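Putting the reader together: bare "${...}" references go to the default provider, "env:" and "system:" to their pre-bound providers, and any custom prefix can be attached with bind(); unresolved references pass through untouched. A short sketch using the JMap convenience constructor (keys are illustrative):

    import scala.collection.JavaConverters._
    import org.apache.spark.internal.config._

    val reader = new ConfigReader(Map("key1" -> "value1").asJava)
      // "${custom:name}" will now resolve against this map.
      .bind("custom", Map("name" -> "bound").asJava)

    assert(reader.substitute("${key1}") == "value1")
    assert(reader.substitute("${custom:name}") == "bound")
    assert(reader.substitute("${system:java.version}") == sys.props("java.version"))
    // No match in the default namespace: the reference is retained as-is.
    assert(reader.substitute("${missing}") == "${missing}")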
diff --git a/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala b/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala
index ebdb69f31e..91a96bdda6 100644
--- a/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala
+++ b/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala
@@ -24,6 +24,7 @@ import scala.collection.mutable.HashMap
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.network.util.ByteUnit
+import org.apache.spark.util.SparkConfWithEnv
 
 class ConfigEntrySuite extends SparkFunSuite {
 
@@ -161,25 +162,9 @@ class ConfigEntrySuite extends SparkFunSuite {
     assert(conf.get(stringConf) === null)
   }
 
-  test("variable expansion") {
+  test("variable expansion of spark config entries") {
     val env = Map("ENV1" -> "env1")
-    val conf = HashMap("spark.value1" -> "value1", "spark.value2" -> "value2")
-
-    def getenv(key: String): String = env.getOrElse(key, null)
-
-    def expand(value: String): String = ConfigEntry.expand(value, conf.asJava, getenv, Set())
-
-    assert(expand("${spark.value1}") === "value1")
-    assert(expand("spark.value1 is: ${spark.value1}") === "spark.value1 is: value1")
-    assert(expand("${spark.value1} ${spark.value2}") === "value1 value2")
-    assert(expand("${spark.value3}") === "${spark.value3}")
-
-    // Make sure anything that is not in the "spark." namespace is ignored.
-    conf("notspark.key") = "value"
-    assert(expand("${notspark.key}") === "${notspark.key}")
-
-    assert(expand("${env:ENV1}") === "env1")
-    assert(expand("${system:user.name}") === sys.props("user.name"))
+    val conf = new SparkConfWithEnv(env)
 
     val stringConf = ConfigBuilder(testKey("stringForExpansion"))
       .stringConf
@@ -193,45 +178,44 @@ class ConfigEntrySuite extends SparkFunSuite {
       .createWithDefault("string1")
     val optionalConf = ConfigBuilder(testKey("optionalForExpansion"))
       .stringConf
       .createOptional
     val intConf = ConfigBuilder(testKey("intForExpansion"))
       .intConf
       .createWithDefault(42)
     val fallbackConf = ConfigBuilder(testKey("fallbackForExpansion"))
       .fallbackConf(intConf)
 
-    assert(expand("${" + stringConf.key + "}") === "string1")
-    assert(expand("${" + optionalConf.key + "}") === "${" + optionalConf.key + "}")
-    assert(expand("${" + intConf.key + "}") === "42")
-    assert(expand("${" + fallbackConf.key + "}") === "42")
+    val refConf = ConfigBuilder(testKey("configReferenceTest"))
+      .stringConf
+      .createWithDefault(null)
 
-    conf(optionalConf.key) = "string2"
-    assert(expand("${" + optionalConf.key + "}") === "string2")
+    def ref(entry: ConfigEntry[_]): String = "${" + entry.key + "}"
 
-    conf(fallbackConf.key) = "84"
-    assert(expand("${" + fallbackConf.key + "}") === "84")
+    def testEntryRef(entry: ConfigEntry[_], expected: String): Unit = {
+      conf.set(refConf, ref(entry))
+      assert(conf.get(refConf) === expected)
+    }
 
-    assert(expand("${spark.value1") === "${spark.value1")
+    testEntryRef(stringConf, "string1")
+    testEntryRef(intConf, "42")
+    testEntryRef(fallbackConf, "42")
 
-    // Unknown prefixes.
-    assert(expand("${unknown:value}") === "${unknown:value}")
+    testEntryRef(optionalConf, ref(optionalConf))
 
-    // Chained references.
-    val conf1 = ConfigBuilder(testKey("conf1"))
-      .stringConf
-      .createWithDefault("value1")
-    val conf2 = ConfigBuilder(testKey("conf2"))
-      .stringConf
-      .createWithDefault("value2")
+    conf.set(optionalConf, ref(stringConf))
+    testEntryRef(optionalConf, "string1")
 
-    conf(conf2.key) = "${" + conf1.key + "}"
-    assert(expand("${" + conf2.key + "}") === conf1.defaultValueString)
+    conf.set(optionalConf, ref(fallbackConf))
+    testEntryRef(optionalConf, "42")
 
-    // Circular references.
-    conf(conf1.key) = "${" + conf2.key + "}"
-    val e = intercept[IllegalArgumentException] {
-      expand("${" + conf2.key + "}")
-    }
-    assert(e.getMessage().contains("Circular"))
 
     // Default string values with variable references.
     val parameterizedStringConf = ConfigBuilder(testKey("stringWithParams"))
       .stringConf
-      .createWithDefault("${spark.value1}")
-    assert(parameterizedStringConf.readFrom(conf.asJava, getenv) === conf("spark.value1"))
+      .createWithDefault(ref(stringConf))
+    assert(conf.get(parameterizedStringConf) === conf.get(stringConf))
+
+    // Make sure SparkConf's env override works.
+    conf.set(refConf, "${env:ENV1}")
+    assert(conf.get(refConf) === env("ENV1"))
+
+    // Conf with null default value is not expanded.
+    val nullConf = ConfigBuilder(testKey("nullString"))
+      .stringConf
+      .createWithDefault(null)
+    testEntryRef(nullConf, ref(nullConf))
   }
 }
- conf("notspark.key") = "value" - assert(expand("${notspark.key}") === "${notspark.key}") - - assert(expand("${env:ENV1}") === "env1") - assert(expand("${system:user.name}") === sys.props("user.name")) + val conf = new SparkConfWithEnv(env) val stringConf = ConfigBuilder(testKey("stringForExpansion")) .stringConf @@ -193,45 +178,44 @@ class ConfigEntrySuite extends SparkFunSuite { val fallbackConf = ConfigBuilder(testKey("fallbackForExpansion")) .fallbackConf(intConf) - assert(expand("${" + stringConf.key + "}") === "string1") - assert(expand("${" + optionalConf.key + "}") === "${" + optionalConf.key + "}") - assert(expand("${" + intConf.key + "}") === "42") - assert(expand("${" + fallbackConf.key + "}") === "42") - - conf(optionalConf.key) = "string2" - assert(expand("${" + optionalConf.key + "}") === "string2") + val refConf = ConfigBuilder(testKey("configReferenceTest")) + .stringConf + .createWithDefault(null) - conf(fallbackConf.key) = "84" - assert(expand("${" + fallbackConf.key + "}") === "84") + def ref(entry: ConfigEntry[_]): String = "${" + entry.key + "}" - assert(expand("${spark.value1") === "${spark.value1") + def testEntryRef(entry: ConfigEntry[_], expected: String): Unit = { + conf.set(refConf, ref(entry)) + assert(conf.get(refConf) === expected) + } - // Unknown prefixes. - assert(expand("${unknown:value}") === "${unknown:value}") + testEntryRef(stringConf, "string1") + testEntryRef(intConf, "42") + testEntryRef(fallbackConf, "42") - // Chained references. - val conf1 = ConfigBuilder(testKey("conf1")) - .stringConf - .createWithDefault("value1") - val conf2 = ConfigBuilder(testKey("conf2")) - .stringConf - .createWithDefault("value2") + testEntryRef(optionalConf, ref(optionalConf)) - conf(conf2.key) = "${" + conf1.key + "}" - assert(expand("${" + conf2.key + "}") === conf1.defaultValueString) + conf.set(optionalConf, ref(stringConf)) + testEntryRef(optionalConf, "string1") - // Circular references. - conf(conf1.key) = "${" + conf2.key + "}" - val e = intercept[IllegalArgumentException] { - expand("${" + conf2.key + "}") - } - assert(e.getMessage().contains("Circular")) + conf.set(optionalConf, ref(fallbackConf)) + testEntryRef(optionalConf, "42") // Default string values with variable references. val parameterizedStringConf = ConfigBuilder(testKey("stringWithParams")) .stringConf - .createWithDefault("${spark.value1}") - assert(parameterizedStringConf.readFrom(conf.asJava, getenv) === conf("spark.value1")) + .createWithDefault(ref(stringConf)) + assert(conf.get(parameterizedStringConf) === conf.get(stringConf)) + + // Make sure SparkConf's env override works. + conf.set(refConf, "${env:ENV1}") + assert(conf.get(refConf) === env("ENV1")) + + // Conf with null default value is not expanded. + val nullConf = ConfigBuilder(testKey("nullString")) + .stringConf + .createWithDefault(null) + testEntryRef(nullConf, ref(nullConf)) } } diff --git a/core/src/test/scala/org/apache/spark/internal/config/ConfigReaderSuite.scala b/core/src/test/scala/org/apache/spark/internal/config/ConfigReaderSuite.scala new file mode 100644 index 0000000000..be57cc34e4 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/internal/config/ConfigReaderSuite.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index b867a6551f..f2b1afd71a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -496,7 +496,8 @@ object SQLConf {
 
   val VARIABLE_SUBSTITUTE_DEPTH =
     SQLConfigBuilder("spark.sql.variable.substitute.depth")
-      .doc("The maximum replacements the substitution engine will do.")
+      .internal()
+      .doc("Deprecated: The maximum replacements the substitution engine will do.")
       .intConf
       .createWithDefault(40)
 
@@ -565,6 +566,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
   @transient protected[spark] val settings = java.util.Collections.synchronizedMap(
     new java.util.HashMap[String, String]())
 
+  @transient private val reader = new ConfigReader(settings)
+
   /** ************************ Spark SQL Params/Hints ******************* */
 
   def optimizerMaxIterations: Int = getConf(OPTIMIZER_MAX_ITERATIONS)
@@ -739,7 +742,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
    */
  def getConf[T](entry: ConfigEntry[T]): T = {
     require(sqlConfEntries.get(entry.key) == entry, s"$entry is not registered")
-    entry.readFrom(settings, System.getenv)
+    entry.readFrom(reader)
   }
 
   /**
@@ -748,7 +751,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
    */
   def getConf[T](entry: OptionalConfigEntry[T]): Option[T] = {
     require(sqlConfEntries.get(entry.key) == entry, s"$entry is not registered")
-    entry.readFrom(settings, System.getenv)
+    entry.readFrom(reader)
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/VariableSubstitution.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/VariableSubstitution.scala
index 0982f1d687..50725a09c4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/VariableSubstitution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/VariableSubstitution.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.internal
 
 import java.util.regex.Pattern
 
+import org.apache.spark.internal.config._
 import org.apache.spark.sql.AnalysisException
 
 /**
@@ -29,93 +30,24 @@ import org.apache.spark.sql.AnalysisException
  */
 class VariableSubstitution(conf: SQLConf) {
 
-  private val pattern = Pattern.compile("\\$\\{[^\\}\\$ ]+\\}")
+  private val provider = new ConfigProvider {
+    override def get(key: String): Option[String] = Option(conf.getConfString(key, ""))
+  }
+
+  private val reader = new ConfigReader(provider)
+    .bind("spark", provider)
+    .bind("sparkconf", provider)
+    .bind("hiveconf", provider)
 
   /**
    * Given a query, does variable substitution and returns the result.
    */
   def substitute(input: String): String = {
-    // Note that this function is mostly copied from Hive's SystemVariables, so the style is
-    // very Java/Hive like.
-    if (input eq null) {
-      return null
-    }
-
-    if (!conf.variableSubstituteEnabled) {
-      return input
-    }
-
-    var eval = input
-    val depth = conf.variableSubstituteDepth
-    val builder = new StringBuilder
-    val m = pattern.matcher("")
-
-    var s = 0
-    while (s <= depth) {
-      m.reset(eval)
-      builder.setLength(0)
-
-      var prev = 0
-      var found = false
-      while (m.find(prev)) {
-        val group = m.group()
-        var substitute = substituteVariable(group.substring(2, group.length - 1))
-        if (substitute.isEmpty) {
-          substitute = group
-        } else {
-          found = true
-        }
-        builder.append(eval.substring(prev, m.start())).append(substitute)
-        prev = m.end()
-      }
-
-      if (!found) {
-        return eval
-      }
-
-      builder.append(eval.substring(prev))
-      eval = builder.toString
-      s += 1
-    }
-
-    if (s > depth) {
-      throw new AnalysisException(
-        "Variable substitution depth is deeper than " + depth + " for input " + input)
+    if (conf.variableSubstituteEnabled) {
+      reader.substitute(input)
     } else {
-      return eval
+      input
     }
   }
 
-  /**
-   * Given a variable, replaces with the substitute value (default to "").
-   */
-  private def substituteVariable(variable: String): String = {
-    var value: String = null
-
-    if (variable.startsWith("system:")) {
-      value = System.getProperty(variable.substring("system:".length()))
-    }
-
-    if (value == null && variable.startsWith("env:")) {
-      value = System.getenv(variable.substring("env:".length()))
-    }
-
-    if (value == null && conf != null && variable.startsWith("hiveconf:")) {
-      value = conf.getConfString(variable.substring("hiveconf:".length()), "")
-    }
-
-    if (value == null && conf != null && variable.startsWith("sparkconf:")) {
-      value = conf.getConfString(variable.substring("sparkconf:".length()), "")
-    }
-
-    if (value == null && conf != null && variable.startsWith("spark:")) {
-      value = conf.getConfString(variable.substring("spark:".length()), "")
-    }
-
-    if (value == null && conf != null) {
-      value = conf.getConfString(variable, "")
-    }
-
-    value
-  }
 }
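With this rewrite, SQL variable substitution becomes a thin shim over ConfigReader: the session conf backs the default namespace as well as the legacy spark:, sparkconf:, and hiveconf: prefixes, and nested references fall out of the reader's recursion instead of a bounded rewrite loop (hence the now-deprecated depth setting). A sketch mirroring the suite below, assuming sql-internal access since SQLConf is private[sql]:

    import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}

    val conf = new SQLConf
    val sub = new VariableSubstitution(conf)

    conf.setConfString("bar", "1")
    assert(sub.substitute("select ${bar}") == "select 1")
    // The Hive-compatible prefixes resolve against the same conf.
    assert(sub.substitute("select ${hiveconf:bar}") == "select 1")
    // Nested references are followed by recursion, with no depth limit.
    conf.setConfString("foo", "${bar}")
    assert(sub.substitute("select ${foo}") == "select 1")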
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/VariableSubstitutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/VariableSubstitutionSuite.scala
index deac95918b..d5a946aeaa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/VariableSubstitutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/VariableSubstitutionSuite.scala
@@ -57,22 +57,4 @@ class VariableSubstitutionSuite extends SparkFunSuite {
     assert(sub.substitute(q) == "select 1 1 this is great")
   }
 
-  test("depth limit") {
-    val q = "select ${bar} ${foo} ${doo}"
-    conf.setConfString(SQLConf.VARIABLE_SUBSTITUTE_DEPTH.key, "2")
-
-    // This should be OK since it is not nested.
-    conf.setConfString("bar", "1")
-    conf.setConfString("foo", "2")
-    conf.setConfString("doo", "3")
-    assert(sub.substitute(q) == "select 1 2 3")
-
-    // This should not be OK since it is nested in 3 levels.
-    conf.setConfString("bar", "1")
-    conf.setConfString("foo", "${bar}")
-    conf.setConfString("doo", "${foo}")
-    intercept[AnalysisException] {
-      sub.substitute(q)
-    }
-  }
 }