about summary refs log tree commit diff
path: root/sql/core/src
diff options
context:
space:
mode:
author    Cheng Hao <hao.cheng@intel.com>        2015-05-02 15:20:07 -0700
committer Michael Armbrust <michael@databricks.com>  2015-05-02 15:20:07 -0700
commit    5d6b90d939d281130c786be38fd1794c74391b08 (patch)
tree      bb20ece873f0020943718c65ef99da8b6981efec /sql/core/src
parent    82c8c37c098e5886da65cea3108737744e270b91 (diff)
download  spark-5d6b90d939d281130c786be38fd1794c74391b08.tar.gz
          spark-5d6b90d939d281130c786be38fd1794c74391b08.tar.bz2
          spark-5d6b90d939d281130c786be38fd1794c74391b08.zip
[SPARK-5213] [SQL] Pluggable SQL Parser Support
based on #4015, we should not delete `sqlParser` from sqlcontext, that leads to mima failed. Users implement dialect to give a fallback for `sqlParser` and we should construct `sqlParser` in sqlcontext according to the dialect `protected[sql] val sqlParser = new SparkSQLParser(getSQLDialect().parse(_))` Author: Cheng Hao <hao.cheng@intel.com> Author: scwf <wangfei1@huawei.com> Closes #5827 from scwf/sqlparser1 and squashes the following commits: 81b9737 [scwf] comment fix 0878bd1 [scwf] remove comments c19780b [scwf] fix mima tests c2895cf [scwf] Merge branch 'master' of https://github.com/apache/spark into sqlparser1 493775c [Cheng Hao] update the code as feedback 81a731f [Cheng Hao] remove the unecessary comment aab0b0b [Cheng Hao] polish the code a little bit 49b9d81 [Cheng Hao] shrink the comment for rebasing
Diffstat (limited to 'sql/core/src')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala    | 76
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala   |  6
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 22
3 files changed, 90 insertions(+), 14 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 5116fcefd4..7eabb93c1e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -24,6 +24,7 @@ import scala.collection.JavaConversions._
import scala.collection.immutable
import scala.language.implicitConversions
import scala.reflect.runtime.universe.TypeTag
+import scala.util.control.NonFatal
import com.google.common.reflect.TypeToken
@@ -32,9 +33,11 @@ import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.errors.DialectException
import org.apache.spark.sql.catalyst.optimizer.{DefaultOptimizer, Optimizer}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.catalyst.Dialect
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, expressions}
import org.apache.spark.sql.execution.{Filter, _}
import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
@@ -45,6 +48,42 @@ import org.apache.spark.util.Utils
import org.apache.spark.{Partition, SparkContext}
/**
+ * Currently we support the default dialect named "sql", associated with the class
+ * [[DefaultDialect]]
+ *
+ * And we can also provide custom SQL Dialect, for example in Spark SQL CLI:
+ * {{{
+ *-- switch to "hiveql" dialect
+ * spark-sql>SET spark.sql.dialect=hiveql;
+ * spark-sql>SELECT * FROM src LIMIT 1;
+ *
+ *-- switch to "sql" dialect
+ * spark-sql>SET spark.sql.dialect=sql;
+ * spark-sql>SELECT * FROM src LIMIT 1;
+ *
+ *-- register the new SQL dialect
+ * spark-sql> SET spark.sql.dialect=com.xxx.xxx.SQL99Dialect;
+ * spark-sql> SELECT * FROM src LIMIT 1;
+ *
+ *-- register the non-exist SQL dialect
+ * spark-sql> SET spark.sql.dialect=NotExistedClass;
+ * spark-sql> SELECT * FROM src LIMIT 1;
+ *
+ *-- Exception will be thrown and switch to dialect
+ *-- "sql" (for SQLContext) or
+ *-- "hiveql" (for HiveContext)
+ * }}}
+ */
+private[spark] class DefaultDialect extends Dialect {
+ @transient
+ protected val sqlParser = new catalyst.SqlParser
+
+ override def parse(sqlText: String): LogicalPlan = {
+ sqlParser.parse(sqlText)
+ }
+}
+
+/**
* The entry point for working with structured data (rows and columns) in Spark. Allows the
* creation of [[DataFrame]] objects as well as the execution of SQL queries.
*
@@ -135,14 +174,27 @@ class SQLContext(@transient val sparkContext: SparkContext)
protected[sql] val ddlParser = new DDLParser(sqlParser.parse(_))
@transient
- protected[sql] val sqlParser = {
- val fallback = new catalyst.SqlParser
- new SparkSQLParser(fallback.parse(_))
+ protected[sql] val sqlParser = new SparkSQLParser(getSQLDialect().parse(_))
+
+ protected[sql] def getSQLDialect(): Dialect = {
+ try {
+ val clazz = Utils.classForName(dialectClassName)
+ clazz.newInstance().asInstanceOf[Dialect]
+ } catch {
+ case NonFatal(e) =>
+ // Since we didn't find the available SQL Dialect, it will fail even for SET command:
+ // SET spark.sql.dialect=sql; Let's reset as default dialect automatically.
+ val dialect = conf.dialect
+ // reset the sql dialect
+ conf.unsetConf(SQLConf.DIALECT)
+ // throw out the exception, and the default sql dialect will take effect for next query.
+ throw new DialectException(
+ s"""Instantiating dialect '$dialect' failed.
+ |Reverting to default dialect '${conf.dialect}'""".stripMargin, e)
+ }
}
- protected[sql] def parseSql(sql: String): LogicalPlan = {
- ddlParser.parse(sql, false).getOrElse(sqlParser.parse(sql))
- }
+ protected[sql] def parseSql(sql: String): LogicalPlan = ddlParser.parse(sql, false)
protected[sql] def executeSql(sql: String): this.QueryExecution = executePlan(parseSql(sql))
@@ -156,6 +208,12 @@ class SQLContext(@transient val sparkContext: SparkContext)
@transient
protected[sql] val defaultSession = createSession()
+ protected[sql] def dialectClassName = if (conf.dialect == "sql") {
+ classOf[DefaultDialect].getCanonicalName
+ } else {
+ conf.dialect
+ }
+
sparkContext.getConf.getAll.foreach {
case (key, value) if key.startsWith("spark.sql") => setConf(key, value)
case _ =>
@@ -931,11 +989,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
* @group basic
*/
def sql(sqlText: String): DataFrame = {
- if (conf.dialect == "sql") {
- DataFrame(this, parseSql(sqlText))
- } else {
- sys.error(s"Unsupported SQL dialect: ${conf.dialect}")
- }
+ DataFrame(this, parseSql(sqlText))
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
index e7a0685e01..1abf3aa51c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -38,12 +38,12 @@ private[sql] class DDLParser(
parseQuery: String => LogicalPlan)
extends AbstractSparkSQLParser with DataTypeParser with Logging {
- def parse(input: String, exceptionOnError: Boolean): Option[LogicalPlan] = {
+ def parse(input: String, exceptionOnError: Boolean): LogicalPlan = {
try {
- Some(parse(input))
+ parse(input)
} catch {
case ddlException: DDLException => throw ddlException
- case _ if !exceptionOnError => None
+ case _ if !exceptionOnError => parseQuery(input)
case x: Throwable => throw x
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index d8e7cdbd3a..0ab8558c1d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -19,13 +19,18 @@ package org.apache.spark.sql
import org.scalatest.BeforeAndAfterAll
+import org.apache.spark.sql.catalyst.errors.DialectException
import org.apache.spark.sql.execution.GeneratedAggregate
import org.apache.spark.sql.functions._
import org.apache.spark.sql.TestData._
import org.apache.spark.sql.test.TestSQLContext
import org.apache.spark.sql.test.TestSQLContext.{udf => _, _}
+
import org.apache.spark.sql.types._
+/** A SQL Dialect for testing purpose, and it can not be nested type */
+class MyDialect extends DefaultDialect
+
class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
// Make sure the tables are loaded.
TestData
@@ -74,6 +79,23 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
Row("1", 1) :: Row("2", 1) :: Row("3", 1) :: Nil)
}
+ test("SQL Dialect Switching to a new SQL parser") {
+ val newContext = new SQLContext(TestSQLContext.sparkContext)
+ newContext.setConf("spark.sql.dialect", classOf[MyDialect].getCanonicalName())
+ assert(newContext.getSQLDialect().getClass === classOf[MyDialect])
+ assert(newContext.sql("SELECT 1").collect() === Array(Row(1)))
+ }
+
+ test("SQL Dialect Switch to an invalid parser with alias") {
+ val newContext = new SQLContext(TestSQLContext.sparkContext)
+ newContext.sql("SET spark.sql.dialect=MyTestClass")
+ intercept[DialectException] {
+ newContext.sql("SELECT 1")
+ }
+ // test if the dialect set back to DefaultSQLDialect
+ assert(newContext.getSQLDialect().getClass === classOf[DefaultDialect])
+ }
+
test("SPARK-4625 support SORT BY in SimpleSQLParser & DSL") {
checkAnswer(
sql("SELECT a FROM testData2 SORT BY a"),