aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--project/MimaExcludes.scala5
-rw-r--r--sql/catalyst/src/main/antlr3/org/apache/spark/sql/catalyst/parser/ExpressionParser.g14
-rw-r--r--sql/catalyst/src/main/antlr3/org/apache/spark/sql/catalyst/parser/SparkSqlLexer.g4
-rw-r--r--sql/catalyst/src/main/antlr3/org/apache/spark/sql/catalyst/parser/SparkSqlParser.g80
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystQl.scala23
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala5
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala101
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DDLParser.scala193
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala5
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala7
10 files changed, 208 insertions, 229 deletions
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 3748e07f88..8b1a7303fc 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -201,6 +201,11 @@ object MimaExcludes {
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.TransactionProcessor.org$apache$spark$streaming$flume$sink$Logging$$log_"),
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.TransactionProcessor.org$apache$spark$streaming$flume$sink$Logging$$log__=")
) ++ Seq(
+ // SPARK-12689 Migrate DDL parsing to the newly absorbed parser
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.execution.datasources.DDLParser"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.execution.datasources.DDLException"),
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.SQLContext.ddlParser")
+ ) ++ Seq(
// SPARK-7799 Add "streaming-akka" project
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.zeromq.ZeroMQUtils.createStream"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.streaming.zeromq.ZeroMQUtils.createStream"),
diff --git a/sql/catalyst/src/main/antlr3/org/apache/spark/sql/catalyst/parser/ExpressionParser.g b/sql/catalyst/src/main/antlr3/org/apache/spark/sql/catalyst/parser/ExpressionParser.g
index 0555a6ba83..c162c1a0c5 100644
--- a/sql/catalyst/src/main/antlr3/org/apache/spark/sql/catalyst/parser/ExpressionParser.g
+++ b/sql/catalyst/src/main/antlr3/org/apache/spark/sql/catalyst/parser/ExpressionParser.g
@@ -493,6 +493,16 @@ descFuncNames
| functionIdentifier
;
+//We are allowed to use From and To in CreateTableUsing command's options (actually seems we can use any string as the option key). But we can't simply add them into nonReserved because by doing that we mess other existing rules. So we create a looseIdentifier and looseNonReserved here.
+looseIdentifier
+ :
+ Identifier
+ | looseNonReserved -> Identifier[$looseNonReserved.text]
+ // If it decides to support SQL11 reserved keywords, i.e., useSQL11ReservedKeywordsForIdentifier()=false,
+ // the sql11keywords in existing q tests will NOT be added back.
+ | {useSQL11ReservedKeywordsForIdentifier()}? sql11ReservedKeywordsUsedAsIdentifier -> Identifier[$sql11ReservedKeywordsUsedAsIdentifier.text]
+ ;
+
identifier
:
Identifier
@@ -516,6 +526,10 @@ principalIdentifier
| QuotedIdentifier
;
+looseNonReserved
+ : nonReserved | KW_FROM | KW_TO
+ ;
+
//The new version of nonReserved + sql11ReservedKeywordsUsedAsIdentifier = old version of nonReserved
//Non reserved keywords are basically the keywords that can be used as identifiers.
//All the KW_* are automatically not only keywords, but also reserved keywords.
diff --git a/sql/catalyst/src/main/antlr3/org/apache/spark/sql/catalyst/parser/SparkSqlLexer.g b/sql/catalyst/src/main/antlr3/org/apache/spark/sql/catalyst/parser/SparkSqlLexer.g
index 4374cd7ef7..e930caa291 100644
--- a/sql/catalyst/src/main/antlr3/org/apache/spark/sql/catalyst/parser/SparkSqlLexer.g
+++ b/sql/catalyst/src/main/antlr3/org/apache/spark/sql/catalyst/parser/SparkSqlLexer.g
@@ -324,6 +324,8 @@ KW_ISOLATION: 'ISOLATION';
KW_LEVEL: 'LEVEL';
KW_SNAPSHOT: 'SNAPSHOT';
KW_AUTOCOMMIT: 'AUTOCOMMIT';
+KW_REFRESH: 'REFRESH';
+KW_OPTIONS: 'OPTIONS';
KW_WEEK: 'WEEK'|'WEEKS';
KW_MILLISECOND: 'MILLISECOND'|'MILLISECONDS';
KW_MICROSECOND: 'MICROSECOND'|'MICROSECONDS';
@@ -470,7 +472,7 @@ Identifier
fragment
QuotedIdentifier
:
- '`' ( '``' | ~('`') )* '`' { setText(getText().substring(1, getText().length() -1 ).replaceAll("``", "`")); }
+ '`' ( '``' | ~('`') )* '`' { setText(getText().replaceAll("``", "`")); }
;
WS : (' '|'\r'|'\t'|'\n') {$channel=HIDDEN;}
diff --git a/sql/catalyst/src/main/antlr3/org/apache/spark/sql/catalyst/parser/SparkSqlParser.g b/sql/catalyst/src/main/antlr3/org/apache/spark/sql/catalyst/parser/SparkSqlParser.g
index 35bef00351..6591f6b0f5 100644
--- a/sql/catalyst/src/main/antlr3/org/apache/spark/sql/catalyst/parser/SparkSqlParser.g
+++ b/sql/catalyst/src/main/antlr3/org/apache/spark/sql/catalyst/parser/SparkSqlParser.g
@@ -142,6 +142,7 @@ TOK_UNIONTYPE;
TOK_COLTYPELIST;
TOK_CREATEDATABASE;
TOK_CREATETABLE;
+TOK_CREATETABLEUSING;
TOK_TRUNCATETABLE;
TOK_CREATEINDEX;
TOK_CREATEINDEX_INDEXTBLNAME;
@@ -371,6 +372,10 @@ TOK_TXN_READ_WRITE;
TOK_COMMIT;
TOK_ROLLBACK;
TOK_SET_AUTOCOMMIT;
+TOK_REFRESHTABLE;
+TOK_TABLEPROVIDER;
+TOK_TABLEOPTIONS;
+TOK_TABLEOPTION;
TOK_CACHETABLE;
TOK_UNCACHETABLE;
TOK_CLEARCACHE;
@@ -660,6 +665,12 @@ import java.util.HashMap;
}
private char [] excludedCharForColumnName = {'.', ':'};
private boolean containExcludedCharForCreateTableColumnName(String input) {
+ if (input.length() > 0) {
+ if (input.charAt(0) == '`' && input.charAt(input.length() - 1) == '`') {
+ // When column name is backquoted, we don't care about excluded chars.
+ return false;
+ }
+ }
for(char c : excludedCharForColumnName) {
if(input.indexOf(c)>-1) {
return true;
@@ -781,6 +792,7 @@ ddlStatement
| truncateTableStatement
| alterStatement
| descStatement
+ | refreshStatement
| showStatement
| metastoreCheck
| createViewStatement
@@ -907,12 +919,31 @@ createTableStatement
@init { pushMsg("create table statement", state); }
@after { popMsg(state); }
: KW_CREATE (temp=KW_TEMPORARY)? (ext=KW_EXTERNAL)? KW_TABLE ifNotExists? name=tableName
- ( like=KW_LIKE likeName=tableName
+ (
+ like=KW_LIKE likeName=tableName
tableRowFormat?
tableFileFormat?
tableLocation?
tablePropertiesPrefixed?
+ -> ^(TOK_CREATETABLE $name $temp? $ext? ifNotExists?
+ ^(TOK_LIKETABLE $likeName?)
+ tableRowFormat?
+ tableFileFormat?
+ tableLocation?
+ tablePropertiesPrefixed?
+ )
+ |
+ tableProvider
+ tableOpts?
+ (KW_AS selectStatementWithCTE)?
+ -> ^(TOK_CREATETABLEUSING $name $temp? ifNotExists?
+ tableProvider
+ tableOpts?
+ selectStatementWithCTE?
+ )
| (LPAREN columnNameTypeList RPAREN)?
+ (p=tableProvider?)
+ tableOpts?
tableComment?
tablePartition?
tableBuckets?
@@ -922,8 +953,15 @@ createTableStatement
tableLocation?
tablePropertiesPrefixed?
(KW_AS selectStatementWithCTE)?
- )
- -> ^(TOK_CREATETABLE $name $temp? $ext? ifNotExists?
+ -> {p != null}?
+ ^(TOK_CREATETABLEUSING $name $temp? ifNotExists?
+ columnNameTypeList?
+ $p
+ tableOpts?
+ selectStatementWithCTE?
+ )
+ ->
+ ^(TOK_CREATETABLE $name $temp? $ext? ifNotExists?
^(TOK_LIKETABLE $likeName?)
columnNameTypeList?
tableComment?
@@ -935,7 +973,8 @@ createTableStatement
tableLocation?
tablePropertiesPrefixed?
selectStatementWithCTE?
- )
+ )
+ )
;
truncateTableStatement
@@ -1379,6 +1418,13 @@ tabPartColTypeExpr
: tableName partitionSpec? extColumnName? -> ^(TOK_TABTYPE tableName partitionSpec? extColumnName?)
;
+refreshStatement
+@init { pushMsg("refresh statement", state); }
+@after { popMsg(state); }
+ :
+ KW_REFRESH KW_TABLE tableName -> ^(TOK_REFRESHTABLE tableName)
+ ;
+
descStatement
@init { pushMsg("describe statement", state); }
@after { popMsg(state); }
@@ -1774,6 +1820,30 @@ showStmtIdentifier
| StringLiteral
;
+tableProvider
+@init { pushMsg("table's provider", state); }
+@after { popMsg(state); }
+ :
+ KW_USING Identifier (DOT Identifier)*
+ -> ^(TOK_TABLEPROVIDER Identifier+)
+ ;
+
+optionKeyValue
+@init { pushMsg("table's option specification", state); }
+@after { popMsg(state); }
+ :
+ (looseIdentifier (DOT looseIdentifier)*) StringLiteral
+ -> ^(TOK_TABLEOPTION looseIdentifier+ StringLiteral)
+ ;
+
+tableOpts
+@init { pushMsg("table's options", state); }
+@after { popMsg(state); }
+ :
+ KW_OPTIONS LPAREN optionKeyValue (COMMA optionKeyValue)* RPAREN
+ -> ^(TOK_TABLEOPTIONS optionKeyValue+)
+ ;
+
tableComment
@init { pushMsg("table's comment", state); }
@after { popMsg(state); }
@@ -2132,7 +2202,7 @@ structType
mapType
@init { pushMsg("map type", state); }
@after { popMsg(state); }
- : KW_MAP LESSTHAN left=primitiveType COMMA right=type GREATERTHAN
+ : KW_MAP LESSTHAN left=type COMMA right=type GREATERTHAN
-> ^(TOK_MAP $left $right)
;
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystQl.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystQl.scala
index 536c292ab7..7ce2407913 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystQl.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystQl.scala
@@ -140,6 +140,7 @@ private[sql] class CatalystQl(val conf: ParserConf = SimpleParserConf()) extends
case Token("TOK_BOOLEAN", Nil) => BooleanType
case Token("TOK_STRING", Nil) => StringType
case Token("TOK_VARCHAR", Token(_, Nil) :: Nil) => StringType
+ case Token("TOK_CHAR", Token(_, Nil) :: Nil) => StringType
case Token("TOK_FLOAT", Nil) => FloatType
case Token("TOK_DOUBLE", Nil) => DoubleType
case Token("TOK_DATE", Nil) => DateType
@@ -156,9 +157,10 @@ private[sql] class CatalystQl(val conf: ParserConf = SimpleParserConf()) extends
protected def nodeToStructField(node: ASTNode): StructField = node match {
case Token("TOK_TABCOL", Token(fieldName, Nil) :: dataType :: Nil) =>
- StructField(fieldName, nodeToDataType(dataType), nullable = true)
- case Token("TOK_TABCOL", Token(fieldName, Nil) :: dataType :: _ /* comment */:: Nil) =>
- StructField(fieldName, nodeToDataType(dataType), nullable = true)
+ StructField(cleanIdentifier(fieldName), nodeToDataType(dataType), nullable = true)
+ case Token("TOK_TABCOL", Token(fieldName, Nil) :: dataType :: comment :: Nil) =>
+ val meta = new MetadataBuilder().putString("comment", unquoteString(comment.text)).build()
+ StructField(cleanIdentifier(fieldName), nodeToDataType(dataType), nullable = true, meta)
case _ =>
noParseRule("StructField", node)
}
@@ -222,15 +224,16 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
case Nil =>
ShowFunctions(None, None)
case Token(name, Nil) :: Nil =>
- ShowFunctions(None, Some(unquoteString(name)))
+ ShowFunctions(None, Some(unquoteString(cleanIdentifier(name))))
case Token(db, Nil) :: Token(name, Nil) :: Nil =>
- ShowFunctions(Some(unquoteString(db)), Some(unquoteString(name)))
+ ShowFunctions(Some(unquoteString(cleanIdentifier(db))),
+ Some(unquoteString(cleanIdentifier(name))))
case _ =>
noParseRule("SHOW FUNCTIONS", node)
}
case Token("TOK_DESCFUNCTION", Token(functionName, Nil) :: isExtended) =>
- DescribeFunction(functionName, isExtended.nonEmpty)
+ DescribeFunction(cleanIdentifier(functionName), isExtended.nonEmpty)
case Token("TOK_QUERY", queryArgs @ Token("TOK_CTE" | "TOK_FROM" | "TOK_INSERT", _) :: _) =>
val (fromClause: Option[ASTNode], insertClauses, cteRelations) =
@@ -611,7 +614,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
noParseRule("Select", node)
}
- protected val escapedIdentifier = "`([^`]+)`".r
+ protected val escapedIdentifier = "`(.+)`".r
protected val doubleQuotedString = "\"([^\"]+)\"".r
protected val singleQuotedString = "'([^']+)'".r
@@ -655,7 +658,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
nodeToExpr(qualifier) match {
case UnresolvedAttribute(nameParts) =>
UnresolvedAttribute(nameParts :+ cleanIdentifier(attr))
- case other => UnresolvedExtractValue(other, Literal(attr))
+ case other => UnresolvedExtractValue(other, Literal(cleanIdentifier(attr)))
}
/* Stars (*) */
@@ -663,7 +666,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
// The format of dbName.tableName.* cannot be parsed by HiveParser. TOK_TABNAME will only
// has a single child which is tableName.
case Token("TOK_ALLCOLREF", Token("TOK_TABNAME", target) :: Nil) if target.nonEmpty =>
- UnresolvedStar(Some(target.map(_.text)))
+ UnresolvedStar(Some(target.map(x => cleanIdentifier(x.text))))
/* Aggregate Functions */
case Token("TOK_FUNCTIONDI", Token(COUNT(), Nil) :: args) =>
@@ -971,7 +974,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
protected def nodeToGenerate(node: ASTNode, outer: Boolean, child: LogicalPlan): Generate = {
val Token("TOK_SELECT", Token("TOK_SELEXPR", clauses) :: Nil) = node
- val alias = getClause("TOK_TABALIAS", clauses).children.head.text
+ val alias = cleanIdentifier(getClause("TOK_TABALIAS", clauses).children.head.text)
val generator = clauses.head match {
case Token("TOK_FUNCTION", Token(explode(), Nil) :: childNode :: Nil) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index be28df3a51..ef993c3eda 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -206,10 +206,7 @@ class SQLContext private[sql](
@transient
protected[sql] val sqlParser: ParserInterface = new SparkQl(conf)
- @transient
- protected[sql] val ddlParser: DDLParser = new DDLParser(sqlParser)
-
- protected[sql] def parseSql(sql: String): LogicalPlan = ddlParser.parse(sql, false)
+ protected[sql] def parseSql(sql: String): LogicalPlan = sqlParser.parsePlan(sql)
protected[sql] def executeSql(sql: String):
org.apache.spark.sql.execution.QueryExecution = executePlan(parseSql(sql))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala
index a5bd8ee42d..4174e27e9c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala
@@ -16,11 +16,14 @@
*/
package org.apache.spark.sql.execution
+import org.apache.spark.sql.{AnalysisException, SaveMode}
import org.apache.spark.sql.catalyst.{CatalystQl, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.parser.{ASTNode, ParserConf, SimpleParserConf}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.types.StructType
private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends CatalystQl(conf) {
/** Check if a command should not be explained. */
@@ -55,6 +58,86 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends Cataly
getClauses(Seq("TOK_QUERY", "FORMATTED", "EXTENDED"), explainArgs)
ExplainCommand(nodeToPlan(query), extended = extended.isDefined)
+ case Token("TOK_REFRESHTABLE", nameParts :: Nil) =>
+ val tableIdent = extractTableIdent(nameParts)
+ RefreshTable(tableIdent)
+
+ case Token("TOK_CREATETABLEUSING", createTableArgs) =>
+ val Seq(
+ temp,
+ allowExisting,
+ Some(tabName),
+ tableCols,
+ Some(Token("TOK_TABLEPROVIDER", providerNameParts)),
+ tableOpts,
+ tableAs) = getClauses(Seq(
+ "TEMPORARY",
+ "TOK_IFNOTEXISTS",
+ "TOK_TABNAME", "TOK_TABCOLLIST",
+ "TOK_TABLEPROVIDER",
+ "TOK_TABLEOPTIONS",
+ "TOK_QUERY"), createTableArgs)
+
+ val tableIdent: TableIdentifier = extractTableIdent(tabName)
+
+ val columns = tableCols.map {
+ case Token("TOK_TABCOLLIST", fields) => StructType(fields.map(nodeToStructField))
+ }
+
+ val provider = providerNameParts.map {
+ case Token(name, Nil) => name
+ }.mkString(".")
+
+ val options: Map[String, String] = tableOpts.toSeq.flatMap {
+ case Token("TOK_TABLEOPTIONS", options) =>
+ options.map {
+ case Token("TOK_TABLEOPTION", keysAndValue) =>
+ val key = keysAndValue.init.map(_.text).mkString(".")
+ val value = unquoteString(keysAndValue.last.text)
+ (key, value)
+ }
+ }.toMap
+
+ val asClause = tableAs.map(nodeToPlan(_))
+
+ if (temp.isDefined && allowExisting.isDefined) {
+ throw new AnalysisException(
+ "a CREATE TEMPORARY TABLE statement does not allow IF NOT EXISTS clause.")
+ }
+
+ if (asClause.isDefined) {
+ if (columns.isDefined) {
+ throw new AnalysisException(
+ "a CREATE TABLE AS SELECT statement does not allow column definitions.")
+ }
+
+ val mode = if (allowExisting.isDefined) {
+ SaveMode.Ignore
+ } else if (temp.isDefined) {
+ SaveMode.Overwrite
+ } else {
+ SaveMode.ErrorIfExists
+ }
+
+ CreateTableUsingAsSelect(tableIdent,
+ provider,
+ temp.isDefined,
+ Array.empty[String],
+ bucketSpec = None,
+ mode,
+ options,
+ asClause.get)
+ } else {
+ CreateTableUsing(
+ tableIdent,
+ columns,
+ provider,
+ temp.isDefined,
+ options,
+ allowExisting.isDefined,
+ managedIfNoPath = false)
+ }
+
case Token("TOK_SWITCHDATABASE", Token(database, Nil) :: Nil) =>
SetDatabaseCommand(cleanIdentifier(database))
@@ -68,26 +151,30 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends Cataly
nodeToDescribeFallback(node)
} else {
tableType match {
- case Token("TOK_TABTYPE", Token("TOK_TABNAME", nameParts :: Nil) :: Nil) =>
+ case Token("TOK_TABTYPE", Token("TOK_TABNAME", nameParts) :: Nil) =>
nameParts match {
- case Token(".", dbName :: tableName :: Nil) =>
+ case Token(dbName, Nil) :: Token(tableName, Nil) :: Nil =>
// It is describing a table with the format like "describe db.table".
// TODO: Actually, a user may mean tableName.columnName. Need to resolve this
// issue.
- val tableIdent = extractTableIdent(nameParts)
+ val tableIdent = TableIdentifier(
+ cleanIdentifier(tableName), Some(cleanIdentifier(dbName)))
datasources.DescribeCommand(
UnresolvedRelation(tableIdent, None), isExtended = extended.isDefined)
- case Token(".", dbName :: tableName :: colName :: Nil) =>
+ case Token(dbName, Nil) :: Token(tableName, Nil) :: Token(colName, Nil) :: Nil =>
// It is describing a column with the format like "describe db.table column".
nodeToDescribeFallback(node)
- case tableName =>
+ case tableName :: Nil =>
// It is describing a table with the format like "describe table".
datasources.DescribeCommand(
- UnresolvedRelation(TableIdentifier(tableName.text), None),
+ UnresolvedRelation(TableIdentifier(cleanIdentifier(tableName.text)), None),
isExtended = extended.isDefined)
+ case _ =>
+ nodeToDescribeFallback(node)
}
// All other cases.
- case _ => nodeToDescribeFallback(node)
+ case _ =>
+ nodeToDescribeFallback(node)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DDLParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DDLParser.scala
deleted file mode 100644
index f4766b0370..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DDLParser.scala
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.execution.datasources
-
-import scala.language.implicitConversions
-import scala.util.matching.Regex
-
-import org.apache.spark.Logging
-import org.apache.spark.sql.SaveMode
-import org.apache.spark.sql.catalyst.{AbstractSparkSQLParser, ParserInterface, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.util.DataTypeParser
-import org.apache.spark.sql.types._
-
-/**
- * A parser for foreign DDL commands.
- */
-class DDLParser(fallback: => ParserInterface)
- extends AbstractSparkSQLParser with DataTypeParser with Logging {
-
- override def parseExpression(sql: String): Expression = fallback.parseExpression(sql)
-
- override def parseTableIdentifier(sql: String): TableIdentifier = {
- fallback.parseTableIdentifier(sql)
- }
-
- def parse(input: String, exceptionOnError: Boolean): LogicalPlan = {
- try {
- parsePlan(input)
- } catch {
- case ddlException: DDLException => throw ddlException
- case _ if !exceptionOnError => fallback.parsePlan(input)
- case x: Throwable => throw x
- }
- }
-
- // Keyword is a convention with AbstractSparkSQLParser, which will scan all of the `Keyword`
- // properties via reflection the class in runtime for constructing the SqlLexical object
- protected val CREATE = Keyword("CREATE")
- protected val TEMPORARY = Keyword("TEMPORARY")
- protected val TABLE = Keyword("TABLE")
- protected val IF = Keyword("IF")
- protected val NOT = Keyword("NOT")
- protected val EXISTS = Keyword("EXISTS")
- protected val USING = Keyword("USING")
- protected val OPTIONS = Keyword("OPTIONS")
- protected val DESCRIBE = Keyword("DESCRIBE")
- protected val EXTENDED = Keyword("EXTENDED")
- protected val AS = Keyword("AS")
- protected val COMMENT = Keyword("COMMENT")
- protected val REFRESH = Keyword("REFRESH")
-
- protected lazy val ddl: Parser[LogicalPlan] = createTable | describeTable | refreshTable
-
- protected def start: Parser[LogicalPlan] = ddl
-
- /**
- * `CREATE [TEMPORARY] TABLE [IF NOT EXISTS] avroTable
- * USING org.apache.spark.sql.avro
- * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
- * or
- * `CREATE [TEMPORARY] TABLE [IF NOT EXISTS] avroTable(intField int, stringField string...)
- * USING org.apache.spark.sql.avro
- * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
- * or
- * `CREATE [TEMPORARY] TABLE [IF NOT EXISTS] avroTable
- * USING org.apache.spark.sql.avro
- * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
- * AS SELECT ...
- */
- protected lazy val createTable: Parser[LogicalPlan] = {
- // TODO: Support database.table.
- (CREATE ~> TEMPORARY.? <~ TABLE) ~ (IF ~> NOT <~ EXISTS).? ~ tableIdentifier ~
- tableCols.? ~ (USING ~> className) ~ (OPTIONS ~> options).? ~ (AS ~> restInput).? ^^ {
- case temp ~ allowExisting ~ tableIdent ~ columns ~ provider ~ opts ~ query =>
- if (temp.isDefined && allowExisting.isDefined) {
- throw new DDLException(
- "a CREATE TEMPORARY TABLE statement does not allow IF NOT EXISTS clause.")
- }
-
- val options = opts.getOrElse(Map.empty[String, String])
- if (query.isDefined) {
- if (columns.isDefined) {
- throw new DDLException(
- "a CREATE TABLE AS SELECT statement does not allow column definitions.")
- }
- // When IF NOT EXISTS clause appears in the query, the save mode will be ignore.
- val mode = if (allowExisting.isDefined) {
- SaveMode.Ignore
- } else if (temp.isDefined) {
- SaveMode.Overwrite
- } else {
- SaveMode.ErrorIfExists
- }
-
- val queryPlan = fallback.parsePlan(query.get)
- CreateTableUsingAsSelect(tableIdent,
- provider,
- temp.isDefined,
- Array.empty[String],
- bucketSpec = None,
- mode,
- options,
- queryPlan)
- } else {
- val userSpecifiedSchema = columns.flatMap(fields => Some(StructType(fields)))
- CreateTableUsing(
- tableIdent,
- userSpecifiedSchema,
- provider,
- temp.isDefined,
- options,
- allowExisting.isDefined,
- managedIfNoPath = false)
- }
- }
- }
-
- // This is the same as tableIdentifier in SqlParser.
- protected lazy val tableIdentifier: Parser[TableIdentifier] =
- (ident <~ ".").? ~ ident ^^ {
- case maybeDbName ~ tableName => TableIdentifier(tableName, maybeDbName)
- }
-
- protected lazy val tableCols: Parser[Seq[StructField]] = "(" ~> repsep(column, ",") <~ ")"
-
- /*
- * describe [extended] table avroTable
- * This will display all columns of table `avroTable` includes column_name,column_type,comment
- */
- protected lazy val describeTable: Parser[LogicalPlan] =
- (DESCRIBE ~> opt(EXTENDED)) ~ tableIdentifier ^^ {
- case e ~ tableIdent =>
- DescribeCommand(UnresolvedRelation(tableIdent, None), e.isDefined)
- }
-
- protected lazy val refreshTable: Parser[LogicalPlan] =
- REFRESH ~> TABLE ~> tableIdentifier ^^ {
- case tableIndet =>
- RefreshTable(tableIndet)
- }
-
- protected lazy val options: Parser[Map[String, String]] =
- "(" ~> repsep(pair, ",") <~ ")" ^^ { case s: Seq[(String, String)] => s.toMap }
-
- protected lazy val className: Parser[String] = repsep(ident, ".") ^^ { case s => s.mkString(".")}
-
- override implicit def regexToParser(regex: Regex): Parser[String] = acceptMatch(
- s"identifier matching regex $regex", {
- case lexical.Identifier(str) if regex.unapplySeq(str).isDefined => str
- case lexical.Keyword(str) if regex.unapplySeq(str).isDefined => str
- }
- )
-
- protected lazy val optionPart: Parser[String] = "[_a-zA-Z][_a-zA-Z0-9]*".r ^^ {
- case name => name
- }
-
- protected lazy val optionName: Parser[String] = repsep(optionPart, ".") ^^ {
- case parts => parts.mkString(".")
- }
-
- protected lazy val pair: Parser[(String, String)] =
- optionName ~ stringLit ^^ { case k ~ v => (k, v) }
-
- protected lazy val column: Parser[StructField] =
- ident ~ dataType ~ (COMMENT ~> stringLit).? ^^ { case columnName ~ typ ~ cm =>
- val meta = cm match {
- case Some(comment) =>
- new MetadataBuilder().putString(COMMENT.str.toLowerCase, comment).build()
- case None => Metadata.empty
- }
-
- StructField(columnName, typ, nullable = true, meta)
- }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index c3603936df..1554209be9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -169,8 +169,3 @@ class CaseInsensitiveMap(map: Map[String, String]) extends Map[String, String]
override def -(key: String): Map[String, String] = baseMap - key.toLowerCase
}
-
-/**
- * The exception thrown from the DDL parser.
- */
-class DDLException(message: String) extends RuntimeException(message)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
index 6fc9febe49..cb88a1c83c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
@@ -22,7 +22,6 @@ import java.io.{File, IOException}
import org.scalatest.BeforeAndAfter
import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.execution.datasources.DDLException
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.util.Utils
@@ -105,7 +104,7 @@ class CreateTableAsSelectSuite extends DataSourceTest with SharedSQLContext with
sql("SELECT a, b FROM jsonTable"),
sql("SELECT a, b FROM jt").collect())
- val message = intercept[DDLException]{
+ val message = intercept[AnalysisException]{
sql(
s"""
|CREATE TEMPORARY TABLE IF NOT EXISTS jsonTable
@@ -156,7 +155,7 @@ class CreateTableAsSelectSuite extends DataSourceTest with SharedSQLContext with
}
test("CREATE TEMPORARY TABLE AS SELECT with IF NOT EXISTS is not allowed") {
- val message = intercept[DDLException]{
+ val message = intercept[AnalysisException]{
sql(
s"""
|CREATE TEMPORARY TABLE IF NOT EXISTS jsonTable
@@ -173,7 +172,7 @@ class CreateTableAsSelectSuite extends DataSourceTest with SharedSQLContext with
}
test("a CTAS statement with column definitions is not allowed") {
- intercept[DDLException]{
+ intercept[AnalysisException]{
sql(
s"""
|CREATE TEMPORARY TABLE jsonTable (a int, b string)