aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCheng Hao <hao.cheng@intel.com>2015-01-21 17:34:18 -0800
committerReynold Xin <rxin@databricks.com>2015-01-21 17:34:18 -0800
commit27bccc5ea9742ca166f6026033e7771c8a90cc0a (patch)
tree2cc2248ae9f949cef5b85332a3e416c7e8aa0ef7
parent9bad062268676aaa66dcbddd1e0ab7f2d7742425 (diff)
downloadspark-27bccc5ea9742ca166f6026033e7771c8a90cc0a.tar.gz
spark-27bccc5ea9742ca166f6026033e7771c8a90cc0a.tar.bz2
spark-27bccc5ea9742ca166f6026033e7771c8a90cc0a.zip
[SPARK-5202] [SQL] Add hql variable substitution support
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+VariableSubstitution This is a blocking issue for the CLI user; it impacts the existing HQL scripts from Hive. Author: Cheng Hao <hao.cheng@intel.com> Closes #4003 from chenghao-intel/substitution and squashes the following commits: bb41fd6 [Cheng Hao] revert the removal of the implicit conversion af7c31a [Cheng Hao] add hql variable substitution support
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala6
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala18
2 files changed, 22 insertions, 2 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 274f83af5a..9d2cfd8e0d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -29,6 +29,7 @@ import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.Driver
import org.apache.hadoop.hive.ql.metadata.Table
import org.apache.hadoop.hive.ql.processors._
+import org.apache.hadoop.hive.ql.parse.VariableSubstitution
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable}
@@ -66,11 +67,12 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
new this.QueryExecution { val logical = plan }
override def sql(sqlText: String): SchemaRDD = {
+ val substituted = new VariableSubstitution().substitute(hiveconf, sqlText)
// TODO: Create a framework for registering parsers instead of just hardcoding if statements.
if (conf.dialect == "sql") {
- super.sql(sqlText)
+ super.sql(substituted)
} else if (conf.dialect == "hiveql") {
- new SchemaRDD(this, ddlParser(sqlText, false).getOrElse(HiveQl.parseSql(sqlText)))
+ new SchemaRDD(this, ddlParser(sqlText, false).getOrElse(HiveQl.parseSql(substituted)))
} else {
sys.error(s"Unsupported SQL dialect: ${conf.dialect}. Try 'sql' or 'hiveql'")
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index f6bf2dbb5d..7f9f1ac7cd 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -104,6 +104,24 @@ class SQLQuerySuite extends QueryTest {
)
}
+ test("command substitution") {
+ sql("set tbl=src")
+ checkAnswer(
+ sql("SELECT key FROM ${hiveconf:tbl} ORDER BY key, value limit 1"),
+ sql("SELECT key FROM src ORDER BY key, value limit 1").collect().toSeq)
+
+ sql("set hive.variable.substitute=false") // disable the substitution
+ sql("set tbl2=src")
+ intercept[Exception] {
+ sql("SELECT key FROM ${hiveconf:tbl2} ORDER BY key, value limit 1").collect()
+ }
+
+ sql("set hive.variable.substitute=true") // enable the substitution
+ checkAnswer(
+ sql("SELECT key FROM ${hiveconf:tbl2} ORDER BY key, value limit 1"),
+ sql("SELECT key FROM src ORDER BY key, value limit 1").collect().toSeq)
+ }
+
test("ordering not in select") {
checkAnswer(
sql("SELECT key FROM src ORDER BY value"),