From 5d6b90d939d281130c786be38fd1794c74391b08 Mon Sep 17 00:00:00 2001 From: Cheng Hao Date: Sat, 2 May 2015 15:20:07 -0700 Subject: [SPARK-5213] [SQL] Pluggable SQL Parser Support based on #4015, we should not delete `sqlParser` from sqlcontext, that leads to mima failed. Users implement dialect to give a fallback for `sqlParser` and we should construct `sqlParser` in sqlcontext according to the dialect `protected[sql] val sqlParser = new SparkSQLParser(getSQLDialect().parse(_))` Author: Cheng Hao Author: scwf Closes #5827 from scwf/sqlparser1 and squashes the following commits: 81b9737 [scwf] comment fix 0878bd1 [scwf] remove comments c19780b [scwf] fix mima tests c2895cf [scwf] Merge branch 'master' of https://github.com/apache/spark into sqlparser1 493775c [Cheng Hao] update the code as feedback 81a731f [Cheng Hao] remove the unecessary comment aab0b0b [Cheng Hao] polish the code a little bit 49b9d81 [Cheng Hao] shrink the comment for rebasing --- .../sql/catalyst/AbstractSparkSQLParser.scala | 11 ++-- .../org/apache/spark/sql/catalyst/Dialect.scala | 33 ++++++++++ .../apache/spark/sql/catalyst/errors/package.scala | 2 + .../scala/org/apache/spark/sql/SQLContext.scala | 76 ++++++++++++++++++---- .../scala/org/apache/spark/sql/sources/ddl.scala | 6 +- .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 22 +++++++ .../org/apache/spark/sql/hive/HiveContext.scala | 41 +++++++----- .../org/apache/spark/sql/hive/test/TestHive.scala | 5 +- .../spark/sql/hive/execution/SQLQuerySuite.scala | 39 ++++++++++- 9 files changed, 196 insertions(+), 39 deletions(-) create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/Dialect.scala (limited to 'sql') diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/AbstractSparkSQLParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/AbstractSparkSQLParser.scala index 1f3c02478b..2eb3e167ba 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/AbstractSparkSQLParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/AbstractSparkSQLParser.scala @@ -25,10 +25,6 @@ import scala.util.parsing.input.CharArrayReader.EofCh import org.apache.spark.sql.catalyst.plans.logical._ -private[sql] object KeywordNormalizer { - def apply(str: String): String = str.toLowerCase() -} - private[sql] abstract class AbstractSparkSQLParser extends StandardTokenParsers with PackratParsers { @@ -42,7 +38,7 @@ private[sql] abstract class AbstractSparkSQLParser } protected case class Keyword(str: String) { - def normalize: String = KeywordNormalizer(str) + def normalize: String = lexical.normalizeKeyword(str) def parser: Parser[String] = normalize } @@ -90,13 +86,16 @@ class SqlLexical extends StdLexical { reserved ++= keywords } + /* Normal the keyword string */ + def normalizeKeyword(str: String): String = str.toLowerCase + delimiters += ( "@", "*", "+", "-", "<", "=", "<>", "!=", "<=", ">=", ">", "/", "(", ")", ",", ";", "%", "{", "}", ":", "[", "]", ".", "&", "|", "^", "~", "<=>" ) protected override def processIdent(name: String) = { - val token = KeywordNormalizer(name) + val token = normalizeKeyword(name) if (reserved contains token) Keyword(token) else Identifier(name) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/Dialect.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/Dialect.scala new file mode 100644 index 0000000000..977003493d --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/Dialect.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan + +/** + * Root class of SQL Parser Dialect, and we don't guarantee the binary + * compatibility for the future release, let's keep it as the internal + * interface for advanced user. + * + */ +@DeveloperApi +abstract class Dialect { + // this is the main function that will be implemented by sql parser. + def parse(sqlText: String): LogicalPlan +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/errors/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/errors/package.scala index bdeb660b1e..0fd4f9b374 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/errors/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/errors/package.scala @@ -38,6 +38,8 @@ package object errors { } } + class DialectException(msg: String, cause: Throwable) extends Exception(msg, cause) + /** * Wraps any exceptions that are thrown while executing `f` in a * [[catalyst.errors.TreeNodeException TreeNodeException]], attaching the provided `tree`. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 5116fcefd4..7eabb93c1e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -24,6 +24,7 @@ import scala.collection.JavaConversions._ import scala.collection.immutable import scala.language.implicitConversions import scala.reflect.runtime.universe.TypeTag +import scala.util.control.NonFatal import com.google.common.reflect.TypeToken @@ -32,9 +33,11 @@ import org.apache.spark.api.java.{JavaRDD, JavaSparkContext} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.errors.DialectException import org.apache.spark.sql.catalyst.optimizer.{DefaultOptimizer, Optimizer} import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} import org.apache.spark.sql.catalyst.rules.RuleExecutor +import org.apache.spark.sql.catalyst.Dialect import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, expressions} import org.apache.spark.sql.execution.{Filter, _} import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation} @@ -44,6 +47,42 @@ import org.apache.spark.sql.types._ import org.apache.spark.util.Utils import org.apache.spark.{Partition, SparkContext} +/** + * Currently we support the default dialect named "sql", associated with the class + * [[DefaultDialect]] + * + * And we can also provide custom SQL Dialect, for example in Spark SQL CLI: + * {{{ + *-- switch to "hiveql" dialect + * spark-sql>SET spark.sql.dialect=hiveql; + * spark-sql>SELECT * FROM src LIMIT 1; + * + *-- switch to "sql" dialect + * spark-sql>SET spark.sql.dialect=sql; + * spark-sql>SELECT * FROM src LIMIT 1; + * + *-- register the new SQL dialect + * spark-sql> SET spark.sql.dialect=com.xxx.xxx.SQL99Dialect; + * spark-sql> SELECT * FROM src LIMIT 1; + * + *-- register the non-exist SQL dialect + * spark-sql> SET spark.sql.dialect=NotExistedClass; + * spark-sql> SELECT * FROM src LIMIT 1; + * + *-- Exception will be thrown and switch to dialect + *-- "sql" (for SQLContext) or + *-- "hiveql" (for HiveContext) + * }}} + */ +private[spark] class DefaultDialect extends Dialect { + @transient + protected val sqlParser = new catalyst.SqlParser + + override def parse(sqlText: String): LogicalPlan = { + sqlParser.parse(sqlText) + } +} + /** * The entry point for working with structured data (rows and columns) in Spark. Allows the * creation of [[DataFrame]] objects as well as the execution of SQL queries. @@ -135,14 +174,27 @@ class SQLContext(@transient val sparkContext: SparkContext) protected[sql] val ddlParser = new DDLParser(sqlParser.parse(_)) @transient - protected[sql] val sqlParser = { - val fallback = new catalyst.SqlParser - new SparkSQLParser(fallback.parse(_)) + protected[sql] val sqlParser = new SparkSQLParser(getSQLDialect().parse(_)) + + protected[sql] def getSQLDialect(): Dialect = { + try { + val clazz = Utils.classForName(dialectClassName) + clazz.newInstance().asInstanceOf[Dialect] + } catch { + case NonFatal(e) => + // Since we didn't find the available SQL Dialect, it will fail even for SET command: + // SET spark.sql.dialect=sql; Let's reset as default dialect automatically. + val dialect = conf.dialect + // reset the sql dialect + conf.unsetConf(SQLConf.DIALECT) + // throw out the exception, and the default sql dialect will take effect for next query. + throw new DialectException( + s"""Instantiating dialect '$dialect' failed. + |Reverting to default dialect '${conf.dialect}'""".stripMargin, e) + } } - protected[sql] def parseSql(sql: String): LogicalPlan = { - ddlParser.parse(sql, false).getOrElse(sqlParser.parse(sql)) - } + protected[sql] def parseSql(sql: String): LogicalPlan = ddlParser.parse(sql, false) protected[sql] def executeSql(sql: String): this.QueryExecution = executePlan(parseSql(sql)) @@ -156,6 +208,12 @@ class SQLContext(@transient val sparkContext: SparkContext) @transient protected[sql] val defaultSession = createSession() + protected[sql] def dialectClassName = if (conf.dialect == "sql") { + classOf[DefaultDialect].getCanonicalName + } else { + conf.dialect + } + sparkContext.getConf.getAll.foreach { case (key, value) if key.startsWith("spark.sql") => setConf(key, value) case _ => @@ -931,11 +989,7 @@ class SQLContext(@transient val sparkContext: SparkContext) * @group basic */ def sql(sqlText: String): DataFrame = { - if (conf.dialect == "sql") { - DataFrame(this, parseSql(sqlText)) - } else { - sys.error(s"Unsupported SQL dialect: ${conf.dialect}") - } + DataFrame(this, parseSql(sqlText)) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala index e7a0685e01..1abf3aa51c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala @@ -38,12 +38,12 @@ private[sql] class DDLParser( parseQuery: String => LogicalPlan) extends AbstractSparkSQLParser with DataTypeParser with Logging { - def parse(input: String, exceptionOnError: Boolean): Option[LogicalPlan] = { + def parse(input: String, exceptionOnError: Boolean): LogicalPlan = { try { - Some(parse(input)) + parse(input) } catch { case ddlException: DDLException => throw ddlException - case _ if !exceptionOnError => None + case _ if !exceptionOnError => parseQuery(input) case x: Throwable => throw x } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index d8e7cdbd3a..0ab8558c1d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -19,13 +19,18 @@ package org.apache.spark.sql import org.scalatest.BeforeAndAfterAll +import org.apache.spark.sql.catalyst.errors.DialectException import org.apache.spark.sql.execution.GeneratedAggregate import org.apache.spark.sql.functions._ import org.apache.spark.sql.TestData._ import org.apache.spark.sql.test.TestSQLContext import org.apache.spark.sql.test.TestSQLContext.{udf => _, _} + import org.apache.spark.sql.types._ +/** A SQL Dialect for testing purpose, and it can not be nested type */ +class MyDialect extends DefaultDialect + class SQLQuerySuite extends QueryTest with BeforeAndAfterAll { // Make sure the tables are loaded. TestData @@ -74,6 +79,23 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll { Row("1", 1) :: Row("2", 1) :: Row("3", 1) :: Nil) } + test("SQL Dialect Switching to a new SQL parser") { + val newContext = new SQLContext(TestSQLContext.sparkContext) + newContext.setConf("spark.sql.dialect", classOf[MyDialect].getCanonicalName()) + assert(newContext.getSQLDialect().getClass === classOf[MyDialect]) + assert(newContext.sql("SELECT 1").collect() === Array(Row(1))) + } + + test("SQL Dialect Switch to an invalid parser with alias") { + val newContext = new SQLContext(TestSQLContext.sparkContext) + newContext.sql("SET spark.sql.dialect=MyTestClass") + intercept[DialectException] { + newContext.sql("SELECT 1") + } + // test if the dialect set back to DefaultSQLDialect + assert(newContext.getSQLDialect().getClass === classOf[DefaultDialect]) + } + test("SPARK-4625 support SORT BY in SimpleSQLParser & DSL") { checkAnswer( sql("SELECT a FROM testData2 SORT BY a"), diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index dd06b2620c..1d8d0b5c32 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -20,6 +20,9 @@ package org.apache.spark.sql.hive import java.io.{BufferedReader, InputStreamReader, PrintStream} import java.sql.Timestamp +import org.apache.hadoop.hive.ql.parse.VariableSubstitution +import org.apache.spark.sql.catalyst.Dialect + import scala.collection.JavaConversions._ import scala.language.implicitConversions @@ -42,6 +45,15 @@ import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, HiveNative import org.apache.spark.sql.sources.{DDLParser, DataSourceStrategy} import org.apache.spark.sql.types._ +/** + * This is the HiveQL Dialect, this dialect is strongly bind with HiveContext + */ +private[hive] class HiveQLDialect extends Dialect { + override def parse(sqlText: String): LogicalPlan = { + HiveQl.parseSql(sqlText) + } +} + /** * An instance of the Spark SQL execution engine that integrates with data stored in Hive. * Configuration for Hive is read from hive-site.xml on the classpath. @@ -81,25 +93,16 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { protected[sql] def convertCTAS: Boolean = getConf("spark.sql.hive.convertCTAS", "false").toBoolean - override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution = - new this.QueryExecution(plan) - @transient - protected[sql] val ddlParserWithHiveQL = new DDLParser(HiveQl.parseSql(_)) - - override def sql(sqlText: String): DataFrame = { - val substituted = new VariableSubstitution().substitute(hiveconf, sqlText) - // TODO: Create a framework for registering parsers instead of just hardcoding if statements. - if (conf.dialect == "sql") { - super.sql(substituted) - } else if (conf.dialect == "hiveql") { - val ddlPlan = ddlParserWithHiveQL.parse(sqlText, exceptionOnError = false) - DataFrame(this, ddlPlan.getOrElse(HiveQl.parseSql(substituted))) - } else { - sys.error(s"Unsupported SQL dialect: ${conf.dialect}. Try 'sql' or 'hiveql'") - } + protected[sql] lazy val substitutor = new VariableSubstitution() + + protected[sql] override def parseSql(sql: String): LogicalPlan = { + super.parseSql(substitutor.substitute(hiveconf, sql)) } + override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution = + new this.QueryExecution(plan) + /** * Invalidate and refresh all the cached the metadata of the given table. For performance reasons, * Spark SQL or the external data source library it uses might cache certain metadata about a @@ -356,6 +359,12 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { } } + override protected[sql] def dialectClassName = if (conf.dialect == "hiveql") { + classOf[HiveQLDialect].getCanonicalName + } else { + super.dialectClassName + } + @transient private val hivePlanner = new SparkPlanner with HiveStrategies { val hiveContext = self diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index 9f17bca083..edeab5158d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -107,7 +107,10 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) { /** Fewer partitions to speed up testing. */ protected[sql] override lazy val conf: SQLConf = new SQLConf { override def numShufflePartitions: Int = getConf(SQLConf.SHUFFLE_PARTITIONS, "5").toInt - override def dialect: String = getConf(SQLConf.DIALECT, "hiveql") + + // TODO as in unit test, conf.clear() probably be called, all of the value will be cleared. + // The super.getConf(SQLConf.DIALECT) is "sql" by default, we need to set it as "hiveql" + override def dialect: String = super.getConf(SQLConf.DIALECT, "hiveql") } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 4f8d0ac0e7..630dec8fa0 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -18,14 +18,17 @@ package org.apache.spark.sql.hive.execution import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries -import org.apache.spark.sql.hive.{MetastoreRelation, HiveShim} +import org.apache.spark.sql.catalyst.errors.DialectException +import org.apache.spark.sql.DefaultDialect +import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SQLConf} +import org.apache.spark.sql.hive.MetastoreRelation import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.hive.test.TestHive._ import org.apache.spark.sql.hive.test.TestHive.implicits._ +import org.apache.spark.sql.hive.{HiveQLDialect, HiveShim} import org.apache.spark.sql.parquet.ParquetRelation2 import org.apache.spark.sql.sources.LogicalRelation import org.apache.spark.sql.types._ -import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SQLConf} case class Nested1(f1: Nested2) case class Nested2(f2: Nested3) @@ -45,6 +48,9 @@ case class Order( state: String, month: Int) +/** A SQL Dialect for testing purpose, and it can not be nested type */ +class MyDialect extends DefaultDialect + /** * A collection of hive query tests where we generate the answers ourselves instead of depending on * Hive to generate them (in contrast to HiveQuerySuite). Often this is because the query is @@ -229,6 +235,35 @@ class SQLQuerySuite extends QueryTest { setConf("spark.sql.hive.convertCTAS", originalConf) } + test("SQL Dialect Switching") { + assert(getSQLDialect().getClass === classOf[HiveQLDialect]) + setConf("spark.sql.dialect", classOf[MyDialect].getCanonicalName()) + assert(getSQLDialect().getClass === classOf[MyDialect]) + assert(sql("SELECT 1").collect() === Array(Row(1))) + + // set the dialect back to the DefaultSQLDialect + sql("SET spark.sql.dialect=sql") + assert(getSQLDialect().getClass === classOf[DefaultDialect]) + sql("SET spark.sql.dialect=hiveql") + assert(getSQLDialect().getClass === classOf[HiveQLDialect]) + + // set invalid dialect + sql("SET spark.sql.dialect.abc=MyTestClass") + sql("SET spark.sql.dialect=abc") + intercept[Exception] { + sql("SELECT 1") + } + // test if the dialect set back to HiveQLDialect + getSQLDialect().getClass === classOf[HiveQLDialect] + + sql("SET spark.sql.dialect=MyTestClass") + intercept[DialectException] { + sql("SELECT 1") + } + // test if the dialect set back to HiveQLDialect + assert(getSQLDialect().getClass === classOf[HiveQLDialect]) + } + test("CTAS with serde") { sql("CREATE TABLE ctas1 AS SELECT key k, value FROM src ORDER BY k, value").collect() sql( -- cgit v1.2.3