aboutsummaryrefslogtreecommitdiff
path: root/sql/hive
diff options
context:
space:
mode:
authorCheng Hao <hao.cheng@intel.com>2015-05-02 15:20:07 -0700
committerMichael Armbrust <michael@databricks.com>2015-05-02 15:20:07 -0700
commit5d6b90d939d281130c786be38fd1794c74391b08 (patch)
treebb20ece873f0020943718c65ef99da8b6981efec /sql/hive
parent82c8c37c098e5886da65cea3108737744e270b91 (diff)
downloadspark-5d6b90d939d281130c786be38fd1794c74391b08.tar.gz
spark-5d6b90d939d281130c786be38fd1794c74391b08.tar.bz2
spark-5d6b90d939d281130c786be38fd1794c74391b08.zip
[SPARK-5213] [SQL] Pluggable SQL Parser Support
based on #4015, we should not delete `sqlParser` from sqlcontext, that leads to mima failed. Users implement dialect to give a fallback for `sqlParser` and we should construct `sqlParser` in sqlcontext according to the dialect `protected[sql] val sqlParser = new SparkSQLParser(getSQLDialect().parse(_))` Author: Cheng Hao <hao.cheng@intel.com> Author: scwf <wangfei1@huawei.com> Closes #5827 from scwf/sqlparser1 and squashes the following commits: 81b9737 [scwf] comment fix 0878bd1 [scwf] remove comments c19780b [scwf] fix mima tests c2895cf [scwf] Merge branch 'master' of https://github.com/apache/spark into sqlparser1 493775c [Cheng Hao] update the code as feedback 81a731f [Cheng Hao] remove the unecessary comment aab0b0b [Cheng Hao] polish the code a little bit 49b9d81 [Cheng Hao] shrink the comment for rebasing
Diffstat (limited to 'sql/hive')
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala41
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala5
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala39
3 files changed, 66 insertions, 19 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index dd06b2620c..1d8d0b5c32 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -20,6 +20,9 @@ package org.apache.spark.sql.hive
import java.io.{BufferedReader, InputStreamReader, PrintStream}
import java.sql.Timestamp
+import org.apache.hadoop.hive.ql.parse.VariableSubstitution
+import org.apache.spark.sql.catalyst.Dialect
+
import scala.collection.JavaConversions._
import scala.language.implicitConversions
@@ -43,6 +46,15 @@ import org.apache.spark.sql.sources.{DDLParser, DataSourceStrategy}
import org.apache.spark.sql.types._
/**
+ * This is the HiveQL Dialect, this dialect is strongly bind with HiveContext
+ */
+private[hive] class HiveQLDialect extends Dialect {
+ override def parse(sqlText: String): LogicalPlan = {
+ HiveQl.parseSql(sqlText)
+ }
+}
+
+/**
* An instance of the Spark SQL execution engine that integrates with data stored in Hive.
* Configuration for Hive is read from hive-site.xml on the classpath.
*/
@@ -81,25 +93,16 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
protected[sql] def convertCTAS: Boolean =
getConf("spark.sql.hive.convertCTAS", "false").toBoolean
- override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
- new this.QueryExecution(plan)
-
@transient
- protected[sql] val ddlParserWithHiveQL = new DDLParser(HiveQl.parseSql(_))
-
- override def sql(sqlText: String): DataFrame = {
- val substituted = new VariableSubstitution().substitute(hiveconf, sqlText)
- // TODO: Create a framework for registering parsers instead of just hardcoding if statements.
- if (conf.dialect == "sql") {
- super.sql(substituted)
- } else if (conf.dialect == "hiveql") {
- val ddlPlan = ddlParserWithHiveQL.parse(sqlText, exceptionOnError = false)
- DataFrame(this, ddlPlan.getOrElse(HiveQl.parseSql(substituted)))
- } else {
- sys.error(s"Unsupported SQL dialect: ${conf.dialect}. Try 'sql' or 'hiveql'")
- }
+ protected[sql] lazy val substitutor = new VariableSubstitution()
+
+ protected[sql] override def parseSql(sql: String): LogicalPlan = {
+ super.parseSql(substitutor.substitute(hiveconf, sql))
}
+ override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
+ new this.QueryExecution(plan)
+
/**
* Invalidate and refresh all the cached the metadata of the given table. For performance reasons,
* Spark SQL or the external data source library it uses might cache certain metadata about a
@@ -356,6 +359,12 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
}
}
+ override protected[sql] def dialectClassName = if (conf.dialect == "hiveql") {
+ classOf[HiveQLDialect].getCanonicalName
+ } else {
+ super.dialectClassName
+ }
+
@transient
private val hivePlanner = new SparkPlanner with HiveStrategies {
val hiveContext = self
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 9f17bca083..edeab5158d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -107,7 +107,10 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
/** Fewer partitions to speed up testing. */
protected[sql] override lazy val conf: SQLConf = new SQLConf {
override def numShufflePartitions: Int = getConf(SQLConf.SHUFFLE_PARTITIONS, "5").toInt
- override def dialect: String = getConf(SQLConf.DIALECT, "hiveql")
+
+ // TODO as in unit test, conf.clear() probably be called, all of the value will be cleared.
+ // The super.getConf(SQLConf.DIALECT) is "sql" by default, we need to set it as "hiveql"
+ override def dialect: String = super.getConf(SQLConf.DIALECT, "hiveql")
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 4f8d0ac0e7..630dec8fa0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -18,14 +18,17 @@
package org.apache.spark.sql.hive.execution
import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
-import org.apache.spark.sql.hive.{MetastoreRelation, HiveShim}
+import org.apache.spark.sql.catalyst.errors.DialectException
+import org.apache.spark.sql.DefaultDialect
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SQLConf}
+import org.apache.spark.sql.hive.MetastoreRelation
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
+import org.apache.spark.sql.hive.{HiveQLDialect, HiveShim}
import org.apache.spark.sql.parquet.ParquetRelation2
import org.apache.spark.sql.sources.LogicalRelation
import org.apache.spark.sql.types._
-import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SQLConf}
case class Nested1(f1: Nested2)
case class Nested2(f2: Nested3)
@@ -45,6 +48,9 @@ case class Order(
state: String,
month: Int)
+/** A SQL Dialect for testing purpose, and it can not be nested type */
+class MyDialect extends DefaultDialect
+
/**
* A collection of hive query tests where we generate the answers ourselves instead of depending on
* Hive to generate them (in contrast to HiveQuerySuite). Often this is because the query is
@@ -229,6 +235,35 @@ class SQLQuerySuite extends QueryTest {
setConf("spark.sql.hive.convertCTAS", originalConf)
}
+ test("SQL Dialect Switching") {
+ assert(getSQLDialect().getClass === classOf[HiveQLDialect])
+ setConf("spark.sql.dialect", classOf[MyDialect].getCanonicalName())
+ assert(getSQLDialect().getClass === classOf[MyDialect])
+ assert(sql("SELECT 1").collect() === Array(Row(1)))
+
+ // set the dialect back to the DefaultSQLDialect
+ sql("SET spark.sql.dialect=sql")
+ assert(getSQLDialect().getClass === classOf[DefaultDialect])
+ sql("SET spark.sql.dialect=hiveql")
+ assert(getSQLDialect().getClass === classOf[HiveQLDialect])
+
+ // set invalid dialect
+ sql("SET spark.sql.dialect.abc=MyTestClass")
+ sql("SET spark.sql.dialect=abc")
+ intercept[Exception] {
+ sql("SELECT 1")
+ }
+ // test if the dialect set back to HiveQLDialect
+ getSQLDialect().getClass === classOf[HiveQLDialect]
+
+ sql("SET spark.sql.dialect=MyTestClass")
+ intercept[DialectException] {
+ sql("SELECT 1")
+ }
+ // test if the dialect set back to HiveQLDialect
+ assert(getSQLDialect().getClass === classOf[HiveQLDialect])
+ }
+
test("CTAS with serde") {
sql("CREATE TABLE ctas1 AS SELECT key k, value FROM src ORDER BY k, value").collect()
sql(