aboutsummaryrefslogtreecommitdiff
path: root/sql/core
diff options
context:
space:
mode:
authorMarcelo Vanzin <vanzin@cloudera.com>2016-08-15 11:09:54 -0700
committerMarcelo Vanzin <vanzin@cloudera.com>2016-08-15 11:09:54 -0700
commit5da6c4b24f512b63cd4e6ba7dd8968066a9396f5 (patch)
tree7bf18c71a8c55eef6539bc484a73a2e20d97d4e1 /sql/core
parent564fe614c11deb657e0ac9e6b75e65370c48b7fe (diff)
downloadspark-5da6c4b24f512b63cd4e6ba7dd8968066a9396f5.tar.gz
spark-5da6c4b24f512b63cd4e6ba7dd8968066a9396f5.tar.bz2
spark-5da6c4b24f512b63cd4e6ba7dd8968066a9396f5.zip
[SPARK-16671][CORE][SQL] Consolidate code to do variable substitution.
Both core and sql have slightly different code that does variable substitution of config values. This change refactors that code and encapsulates the logic of reading config values and expanding variables in a new helper class, which can be configured so that both core and sql can use it without losing existing functionality, and allows for easier testing and makes it easier to add more features in the future. Tested with existing and new unit tests, and by running spark-shell with some configs referencing variables and making sure it behaved as expected. Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #14468 from vanzin/SPARK-16671.
Diffstat (limited to 'sql/core')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala9
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/internal/VariableSubstitution.scala92
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/internal/VariableSubstitutionSuite.scala18
3 files changed, 18 insertions, 101 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index b867a6551f..f2b1afd71a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -496,7 +496,8 @@ object SQLConf {
val VARIABLE_SUBSTITUTE_DEPTH =
SQLConfigBuilder("spark.sql.variable.substitute.depth")
- .doc("The maximum replacements the substitution engine will do.")
+ .internal()
+ .doc("Deprecated: The maximum replacements the substitution engine will do.")
.intConf
.createWithDefault(40)
@@ -565,6 +566,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
@transient protected[spark] val settings = java.util.Collections.synchronizedMap(
new java.util.HashMap[String, String]())
+ @transient private val reader = new ConfigReader(settings)
+
/** ************************ Spark SQL Params/Hints ******************* */
def optimizerMaxIterations: Int = getConf(OPTIMIZER_MAX_ITERATIONS)
@@ -739,7 +742,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
*/
def getConf[T](entry: ConfigEntry[T]): T = {
require(sqlConfEntries.get(entry.key) == entry, s"$entry is not registered")
- entry.readFrom(settings, System.getenv)
+ entry.readFrom(reader)
}
/**
@@ -748,7 +751,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
*/
def getConf[T](entry: OptionalConfigEntry[T]): Option[T] = {
require(sqlConfEntries.get(entry.key) == entry, s"$entry is not registered")
- entry.readFrom(settings, System.getenv)
+ entry.readFrom(reader)
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/VariableSubstitution.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/VariableSubstitution.scala
index 0982f1d687..50725a09c4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/VariableSubstitution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/VariableSubstitution.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.internal
import java.util.regex.Pattern
+import org.apache.spark.internal.config._
import org.apache.spark.sql.AnalysisException
/**
@@ -29,93 +30,24 @@ import org.apache.spark.sql.AnalysisException
*/
class VariableSubstitution(conf: SQLConf) {
- private val pattern = Pattern.compile("\\$\\{[^\\}\\$ ]+\\}")
+ private val provider = new ConfigProvider {
+ override def get(key: String): Option[String] = Option(conf.getConfString(key, ""))
+ }
+
+ private val reader = new ConfigReader(provider)
+ .bind("spark", provider)
+ .bind("sparkconf", provider)
+ .bind("hiveconf", provider)
/**
* Given a query, does variable substitution and return the result.
*/
def substitute(input: String): String = {
- // Note that this function is mostly copied from Hive's SystemVariables, so the style is
- // very Java/Hive like.
- if (input eq null) {
- return null
- }
-
- if (!conf.variableSubstituteEnabled) {
- return input
- }
-
- var eval = input
- val depth = conf.variableSubstituteDepth
- val builder = new StringBuilder
- val m = pattern.matcher("")
-
- var s = 0
- while (s <= depth) {
- m.reset(eval)
- builder.setLength(0)
-
- var prev = 0
- var found = false
- while (m.find(prev)) {
- val group = m.group()
- var substitute = substituteVariable(group.substring(2, group.length - 1))
- if (substitute.isEmpty) {
- substitute = group
- } else {
- found = true
- }
- builder.append(eval.substring(prev, m.start())).append(substitute)
- prev = m.end()
- }
-
- if (!found) {
- return eval
- }
-
- builder.append(eval.substring(prev))
- eval = builder.toString
- s += 1
- }
-
- if (s > depth) {
- throw new AnalysisException(
- "Variable substitution depth is deeper than " + depth + " for input " + input)
+ if (conf.variableSubstituteEnabled) {
+ reader.substitute(input)
} else {
- return eval
+ input
}
}
- /**
- * Given a variable, replaces with the substitute value (default to "").
- */
- private def substituteVariable(variable: String): String = {
- var value: String = null
-
- if (variable.startsWith("system:")) {
- value = System.getProperty(variable.substring("system:".length()))
- }
-
- if (value == null && variable.startsWith("env:")) {
- value = System.getenv(variable.substring("env:".length()))
- }
-
- if (value == null && conf != null && variable.startsWith("hiveconf:")) {
- value = conf.getConfString(variable.substring("hiveconf:".length()), "")
- }
-
- if (value == null && conf != null && variable.startsWith("sparkconf:")) {
- value = conf.getConfString(variable.substring("sparkconf:".length()), "")
- }
-
- if (value == null && conf != null && variable.startsWith("spark:")) {
- value = conf.getConfString(variable.substring("spark:".length()), "")
- }
-
- if (value == null && conf != null) {
- value = conf.getConfString(variable, "")
- }
-
- value
- }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/VariableSubstitutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/VariableSubstitutionSuite.scala
index deac95918b..d5a946aeaa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/VariableSubstitutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/VariableSubstitutionSuite.scala
@@ -57,22 +57,4 @@ class VariableSubstitutionSuite extends SparkFunSuite {
assert(sub.substitute(q) == "select 1 1 this is great")
}
- test("depth limit") {
- val q = "select ${bar} ${foo} ${doo}"
- conf.setConfString(SQLConf.VARIABLE_SUBSTITUTE_DEPTH.key, "2")
-
- // This should be OK since it is not nested.
- conf.setConfString("bar", "1")
- conf.setConfString("foo", "2")
- conf.setConfString("doo", "3")
- assert(sub.substitute(q) == "select 1 2 3")
-
- // This should not be OK since it is nested in 3 levels.
- conf.setConfString("bar", "1")
- conf.setConfString("foo", "${bar}")
- conf.setConfString("doo", "${foo}")
- intercept[AnalysisException] {
- sub.substitute(q)
- }
- }
}