diff options
author | Cheng Hao <hao.cheng@intel.com> | 2015-05-02 15:20:07 -0700 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2015-05-02 15:20:07 -0700 |
commit | 5d6b90d939d281130c786be38fd1794c74391b08 (patch) | |
tree | bb20ece873f0020943718c65ef99da8b6981efec /sql/hive | |
parent | 82c8c37c098e5886da65cea3108737744e270b91 (diff) | |
download | spark-5d6b90d939d281130c786be38fd1794c74391b08.tar.gz spark-5d6b90d939d281130c786be38fd1794c74391b08.tar.bz2 spark-5d6b90d939d281130c786be38fd1794c74391b08.zip |
[SPARK-5213] [SQL] Pluggable SQL Parser Support
based on #4015, we should not delete `sqlParser` from sqlcontext, that leads to mima failed. Users implement dialect to give a fallback for `sqlParser` and we should construct `sqlParser` in sqlcontext according to the dialect
`protected[sql] val sqlParser = new SparkSQLParser(getSQLDialect().parse(_))`
Author: Cheng Hao <hao.cheng@intel.com>
Author: scwf <wangfei1@huawei.com>
Closes #5827 from scwf/sqlparser1 and squashes the following commits:
81b9737 [scwf] comment fix
0878bd1 [scwf] remove comments
c19780b [scwf] fix mima tests
c2895cf [scwf] Merge branch 'master' of https://github.com/apache/spark into sqlparser1
493775c [Cheng Hao] update the code as feedback
81a731f [Cheng Hao] remove the unecessary comment
aab0b0b [Cheng Hao] polish the code a little bit
49b9d81 [Cheng Hao] shrink the comment for rebasing
Diffstat (limited to 'sql/hive')
3 files changed, 66 insertions, 19 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index dd06b2620c..1d8d0b5c32 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -20,6 +20,9 @@ package org.apache.spark.sql.hive import java.io.{BufferedReader, InputStreamReader, PrintStream} import java.sql.Timestamp +import org.apache.hadoop.hive.ql.parse.VariableSubstitution +import org.apache.spark.sql.catalyst.Dialect + import scala.collection.JavaConversions._ import scala.language.implicitConversions @@ -43,6 +46,15 @@ import org.apache.spark.sql.sources.{DDLParser, DataSourceStrategy} import org.apache.spark.sql.types._ /** + * This is the HiveQL Dialect, this dialect is strongly bind with HiveContext + */ +private[hive] class HiveQLDialect extends Dialect { + override def parse(sqlText: String): LogicalPlan = { + HiveQl.parseSql(sqlText) + } +} + +/** * An instance of the Spark SQL execution engine that integrates with data stored in Hive. * Configuration for Hive is read from hive-site.xml on the classpath. */ @@ -81,25 +93,16 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { protected[sql] def convertCTAS: Boolean = getConf("spark.sql.hive.convertCTAS", "false").toBoolean - override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution = - new this.QueryExecution(plan) - @transient - protected[sql] val ddlParserWithHiveQL = new DDLParser(HiveQl.parseSql(_)) - - override def sql(sqlText: String): DataFrame = { - val substituted = new VariableSubstitution().substitute(hiveconf, sqlText) - // TODO: Create a framework for registering parsers instead of just hardcoding if statements. - if (conf.dialect == "sql") { - super.sql(substituted) - } else if (conf.dialect == "hiveql") { - val ddlPlan = ddlParserWithHiveQL.parse(sqlText, exceptionOnError = false) - DataFrame(this, ddlPlan.getOrElse(HiveQl.parseSql(substituted))) - } else { - sys.error(s"Unsupported SQL dialect: ${conf.dialect}. Try 'sql' or 'hiveql'") - } + protected[sql] lazy val substitutor = new VariableSubstitution() + + protected[sql] override def parseSql(sql: String): LogicalPlan = { + super.parseSql(substitutor.substitute(hiveconf, sql)) } + override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution = + new this.QueryExecution(plan) + /** * Invalidate and refresh all the cached the metadata of the given table. For performance reasons, * Spark SQL or the external data source library it uses might cache certain metadata about a @@ -356,6 +359,12 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { } } + override protected[sql] def dialectClassName = if (conf.dialect == "hiveql") { + classOf[HiveQLDialect].getCanonicalName + } else { + super.dialectClassName + } + @transient private val hivePlanner = new SparkPlanner with HiveStrategies { val hiveContext = self diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index 9f17bca083..edeab5158d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -107,7 +107,10 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) { /** Fewer partitions to speed up testing. */ protected[sql] override lazy val conf: SQLConf = new SQLConf { override def numShufflePartitions: Int = getConf(SQLConf.SHUFFLE_PARTITIONS, "5").toInt - override def dialect: String = getConf(SQLConf.DIALECT, "hiveql") + + // TODO as in unit test, conf.clear() probably be called, all of the value will be cleared. + // The super.getConf(SQLConf.DIALECT) is "sql" by default, we need to set it as "hiveql" + override def dialect: String = super.getConf(SQLConf.DIALECT, "hiveql") } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 4f8d0ac0e7..630dec8fa0 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -18,14 +18,17 @@ package org.apache.spark.sql.hive.execution import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries -import org.apache.spark.sql.hive.{MetastoreRelation, HiveShim} +import org.apache.spark.sql.catalyst.errors.DialectException +import org.apache.spark.sql.DefaultDialect +import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SQLConf} +import org.apache.spark.sql.hive.MetastoreRelation import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.hive.test.TestHive._ import org.apache.spark.sql.hive.test.TestHive.implicits._ +import org.apache.spark.sql.hive.{HiveQLDialect, HiveShim} import org.apache.spark.sql.parquet.ParquetRelation2 import org.apache.spark.sql.sources.LogicalRelation import org.apache.spark.sql.types._ -import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SQLConf} case class Nested1(f1: Nested2) case class Nested2(f2: Nested3) @@ -45,6 +48,9 @@ case class Order( state: String, month: Int) +/** A SQL Dialect for testing purpose, and it can not be nested type */ +class MyDialect extends DefaultDialect + /** * A collection of hive query tests where we generate the answers ourselves instead of depending on * Hive to generate them (in contrast to HiveQuerySuite). Often this is because the query is @@ -229,6 +235,35 @@ class SQLQuerySuite extends QueryTest { setConf("spark.sql.hive.convertCTAS", originalConf) } + test("SQL Dialect Switching") { + assert(getSQLDialect().getClass === classOf[HiveQLDialect]) + setConf("spark.sql.dialect", classOf[MyDialect].getCanonicalName()) + assert(getSQLDialect().getClass === classOf[MyDialect]) + assert(sql("SELECT 1").collect() === Array(Row(1))) + + // set the dialect back to the DefaultSQLDialect + sql("SET spark.sql.dialect=sql") + assert(getSQLDialect().getClass === classOf[DefaultDialect]) + sql("SET spark.sql.dialect=hiveql") + assert(getSQLDialect().getClass === classOf[HiveQLDialect]) + + // set invalid dialect + sql("SET spark.sql.dialect.abc=MyTestClass") + sql("SET spark.sql.dialect=abc") + intercept[Exception] { + sql("SELECT 1") + } + // test if the dialect set back to HiveQLDialect + getSQLDialect().getClass === classOf[HiveQLDialect] + + sql("SET spark.sql.dialect=MyTestClass") + intercept[DialectException] { + sql("SELECT 1") + } + // test if the dialect set back to HiveQLDialect + assert(getSQLDialect().getClass === classOf[HiveQLDialect]) + } + test("CTAS with serde") { sql("CREATE TABLE ctas1 AS SELECT key k, value FROM src ORDER BY k, value").collect() sql( |