diff options
author | Marcelo Vanzin <vanzin@cloudera.com> | 2016-08-15 11:09:54 -0700 |
---|---|---|
committer | Marcelo Vanzin <vanzin@cloudera.com> | 2016-08-15 11:09:54 -0700 |
commit | 5da6c4b24f512b63cd4e6ba7dd8968066a9396f5 (patch) | |
tree | 7bf18c71a8c55eef6539bc484a73a2e20d97d4e1 /sql | |
parent | 564fe614c11deb657e0ac9e6b75e65370c48b7fe (diff) | |
download | spark-5da6c4b24f512b63cd4e6ba7dd8968066a9396f5.tar.gz spark-5da6c4b24f512b63cd4e6ba7dd8968066a9396f5.tar.bz2 spark-5da6c4b24f512b63cd4e6ba7dd8968066a9396f5.zip |
[SPARK-16671][CORE][SQL] Consolidate code to do variable substitution.
Both core and sql have slightly different code that does variable substitution
of config values. This change refactors that code and encapsulates the logic
of reading config values and expanding variables in a new helper class, which
can be configured so that both core and sql can use it without losing existing
functionality, and allows for easier testing and makes it easier to add more
features in the future.
Tested with existing and new unit tests, and by running spark-shell with
some configs referencing variables and making sure it behaved as expected.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes #14468 from vanzin/SPARK-16671.
Diffstat (limited to 'sql')
3 files changed, 18 insertions, 101 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index b867a6551f..f2b1afd71a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -496,7 +496,8 @@ object SQLConf { val VARIABLE_SUBSTITUTE_DEPTH = SQLConfigBuilder("spark.sql.variable.substitute.depth") - .doc("The maximum replacements the substitution engine will do.") + .internal() + .doc("Deprecated: The maximum replacements the substitution engine will do.") .intConf .createWithDefault(40) @@ -565,6 +566,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging { @transient protected[spark] val settings = java.util.Collections.synchronizedMap( new java.util.HashMap[String, String]()) + @transient private val reader = new ConfigReader(settings) + /** ************************ Spark SQL Params/Hints ******************* */ def optimizerMaxIterations: Int = getConf(OPTIMIZER_MAX_ITERATIONS) @@ -739,7 +742,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging { */ def getConf[T](entry: ConfigEntry[T]): T = { require(sqlConfEntries.get(entry.key) == entry, s"$entry is not registered") - entry.readFrom(settings, System.getenv) + entry.readFrom(reader) } /** @@ -748,7 +751,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging { */ def getConf[T](entry: OptionalConfigEntry[T]): Option[T] = { require(sqlConfEntries.get(entry.key) == entry, s"$entry is not registered") - entry.readFrom(settings, System.getenv) + entry.readFrom(reader) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/VariableSubstitution.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/VariableSubstitution.scala index 0982f1d687..50725a09c4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/VariableSubstitution.scala 
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/VariableSubstitution.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.internal import java.util.regex.Pattern +import org.apache.spark.internal.config._ import org.apache.spark.sql.AnalysisException /** @@ -29,93 +30,24 @@ import org.apache.spark.sql.AnalysisException */ class VariableSubstitution(conf: SQLConf) { - private val pattern = Pattern.compile("\\$\\{[^\\}\\$ ]+\\}") + private val provider = new ConfigProvider { + override def get(key: String): Option[String] = Option(conf.getConfString(key, "")) + } + + private val reader = new ConfigReader(provider) + .bind("spark", provider) + .bind("sparkconf", provider) + .bind("hiveconf", provider) /** * Given a query, does variable substitution and return the result. */ def substitute(input: String): String = { - // Note that this function is mostly copied from Hive's SystemVariables, so the style is - // very Java/Hive like. - if (input eq null) { - return null - } - - if (!conf.variableSubstituteEnabled) { - return input - } - - var eval = input - val depth = conf.variableSubstituteDepth - val builder = new StringBuilder - val m = pattern.matcher("") - - var s = 0 - while (s <= depth) { - m.reset(eval) - builder.setLength(0) - - var prev = 0 - var found = false - while (m.find(prev)) { - val group = m.group() - var substitute = substituteVariable(group.substring(2, group.length - 1)) - if (substitute.isEmpty) { - substitute = group - } else { - found = true - } - builder.append(eval.substring(prev, m.start())).append(substitute) - prev = m.end() - } - - if (!found) { - return eval - } - - builder.append(eval.substring(prev)) - eval = builder.toString - s += 1 - } - - if (s > depth) { - throw new AnalysisException( - "Variable substitution depth is deeper than " + depth + " for input " + input) + if (conf.variableSubstituteEnabled) { + reader.substitute(input) } else { - return eval + input } } - /** - * Given a variable, replaces with the substitute 
value (default to ""). - */ - private def substituteVariable(variable: String): String = { - var value: String = null - - if (variable.startsWith("system:")) { - value = System.getProperty(variable.substring("system:".length())) - } - - if (value == null && variable.startsWith("env:")) { - value = System.getenv(variable.substring("env:".length())) - } - - if (value == null && conf != null && variable.startsWith("hiveconf:")) { - value = conf.getConfString(variable.substring("hiveconf:".length()), "") - } - - if (value == null && conf != null && variable.startsWith("sparkconf:")) { - value = conf.getConfString(variable.substring("sparkconf:".length()), "") - } - - if (value == null && conf != null && variable.startsWith("spark:")) { - value = conf.getConfString(variable.substring("spark:".length()), "") - } - - if (value == null && conf != null) { - value = conf.getConfString(variable, "") - } - - value - } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/VariableSubstitutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/VariableSubstitutionSuite.scala index deac95918b..d5a946aeaa 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/internal/VariableSubstitutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/VariableSubstitutionSuite.scala @@ -57,22 +57,4 @@ class VariableSubstitutionSuite extends SparkFunSuite { assert(sub.substitute(q) == "select 1 1 this is great") } - test("depth limit") { - val q = "select ${bar} ${foo} ${doo}" - conf.setConfString(SQLConf.VARIABLE_SUBSTITUTE_DEPTH.key, "2") - - // This should be OK since it is not nested. - conf.setConfString("bar", "1") - conf.setConfString("foo", "2") - conf.setConfString("doo", "3") - assert(sub.substitute(q) == "select 1 2 3") - - // This should not be OK since it is nested in 3 levels. 
- conf.setConfString("bar", "1") - conf.setConfString("foo", "${bar}") - conf.setConfString("doo", "${foo}") - intercept[AnalysisException] { - sub.substitute(q) - } - } } |