From 328c71161bdae569a534dcd05e14ec485e895c5c Mon Sep 17 00:00:00 2001
From: Herman van Hovell
Date: Mon, 28 Mar 2016 16:22:02 -0700
Subject: [SPARK-14086][SQL] Add DDL commands to ANTLR4 parser

#### What changes were proposed in this pull request?
This PR adds all the current Spark SQL DDL commands to the new ANTLR 4 based SQL parser.

I have found a few inconsistencies in the current commands:
- Function has an alias field. This is actually the class name of the function.
- Partition specifications should contain nulls in some commands, and contain `None`s in others.
- `AlterTableSkewedLocation`: Should define which columns have skewed values, and should allow us to define storage for each skewed combination of values. We currently only allow one value per field.
- `AlterTableSetFileFormat`: Should only have one file format; it currently supports both a generic file format and a table file format.

I have implemented all these commands as they currently are, and I propose to improve them in follow-up PRs.

#### How was this patch tested?
The existing DDLCommandSuite.

cc rxin andrewor14 yhuai

Author: Herman van Hovell

Closes #12011 from hvanhovell/SPARK-14086.
---
 .../spark/sql/execution/SparkSqlParser.scala     | 620 ++++++++++++++++++++-
 .../sql/execution/command/DDLCommandSuite.scala  |   5 +-
 2 files changed, 619 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index c098fa99c2..a8313deeef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -20,7 +20,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.parser.ng.{AbstractSqlParser, AstBuilder}
+import org.apache.spark.sql.catalyst.parser.ng.{AbstractSqlParser, AstBuilder, ParseException}
 import org.apache.spark.sql.catalyst.parser.ng.SqlBaseParser._
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
 import org.apache.spark.sql.execution.command.{DescribeCommand => _, _}
@@ -200,8 +200,8 @@ class SparkSqlAstBuilder extends AstBuilder {
   }
 
   /**
-    * Convert a table property list into a key-value map.
-    */
+   * Convert a table property list into a key-value map.
+   */
   override def visitTablePropertyList(
       ctx: TablePropertyListContext): Map[String, String] = withOrigin(ctx) {
     ctx.tableProperty.asScala.map { property =>
@@ -216,4 +216,618 @@ class SparkSqlAstBuilder extends AstBuilder {
       key -> value
     }.toMap
   }
+
+  /**
+   * Create a [[CreateDatabase]] command.
+   *
+   * For example:
+   * {{{
+   *   CREATE DATABASE [IF NOT EXISTS] database_name [COMMENT database_comment]
+   *    [LOCATION path] [WITH DBPROPERTIES (key1=val1, key2=val2, ...)]
+   * }}}
+   */
+  override def visitCreateDatabase(ctx: CreateDatabaseContext): LogicalPlan = withOrigin(ctx) {
+    CreateDatabase(
+      ctx.identifier.getText,
+      ctx.EXISTS != null,
+      Option(ctx.locationSpec).map(visitLocationSpec),
+      Option(ctx.comment).map(string),
+      Option(ctx.tablePropertyList).map(visitTablePropertyList).getOrElse(Map.empty))(
+      command(ctx))
+  }
+
+  /**
+   * Create an [[AlterDatabaseProperties]] command.
+   *
+   * For example:
+   * {{{
+   *   ALTER (DATABASE|SCHEMA) database SET DBPROPERTIES (property_name=property_value, ...);
+   * }}}
+   */
+  override def visitSetDatabaseProperties(
+      ctx: SetDatabasePropertiesContext): LogicalPlan = withOrigin(ctx) {
+    AlterDatabaseProperties(
+      ctx.identifier.getText,
+      visitTablePropertyList(ctx.tablePropertyList))(
+      command(ctx))
+  }
+
+  /**
+   * Create a [[DropDatabase]] command.
+   *
+   * For example:
+   * {{{
+   *   DROP (DATABASE|SCHEMA) [IF EXISTS] database [RESTRICT|CASCADE];
+   * }}}
+   */
+  override def visitDropDatabase(ctx: DropDatabaseContext): LogicalPlan = withOrigin(ctx) {
+    DropDatabase(ctx.identifier.getText, ctx.EXISTS != null, ctx.CASCADE == null)(command(ctx))
+  }
+
+  /**
+   * Create a [[DescribeDatabase]] command.
+   *
+   * For example:
+   * {{{
+   *   DESCRIBE DATABASE [EXTENDED] database;
+   * }}}
+   */
+  override def visitDescribeDatabase(ctx: DescribeDatabaseContext): LogicalPlan = withOrigin(ctx) {
+    DescribeDatabase(ctx.identifier.getText, ctx.EXTENDED != null)(command(ctx))
+  }
+
+  /**
+   * Create a [[CreateFunction]] command.
+   *
+   * For example:
+   * {{{
+   *   CREATE [TEMPORARY] FUNCTION [db_name.]function_name AS class_name
+   *    [USING JAR|FILE|ARCHIVE 'file_uri' [, JAR|FILE|ARCHIVE 'file_uri']];
+   * }}}
+   */
+  override def visitCreateFunction(ctx: CreateFunctionContext): LogicalPlan = withOrigin(ctx) {
+    val resources = ctx.resource.asScala.map { resource =>
+      val resourceType = resource.identifier.getText.toLowerCase
+      resourceType match {
+        case "jar" | "file" | "archive" =>
+          resourceType -> string(resource.STRING)
+        case other =>
+          throw new ParseException(s"Resource type '$resourceType' is not supported.", ctx)
+      }
+    }
+
+    // Extract database, name & alias.
+    val (database, function) = visitFunctionName(ctx.qualifiedName)
+    CreateFunction(
+      database,
+      function,
+      string(ctx.className), // TODO this is not an alias.
+      resources,
+      ctx.TEMPORARY != null)(
+      command(ctx))
+  }
+
+  /**
+   * Create a [[DropFunction]] command.
+   *
+   * For example:
+   * {{{
+   *   DROP [TEMPORARY] FUNCTION [IF EXISTS] function;
+   * }}}
+   */
+  override def visitDropFunction(ctx: DropFunctionContext): LogicalPlan = withOrigin(ctx) {
+    val (database, function) = visitFunctionName(ctx.qualifiedName)
+    DropFunction(database, function, ctx.EXISTS != null, ctx.TEMPORARY != null)(command(ctx))
+  }
+
+  /**
+   * Create an (optional database, function name) pair from a function name.
+   */
+  private def visitFunctionName(ctx: QualifiedNameContext): (Option[String], String) = {
+    ctx.identifier().asScala.map(_.getText) match {
+      case Seq(db, fn) => (Option(db), fn)
+      case Seq(fn) => (None, fn)
+      case other => throw new ParseException(s"Unsupported function name '${ctx.getText}'", ctx)
+    }
+  }
+
+  /**
+   * Create an [[AlterTableRename]] command.
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE table1 RENAME TO table2;
+   * }}}
+   */
+  override def visitRenameTable(ctx: RenameTableContext): LogicalPlan = withOrigin(ctx) {
+    AlterTableRename(
+      visitTableIdentifier(ctx.from),
+      visitTableIdentifier(ctx.to))(
+      command(ctx))
+  }
+
+  /**
+   * Create an [[AlterTableSetProperties]] command.
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE table SET TBLPROPERTIES ('comment' = new_comment);
+   * }}}
+   */
+  override def visitSetTableProperties(
+      ctx: SetTablePropertiesContext): LogicalPlan = withOrigin(ctx) {
+    AlterTableSetProperties(
+      visitTableIdentifier(ctx.tableIdentifier),
+      visitTablePropertyList(ctx.tablePropertyList))(
+      command(ctx))
+  }
+
+  /**
+   * Create an [[AlterTableUnsetProperties]] command.
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE table UNSET TBLPROPERTIES IF EXISTS ('comment', 'key');
+   * }}}
+   */
+  override def visitUnsetTableProperties(
+      ctx: UnsetTablePropertiesContext): LogicalPlan = withOrigin(ctx) {
+    AlterTableUnsetProperties(
+      visitTableIdentifier(ctx.tableIdentifier),
+      visitTablePropertyList(ctx.tablePropertyList),
+      ctx.EXISTS != null)(
+      command(ctx))
+  }
+
+  /**
+   * Create an [[AlterTableSerDeProperties]] command.
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE table [PARTITION spec] SET SERDE serde_name [WITH SERDEPROPERTIES props];
+   *   ALTER TABLE table [PARTITION spec] SET SERDEPROPERTIES serde_properties;
+   * }}}
+   */
+  override def visitSetTableSerDe(ctx: SetTableSerDeContext): LogicalPlan = withOrigin(ctx) {
+    AlterTableSerDeProperties(
+      visitTableIdentifier(ctx.tableIdentifier),
+      Option(ctx.STRING).map(string),
+      Option(ctx.tablePropertyList).map(visitTablePropertyList),
+      // TODO a partition spec is allowed to have optional values. This is currently violated.
+      Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec))(
+      command(ctx))
+  }
+
+  /**
+   * Create an [[AlterTableStorageProperties]] command.
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE table CLUSTERED BY (col, ...) [SORTED BY (col, ...)] INTO n BUCKETS;
+   * }}}
+   */
+  override def visitBucketTable(ctx: BucketTableContext): LogicalPlan = withOrigin(ctx) {
+    AlterTableStorageProperties(
+      visitTableIdentifier(ctx.tableIdentifier),
+      visitBucketSpec(ctx.bucketSpec))(
+      command(ctx))
+  }
+
+  /**
+   * Create an [[AlterTableNotClustered]] command.
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE table NOT CLUSTERED;
+   * }}}
+   */
+  override def visitUnclusterTable(ctx: UnclusterTableContext): LogicalPlan = withOrigin(ctx) {
+    AlterTableNotClustered(visitTableIdentifier(ctx.tableIdentifier))(command(ctx))
+  }
+
+  /**
+   * Create an [[AlterTableNotSorted]] command.
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE table NOT SORTED;
+   * }}}
+   */
+  override def visitUnsortTable(ctx: UnsortTableContext): LogicalPlan = withOrigin(ctx) {
+    AlterTableNotSorted(visitTableIdentifier(ctx.tableIdentifier))(command(ctx))
+  }
+
+  /**
+   * Create an [[AlterTableSkewed]] command.
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE table SKEWED BY (col1, col2)
+   *   ON ((col1_value, col2_value) [, (col1_value, col2_value), ...])
+   *   [STORED AS DIRECTORIES];
+   * }}}
+   */
+  override def visitSkewTable(ctx: SkewTableContext): LogicalPlan = withOrigin(ctx) {
+    val table = visitTableIdentifier(ctx.tableIdentifier)
+    val (cols, values, storedAsDirs) = visitSkewSpec(ctx.skewSpec)
+    AlterTableSkewed(table, cols, values, storedAsDirs)(command(ctx))
+  }
+
+  /**
+   * Create an [[AlterTableNotSkewed]] command.
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE table NOT SKEWED;
+   * }}}
+   */
+  override def visitUnskewTable(ctx: UnskewTableContext): LogicalPlan = withOrigin(ctx) {
+    AlterTableNotSkewed(visitTableIdentifier(ctx.tableIdentifier))(command(ctx))
+  }
+
+  /**
+   * Create an [[AlterTableNotStoredAsDirs]] command.
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE table NOT STORED AS DIRECTORIES
+   * }}}
+   */
+  override def visitUnstoreTable(ctx: UnstoreTableContext): LogicalPlan = withOrigin(ctx) {
+    AlterTableNotStoredAsDirs(visitTableIdentifier(ctx.tableIdentifier))(command(ctx))
+  }
+
+  /**
+   * Create an [[AlterTableSkewedLocation]] command.
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE table SET SKEWED LOCATION (col1="loc1" [, (col2, col3)="loc2", ...] );
+   * }}}
+   */
+  override def visitSetTableSkewLocations(
+      ctx: SetTableSkewLocationsContext): LogicalPlan = withOrigin(ctx) {
+    val skewedMap = ctx.skewedLocationList.skewedLocation.asScala.flatMap {
+      slCtx =>
+        val location = string(slCtx.STRING)
+        if (slCtx.constant != null) {
+          Seq(visitStringConstant(slCtx.constant) -> location)
+        } else {
+          // TODO this is similar to what was in the original implementation. However this does
+          // not make too much sense to me since we should be storing a tuple of values (not
+          // column names) for which we want a dedicated storage location.
+          visitConstantList(slCtx.constantList).map(_ -> location)
+        }
+    }.toMap
+
+    AlterTableSkewedLocation(
+      visitTableIdentifier(ctx.tableIdentifier),
+      skewedMap)(
+      command(ctx))
+  }
+
+  /**
+   * Create an [[AlterTableAddPartition]] command.
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE table ADD [IF NOT EXISTS] PARTITION spec [LOCATION 'loc1']
+   * }}}
+   */
+  override def visitAddTablePartition(
+      ctx: AddTablePartitionContext): LogicalPlan = withOrigin(ctx) {
+    // Create partition spec to location mapping.
+    val specsAndLocs = ctx.partitionSpecLocation.asScala.map {
+      splCtx =>
+        val spec = visitNonOptionalPartitionSpec(splCtx.partitionSpec)
+        val location = Option(splCtx.locationSpec).map(visitLocationSpec)
+        spec -> location
+    }
+    AlterTableAddPartition(
+      visitTableIdentifier(ctx.tableIdentifier),
+      specsAndLocs,
+      ctx.EXISTS != null)(
+      command(ctx))
+  }
+
+  /**
+   * Create an [[AlterTableExchangePartition]] command.
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE table1 EXCHANGE PARTITION spec WITH TABLE table2;
+   * }}}
+   */
+  override def visitExchangeTablePartition(
+      ctx: ExchangeTablePartitionContext): LogicalPlan = withOrigin(ctx) {
+    AlterTableExchangePartition(
+      visitTableIdentifier(ctx.from),
+      visitTableIdentifier(ctx.to),
+      visitNonOptionalPartitionSpec(ctx.partitionSpec))(
+      command(ctx))
+  }
+
+  /**
+   * Create an [[AlterTableRenamePartition]] command.
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE table PARTITION spec1 RENAME TO PARTITION spec2;
+   * }}}
+   */
+  override def visitRenameTablePartition(
+      ctx: RenameTablePartitionContext): LogicalPlan = withOrigin(ctx) {
+    AlterTableRenamePartition(
+      visitTableIdentifier(ctx.tableIdentifier),
+      visitNonOptionalPartitionSpec(ctx.from),
+      visitNonOptionalPartitionSpec(ctx.to))(
+      command(ctx))
+  }
+
+  /**
+   * Create an [[AlterTableDropPartition]] command.
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE table DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE];
+   * }}}
+   */
+  override def visitDropTablePartitions(
+      ctx: DropTablePartitionsContext): LogicalPlan = withOrigin(ctx) {
+    AlterTableDropPartition(
+      visitTableIdentifier(ctx.tableIdentifier),
+      ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec),
+      ctx.EXISTS != null,
+      ctx.PURGE != null)(
+      command(ctx))
+  }
+
+  /**
+   * Create an [[AlterTableArchivePartition]] command.
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE table ARCHIVE PARTITION spec;
+   * }}}
+   */
+  override def visitArchiveTablePartition(
+      ctx: ArchiveTablePartitionContext): LogicalPlan = withOrigin(ctx) {
+    AlterTableArchivePartition(
+      visitTableIdentifier(ctx.tableIdentifier),
+      visitNonOptionalPartitionSpec(ctx.partitionSpec))(
+      command(ctx))
+  }
+
+  /**
+   * Create an [[AlterTableUnarchivePartition]] command.
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE table UNARCHIVE PARTITION spec;
+   * }}}
+   */
+  override def visitUnarchiveTablePartition(
+      ctx: UnarchiveTablePartitionContext): LogicalPlan = withOrigin(ctx) {
+    AlterTableUnarchivePartition(
+      visitTableIdentifier(ctx.tableIdentifier),
+      visitNonOptionalPartitionSpec(ctx.partitionSpec))(
+      command(ctx))
+  }
+
+  /**
+   * Create an [[AlterTableSetFileFormat]] command.
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE table [PARTITION spec] SET FILEFORMAT file_format;
+   * }}}
+   */
+  override def visitSetTableFileFormat(
+      ctx: SetTableFileFormatContext): LogicalPlan = withOrigin(ctx) {
+    // AlterTableSetFileFormat currently takes both a GenericFileFormat and a
+    // TableFileFormatContext. This is a bit weird because it should only take one. It also should
+    // use a CatalogFileFormat instead of either a String or a Sequence of Strings. We will address
+    // this in a follow-up PR.
+    val (fileFormat, genericFormat) = ctx.fileFormat match {
+      case s: GenericFileFormatContext =>
+        (Seq.empty[String], Option(s.identifier.getText))
+      case s: TableFileFormatContext =>
+        val elements = Seq(s.inFmt, s.outFmt) ++
+          Option(s.serdeCls).toSeq ++
+          Option(s.inDriver).toSeq ++
+          Option(s.outDriver).toSeq
+        (elements.map(string), None)
+    }
+    AlterTableSetFileFormat(
+      visitTableIdentifier(ctx.tableIdentifier),
+      Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec),
+      fileFormat,
+      genericFormat)(
+      command(ctx))
+  }
+
+  /**
+   * Create an [[AlterTableSetLocation]] command.
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE table [PARTITION spec] SET LOCATION "loc";
+   * }}}
+   */
+  override def visitSetTableLocation(ctx: SetTableLocationContext): LogicalPlan = withOrigin(ctx) {
+    AlterTableSetLocation(
+      visitTableIdentifier(ctx.tableIdentifier),
+      Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec),
+      visitLocationSpec(ctx.locationSpec))(
+      command(ctx))
+  }
+
+  /**
+   * Create an [[AlterTableTouch]] command.
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE table TOUCH [PARTITION spec];
+   * }}}
+   */
+  override def visitTouchTable(ctx: TouchTableContext): LogicalPlan = withOrigin(ctx) {
+    AlterTableTouch(
+      visitTableIdentifier(ctx.tableIdentifier),
+      Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec))(
+      command(ctx))
+  }
+
+  /**
+   * Create an [[AlterTableCompact]] command.
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE table [PARTITION spec] COMPACT 'compaction_type';
+   * }}}
+   */
+  override def visitCompactTable(ctx: CompactTableContext): LogicalPlan = withOrigin(ctx) {
+    AlterTableCompact(
+      visitTableIdentifier(ctx.tableIdentifier),
+      Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec),
+      string(ctx.STRING))(
+      command(ctx))
+  }
+
+  /**
+   * Create an [[AlterTableMerge]] command.
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE table [PARTITION spec] CONCATENATE;
+   * }}}
+   */
+  override def visitConcatenateTable(ctx: ConcatenateTableContext): LogicalPlan = withOrigin(ctx) {
+    AlterTableMerge(
+      visitTableIdentifier(ctx.tableIdentifier),
+      Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec))(
+      command(ctx))
+  }
+
+  /**
+   * Create an [[AlterTableChangeCol]] command.
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE tableIdentifier [PARTITION spec]
+   *    CHANGE [COLUMN] col_old_name col_new_name column_type [COMMENT col_comment]
+   *    [FIRST|AFTER column_name] [CASCADE|RESTRICT];
+   * }}}
+   */
+  override def visitChangeColumn(ctx: ChangeColumnContext): LogicalPlan = withOrigin(ctx) {
+    val col = visitColType(ctx.colType())
+    val comment = if (col.metadata.contains("comment")) {
+      Option(col.metadata.getString("comment"))
+    } else {
+      None
+    }
+
+    AlterTableChangeCol(
+      visitTableIdentifier(ctx.tableIdentifier),
+      Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec),
+      ctx.oldName.getText,
+      // We could also pass in a struct field - seems easier.
+      col.name,
+      col.dataType,
+      comment,
+      Option(ctx.after).map(_.getText),
+      // Note that Restrict and Cascade are mutually exclusive.
+      ctx.RESTRICT != null,
+      ctx.CASCADE != null)(
+      command(ctx))
+  }
+
+  /**
+   * Create an [[AlterTableAddCol]] command.
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE tableIdentifier [PARTITION spec]
+   *    ADD COLUMNS (name type [COMMENT comment], ...) [CASCADE|RESTRICT]
+   * }}}
+   */
+  override def visitAddColumns(ctx: AddColumnsContext): LogicalPlan = withOrigin(ctx) {
+    AlterTableAddCol(
+      visitTableIdentifier(ctx.tableIdentifier),
+      Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec),
+      createStructType(ctx.colTypeList),
+      // Note that Restrict and Cascade are mutually exclusive.
+      ctx.RESTRICT != null,
+      ctx.CASCADE != null)(
+      command(ctx))
+  }
+
+  /**
+   * Create an [[AlterTableReplaceCol]] command.
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE tableIdentifier [PARTITION spec]
+   *    REPLACE COLUMNS (name type [COMMENT comment], ...) [CASCADE|RESTRICT]
+   * }}}
+   */
+  override def visitReplaceColumns(ctx: ReplaceColumnsContext): LogicalPlan = withOrigin(ctx) {
+    AlterTableReplaceCol(
+      visitTableIdentifier(ctx.tableIdentifier),
+      Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec),
+      createStructType(ctx.colTypeList),
+      // Note that Restrict and Cascade are mutually exclusive.
+      ctx.RESTRICT != null,
+      ctx.CASCADE != null)(
+      command(ctx))
+  }
+
+  /**
+   * Create a location string.
+   */
+  override def visitLocationSpec(ctx: LocationSpecContext): String = withOrigin(ctx) {
+    string(ctx.STRING)
+  }
+
+  /**
+   * Create a [[BucketSpec]].
+   */
+  override def visitBucketSpec(ctx: BucketSpecContext): BucketSpec = withOrigin(ctx) {
+    BucketSpec(
+      ctx.INTEGER_VALUE.getText.toInt,
+      visitIdentifierList(ctx.identifierList),
+      Option(ctx.orderedIdentifierList).toSeq
+        .flatMap(_.orderedIdentifier.asScala)
+        .map(_.identifier.getText))
+  }
+
+  /**
+   * Create a skew specification. This contains three components:
+   *  - The skewed columns.
+   *  - The values that are skewed. The size of each entry must match the number of skewed columns.
+   *  - A 'stored as directories' flag.
+   */
+  override def visitSkewSpec(
+      ctx: SkewSpecContext): (Seq[String], Seq[Seq[String]], Boolean) = withOrigin(ctx) {
+    val skewedValues = if (ctx.constantList != null) {
+      Seq(visitConstantList(ctx.constantList))
+    } else {
+      visitNestedConstantList(ctx.nestedConstantList)
+    }
+    (visitIdentifierList(ctx.identifierList), skewedValues, ctx.DIRECTORIES != null)
+  }
+
+  /**
+   * Convert a nested constants list into a sequence of string sequences.
+   */
+  override def visitNestedConstantList(
+      ctx: NestedConstantListContext): Seq[Seq[String]] = withOrigin(ctx) {
+    ctx.constantList.asScala.map(visitConstantList)
+  }
+
+  /**
+   * Convert a constants list into a string sequence.
+   */
+  override def visitConstantList(ctx: ConstantListContext): Seq[String] = withOrigin(ctx) {
+    ctx.constant.asScala.map(visitStringConstant)
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index 7a6343748b..03079c6890 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -18,14 +18,13 @@
 package org.apache.spark.sql.execution.command
 
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.expressions.{Ascending, Descending}
 import org.apache.spark.sql.catalyst.plans.PlanTest
-import org.apache.spark.sql.execution.SparkQl
+import org.apache.spark.sql.execution.SparkSqlParser
 import org.apache.spark.sql.execution.datasources.BucketSpec
 import org.apache.spark.sql.types._
 
 class DDLCommandSuite extends PlanTest {
-  private val parser = new SparkQl
+  private val parser = SparkSqlParser
 
   test("create database") {
     val sql =
--
cgit v1.2.3
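
For readers who want to try the new parser out, here is a minimal, illustrative sketch (not part of the patch) of how one of the commands above round-trips through `SparkSqlParser`, in the style of DDLCommandSuite. It assumes the `parsePlan` entry point inherited from `AbstractSqlParser` and the `AlterTableRename(from, to)(sql)` shape added in this PR; the object name is hypothetical.

```scala
// Illustrative sketch only -- not part of this patch.
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.SparkSqlParser
import org.apache.spark.sql.execution.command.AlterTableRename

object ParseRenameExample {
  def main(args: Array[String]): Unit = {
    val sql = "ALTER TABLE table1 RENAME TO table2"
    // visitRenameTable should turn this statement into an AlterTableRename
    // command; the original SQL text is attached via command(ctx).
    val expected = AlterTableRename(
      TableIdentifier("table1", None),
      TableIdentifier("table2", None))(sql)
    val actual = SparkSqlParser.parsePlan(sql)
    assert(actual == expected, s"unexpected plan: $actual")
  }
}
```

Because the SQL string lives in a second parameter list, case-class equality ignores it, which is what lets DDLCommandSuite compare parsed plans against expected commands directly.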