author     Andrew Or <andrew@databricks.com>    2016-03-11 15:13:48 -0800
committer  Yin Huai <yhuai@databricks.com>      2016-03-11 15:13:48 -0800
commit     66d9d0edfef986895490bcdeacbc0ca38e091702 (patch)
tree       84037a13e7040fa88e0ace5aab18087d3e206e95 /sql/core/src
parent     42afd72c654318e9fb1f2a204198221e797c2485 (diff)
[SPARK-13139][SQL] Parse Hive DDL commands ourselves
## What changes were proposed in this pull request?

This patch is ported over from viirya's changes in #11048.

Currently, for most DDLs we just pass the query text directly to Hive. Instead, we should parse these commands ourselves and, in the future (not as part of this patch), use the `HiveCatalog` to process them. This is a pretext to merging `SQLContext` and `HiveContext`.

Note: as of this patch we still pass the query text to Hive. The difference is that we now parse the commands ourselves, so in the future we can just use our own catalog.

## How was this patch tested?

Jenkins, plus the new `DDLCommandSuite`, which accounts for about 40% of the changes here.

Author: Andrew Or <andrew@databricks.com>

Closes #11573 from andrewor14/parser-plus-plus.
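To make the new behavior concrete, here is a minimal sketch patterned on the `DDLCommandSuite` added in this patch: DDL text now parses to a typed logical plan that still carries the original query text (in a curried second argument list) so it can be forwarded to Hive for the time being.

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.SparkQl
import org.apache.spark.sql.execution.command.AlterTableRename

val parser = new SparkQl()
val sql = "ALTER TABLE table_name RENAME TO new_table_name"

// Previously this text was forwarded to Hive opaquely; now it parses to a plan.
val parsed = parser.parsePlan(sql)

// The plan still carries the raw SQL so NativeDDLCommand.run can hand it to
// the underlying system via SQLContext.runNativeSql.
assert(parsed == AlterTableRename(
  TableIdentifier("table_name", None),
  TableIdentifier("new_table_name", None))(sql))
```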
Diffstat (limited to 'sql/core/src')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala                                 |   9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala                          | 125
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommandParser.scala  | 428
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala                      | 210
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala          | 544
5 files changed, 1297 insertions, 19 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 49a70a7c5f..36fe57f78b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -785,6 +785,15 @@ class SQLContext private[sql](
}
/**
+ * Executes a SQL query without parsing it, but instead passing it directly to an underlying
+ * system to process. This is currently only used for Hive DDLs and will be removed as soon
+ * as Spark can parse all supported Hive DDLs itself.
+ */
+ private[sql] def runNativeSql(sqlText: String): Seq[Row] = {
+ throw new UnsupportedOperationException
+ }
+
+ /**
* Returns the specified table as a [[DataFrame]].
*
* @group ddl_ops
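Since `runNativeSql` only throws here, a Hive-aware subclass is expected to override it (that subclass is not part of this diff). A hypothetical, self-contained sketch of the contract, with `NativeHiveClient` as a stand-in rather than a real Spark API:

```scala
import org.apache.spark.sql.Row

// Stand-in for a Hive client; hypothetical, not a real Spark API.
trait NativeHiveClient {
  def runSqlHive(sql: String): Seq[String]
}

class HiveBackedRunner(client: NativeHiveClient) {
  // Mirrors the runNativeSql contract: forward the DDL text unparsed and wrap
  // each output line in a single-column Row, matching NativeDDLCommand.output
  // (a single non-nullable StringType column named "result").
  def runNativeSql(sqlText: String): Seq[Row] =
    client.runSqlHive(sqlText).map(Row(_))
}
```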
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala
index 471a5e436c..d12dab567b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala
@@ -29,7 +29,26 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends Cataly
import ParserUtils._
/** Check if a command should not be explained. */
- protected def isNoExplainCommand(command: String): Boolean = "TOK_DESCTABLE" == command
+ protected def isNoExplainCommand(command: String): Boolean = {
+ "TOK_DESCTABLE" == command || "TOK_ALTERTABLE" == command
+ }
+
+ /**
+ * For each node, extract properties in the form of a list ['key1', 'key2', 'key3', 'value']
+ * into a pair (key1.key2.key3, value).
+ */
+ private def extractProps(
+ props: Seq[ASTNode],
+ expectedNodeText: String): Seq[(String, String)] = {
+ props.map {
+ case Token(x, keysAndValue) if x == expectedNodeText =>
+ val key = keysAndValue.init.map { x => unquoteString(x.text) }.mkString(".")
+ val value = unquoteString(keysAndValue.last.text)
+ (key, value)
+ case p =>
+ parseFailed(s"Expected property '$expectedNodeText' in command", p)
+ }
+ }
protected override def nodeToPlan(node: ASTNode): LogicalPlan = {
node match {
@@ -64,10 +83,86 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends Cataly
val tableIdent = extractTableIdent(nameParts)
RefreshTable(tableIdent)
+ // CREATE DATABASE [IF NOT EXISTS] database_name [COMMENT database_comment]
+ // [LOCATION path] [WITH DBPROPERTIES (key1=val1, key2=val2, ...)];
+ case Token("TOK_CREATEDATABASE", Token(databaseName, Nil) :: args) =>
+ val Seq(ifNotExists, dbLocation, databaseComment, dbprops) = getClauses(Seq(
+ "TOK_IFNOTEXISTS",
+ "TOK_DATABASELOCATION",
+ "TOK_DATABASECOMMENT",
+ "TOK_DATABASEPROPERTIES"), args)
+ val location = dbLocation.map {
+ case Token("TOK_DATABASELOCATION", Token(loc, Nil) :: Nil) => unquoteString(loc)
+ case _ => parseFailed("Invalid CREATE DATABASE command", node)
+ }
+ val comment = databaseComment.map {
+ case Token("TOK_DATABASECOMMENT", Token(com, Nil) :: Nil) => unquoteString(com)
+ case _ => parseFailed("Invalid CREATE DATABASE command", node)
+ }
+ val props = dbprops.toSeq.flatMap {
+ case Token("TOK_DATABASEPROPERTIES", Token("TOK_DBPROPLIST", propList) :: Nil) =>
+ extractProps(propList, "TOK_TABLEPROPERTY")
+ case _ => parseFailed("Invalid CREATE DATABASE command", node)
+ }.toMap
+ CreateDatabase(databaseName, ifNotExists.isDefined, location, comment, props)(node.source)
+
+ // CREATE [TEMPORARY] FUNCTION [db_name.]function_name AS class_name
+ // [USING JAR|FILE|ARCHIVE 'file_uri' [, JAR|FILE|ARCHIVE 'file_uri'] ];
+ case Token("TOK_CREATEFUNCTION", args) =>
+ // Example format:
+ //
+ // TOK_CREATEFUNCTION
+ // :- db_name
+ // :- func_name
+ // :- alias
+ // +- TOK_RESOURCE_LIST
+ // :- TOK_RESOURCE_URI
+ // : :- TOK_JAR
+ // : +- '/path/to/jar'
+ // +- TOK_RESOURCE_URI
+ // :- TOK_FILE
+ // +- 'path/to/file'
+ val (funcNameArgs, otherArgs) = args.partition {
+ case Token("TOK_RESOURCE_LIST", _) => false
+ case Token("TOK_TEMPORARY", _) => false
+ case Token(_, Nil) => true
+ case _ => parseFailed("Invalid CREATE FUNCTION command", node)
+ }
+ // If database name is specified, there are 3 tokens, otherwise 2.
+ val (funcName, alias) = funcNameArgs match {
+ case Token(dbName, Nil) :: Token(fname, Nil) :: Token(aname, Nil) :: Nil =>
+ (unquoteString(dbName) + "." + unquoteString(fname), unquoteString(aname))
+ case Token(fname, Nil) :: Token(aname, Nil) :: Nil =>
+ (unquoteString(fname), unquoteString(aname))
+ case _ =>
+ parseFailed("Invalid CREATE FUNCTION command", node)
+ }
+ // Extract other keywords, if they exist
+ val Seq(rList, temp) = getClauses(Seq("TOK_RESOURCE_LIST", "TOK_TEMPORARY"), otherArgs)
+ val resourcesMap = rList.toSeq.flatMap {
+ case Token("TOK_RESOURCE_LIST", resources) =>
+ resources.map {
+ case Token("TOK_RESOURCE_URI", rType :: Token(rPath, Nil) :: Nil) =>
+ val resourceType = rType match {
+ case Token("TOK_JAR", Nil) => "jar"
+ case Token("TOK_FILE", Nil) => "file"
+ case Token("TOK_ARCHIVE", Nil) => "archive"
+ case Token(f, _) => parseFailed(s"Unexpected resource format '$f'", node)
+ }
+ (resourceType, unquoteString(rPath))
+ case _ => parseFailed("Invalid CREATE FUNCTION command", node)
+ }
+ case _ => parseFailed("Invalid CREATE FUNCTION command", node)
+ }.toMap
+ CreateFunction(funcName, alias, resourcesMap, temp.isDefined)(node.source)
+
+ case Token("TOK_ALTERTABLE", alterTableArgs) =>
+ AlterTableCommandParser.parse(node)
+
case Token("TOK_CREATETABLEUSING", createTableArgs) =>
val Seq(
temp,
- allowExisting,
+ ifNotExists,
Some(tabName),
tableCols,
Some(Token("TOK_TABLEPROVIDER", providerNameParts)),
@@ -79,30 +174,22 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends Cataly
"TOK_TABLEPROVIDER",
"TOK_TABLEOPTIONS",
"TOK_QUERY"), createTableArgs)
-
val tableIdent: TableIdentifier = extractTableIdent(tabName)
-
val columns = tableCols.map {
case Token("TOK_TABCOLLIST", fields) => StructType(fields.map(nodeToStructField))
+ case _ => parseFailed("Invalid CREATE TABLE command", node)
}
-
val provider = providerNameParts.map {
case Token(name, Nil) => name
+ case _ => parseFailed("Invalid CREATE TABLE command", node)
}.mkString(".")
-
- val options: Map[String, String] = tableOpts.toSeq.flatMap {
- case Token("TOK_TABLEOPTIONS", options) =>
- options.map {
- case Token("TOK_TABLEOPTION", keysAndValue) =>
- val key = keysAndValue.init.map(_.text).mkString(".")
- val value = unquoteString(keysAndValue.last.text)
- (key, value)
- }
+ val options = tableOpts.toSeq.flatMap {
+ case Token("TOK_TABLEOPTIONS", opts) => extractProps(opts, "TOK_TABLEOPTION")
+ case _ => parseFailed("Invalid CREATE TABLE command", node)
}.toMap
-        val asClause = tableAs.map(nodeToPlan(_))
-
+        val asClause = tableAs.map(nodeToPlan)
- if (temp.isDefined && allowExisting.isDefined) {
+ if (temp.isDefined && ifNotExists.isDefined) {
throw new AnalysisException(
"a CREATE TEMPORARY TABLE statement does not allow IF NOT EXISTS clause.")
}
@@ -113,7 +200,7 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends Cataly
"a CREATE TABLE AS SELECT statement does not allow column definitions.")
}
- val mode = if (allowExisting.isDefined) {
+ val mode = if (ifNotExists.isDefined) {
SaveMode.Ignore
} else if (temp.isDefined) {
SaveMode.Overwrite
@@ -136,7 +223,7 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends Cataly
provider,
temp.isDefined,
options,
- allowExisting.isDefined,
+ ifNotExists.isDefined,
managedIfNoPath = false)
}
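The shared `extractProps` helper above reduces each property node to a dotted key and a value. A standalone sketch of just that flattening, with unquoting omitted:

```scala
// Sketch of extractProps' per-node behavior: all tokens but the last are
// joined with '.' to form the key; the last token is the value.
def flattenProp(keysAndValue: Seq[String]): (String, String) =
  (keysAndValue.init.mkString("."), keysAndValue.last)

// e.g. a TOK_TABLEPROPERTY node holding ['key1', 'key2', 'key3', 'value']
assert(flattenProp(Seq("key1", "key2", "key3", "value")) == ("key1.key2.key3" -> "value"))
```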
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommandParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommandParser.scala
new file mode 100644
index 0000000000..58639275c1
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommandParser.scala
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.ExternalCatalog.TablePartitionSpec
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Descending, SortDirection}
+import org.apache.spark.sql.catalyst.parser._
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.types.StructType
+
+
+/**
+ * Helper object to parse alter table commands.
+ */
+object AlterTableCommandParser {
+ import ParserUtils._
+
+ /**
+ * Parse the given node assuming it is an alter table command.
+ */
+ def parse(node: ASTNode): LogicalPlan = {
+ node.children match {
+ case (tabName @ Token("TOK_TABNAME", _)) :: otherNodes =>
+ val tableIdent = extractTableIdent(tabName)
+ val partSpec = getClauseOption("TOK_PARTSPEC", node.children).map(parsePartitionSpec)
+ matchAlterTableCommands(node, otherNodes, tableIdent, partSpec)
+ case _ =>
+ parseFailed("Could not parse ALTER TABLE command", node)
+ }
+ }
+
+ private def cleanAndUnquoteString(s: String): String = {
+ cleanIdentifier(unquoteString(s))
+ }
+
+ /**
+ * Extract partition spec from the given [[ASTNode]] as a map, assuming it exists.
+ *
+ * Expected format:
+ * +- TOK_PARTSPEC
+ * :- TOK_PARTVAL
+ * : :- dt
+ * : +- '2008-08-08'
+ * +- TOK_PARTVAL
+ * :- country
+ * +- 'us'
+ */
+ private def parsePartitionSpec(node: ASTNode): Map[String, String] = {
+ node match {
+ case Token("TOK_PARTSPEC", partitions) =>
+ partitions.map {
+ // Note: sometimes there's a "=", "<" or ">" between the key and the value
+ case Token("TOK_PARTVAL", ident :: conj :: constant :: Nil) =>
+ (cleanAndUnquoteString(ident.text), cleanAndUnquoteString(constant.text))
+ case Token("TOK_PARTVAL", ident :: constant :: Nil) =>
+ (cleanAndUnquoteString(ident.text), cleanAndUnquoteString(constant.text))
+ case Token("TOK_PARTVAL", ident :: Nil) =>
+ (cleanAndUnquoteString(ident.text), null)
+ case _ =>
+ parseFailed("Invalid ALTER TABLE command", node)
+ }.toMap
+ case _ =>
+ parseFailed("Expected partition spec in ALTER TABLE command", node)
+ }
+ }
+
+ /**
+ * Extract table properties from the given [[ASTNode]] as a map, assuming it exists.
+ *
+ * Expected format:
+ * +- TOK_TABLEPROPERTIES
+ * +- TOK_TABLEPROPLIST
+ * :- TOK_TABLEPROPERTY
+ * : :- 'test'
+ * : +- 'value'
+ * +- TOK_TABLEPROPERTY
+ * :- 'comment'
+ * +- 'new_comment'
+ */
+ private def extractTableProps(node: ASTNode): Map[String, String] = {
+ node match {
+ case Token("TOK_TABLEPROPERTIES", propsList) =>
+ propsList.flatMap {
+ case Token("TOK_TABLEPROPLIST", props) =>
+ props.map { case Token("TOK_TABLEPROPERTY", key :: value :: Nil) =>
+ val k = cleanAndUnquoteString(key.text)
+ val v = value match {
+ case Token("TOK_NULL", Nil) => null
+ case _ => cleanAndUnquoteString(value.text)
+ }
+ (k, v)
+ }
+ case _ =>
+ parseFailed("Invalid ALTER TABLE command", node)
+ }.toMap
+ case _ =>
+ parseFailed("Expected table properties in ALTER TABLE command", node)
+ }
+ }
+
+ /**
+ * Parse an alter table command from a [[ASTNode]] into a [[LogicalPlan]].
+ * This follows https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL.
+ *
+ * @param node the original [[ASTNode]] to parse.
+ * @param otherNodes the other [[ASTNode]]s after the first one containing the table name.
+ * @param tableIdent identifier of the table, parsed from the first [[ASTNode]].
+ * @param partition spec identifying the partition this command is concerned with, if any.
+ */
+ // TODO: This method is massive. Break it down.
+ private def matchAlterTableCommands(
+ node: ASTNode,
+ otherNodes: Seq[ASTNode],
+ tableIdent: TableIdentifier,
+ partition: Option[TablePartitionSpec]): LogicalPlan = {
+ otherNodes match {
+ // ALTER TABLE table_name RENAME TO new_table_name;
+ case Token("TOK_ALTERTABLE_RENAME", renameArgs) :: _ =>
+ val tableNameClause = getClause("TOK_TABNAME", renameArgs)
+ val newTableIdent = extractTableIdent(tableNameClause)
+ AlterTableRename(tableIdent, newTableIdent)(node.source)
+
+ // ALTER TABLE table_name SET TBLPROPERTIES ('comment' = new_comment);
+ case Token("TOK_ALTERTABLE_PROPERTIES", args) :: _ =>
+ val properties = extractTableProps(args.head)
+ AlterTableSetProperties(tableIdent, properties)(node.source)
+
+ // ALTER TABLE table_name UNSET TBLPROPERTIES IF EXISTS ('comment', 'key');
+ case Token("TOK_ALTERTABLE_DROPPROPERTIES", args) :: _ =>
+ val properties = extractTableProps(args.head)
+ val ifExists = getClauseOption("TOK_IFEXISTS", args).isDefined
+ AlterTableUnsetProperties(tableIdent, properties, ifExists)(node.source)
+
+ // ALTER TABLE table_name [PARTITION spec] SET SERDE serde_name [WITH SERDEPROPERTIES props];
+ case Token("TOK_ALTERTABLE_SERIALIZER", Token(serdeClassName, Nil) :: serdeArgs) :: _ =>
+ AlterTableSerDeProperties(
+ tableIdent,
+ Some(cleanAndUnquoteString(serdeClassName)),
+ serdeArgs.headOption.map(extractTableProps),
+ partition)(node.source)
+
+ // ALTER TABLE table_name [PARTITION spec] SET SERDEPROPERTIES serde_properties;
+ case Token("TOK_ALTERTABLE_SERDEPROPERTIES", args) :: _ =>
+ AlterTableSerDeProperties(
+ tableIdent,
+ None,
+ Some(extractTableProps(args.head)),
+ partition)(node.source)
+
+ // ALTER TABLE table_name CLUSTERED BY (col, ...) [SORTED BY (col, ...)] INTO n BUCKETS;
+ case Token("TOK_ALTERTABLE_CLUSTER_SORT", Token("TOK_ALTERTABLE_BUCKETS", b) :: Nil) :: _ =>
+ val clusterCols: Seq[String] = b.head match {
+ case Token("TOK_TABCOLNAME", children) => children.map(_.text)
+ case _ => parseFailed("Invalid ALTER TABLE command", node)
+ }
+ // If sort columns are specified, num buckets should be the third arg.
+ // If sort columns are not specified, num buckets should be the second arg.
+ // TODO: actually use `sortDirections` once we actually store that in the metastore
+ val (sortCols: Seq[String], sortDirections: Seq[SortDirection], numBuckets: Int) = {
+ b.tail match {
+ case Token("TOK_TABCOLNAME", children) :: numBucketsNode :: Nil =>
+ val (cols, directions) = children.map {
+ case Token("TOK_TABSORTCOLNAMEASC", Token(col, Nil) :: Nil) => (col, Ascending)
+ case Token("TOK_TABSORTCOLNAMEDESC", Token(col, Nil) :: Nil) => (col, Descending)
+ }.unzip
+ (cols, directions, numBucketsNode.text.toInt)
+ case numBucketsNode :: Nil =>
+ (Nil, Nil, numBucketsNode.text.toInt)
+ case _ =>
+ parseFailed("Invalid ALTER TABLE command", node)
+ }
+ }
+ AlterTableStorageProperties(
+ tableIdent,
+ BucketSpec(numBuckets, clusterCols, sortCols))(node.source)
+
+ // ALTER TABLE table_name NOT CLUSTERED
+ case Token("TOK_ALTERTABLE_CLUSTER_SORT", Token("TOK_NOT_CLUSTERED", Nil) :: Nil) :: _ =>
+ AlterTableNotClustered(tableIdent)(node.source)
+
+ // ALTER TABLE table_name NOT SORTED
+ case Token("TOK_ALTERTABLE_CLUSTER_SORT", Token("TOK_NOT_SORTED", Nil) :: Nil) :: _ =>
+ AlterTableNotSorted(tableIdent)(node.source)
+
+ // ALTER TABLE table_name SKEWED BY (col1, col2)
+ // ON ((col1_value, col2_value) [, (col1_value, col2_value), ...])
+ // [STORED AS DIRECTORIES];
+ case Token("TOK_ALTERTABLE_SKEWED",
+ Token("TOK_TABLESKEWED",
+ Token("TOK_TABCOLNAME", colNames) :: colValues :: rest) :: Nil) :: _ =>
+ // Example format:
+ //
+ // +- TOK_ALTERTABLE_SKEWED
+ // :- TOK_TABLESKEWED
+ // : :- TOK_TABCOLNAME
+ // : : :- dt
+ // : : +- country
+ // :- TOK_TABCOLVALUE_PAIR
+ // : :- TOK_TABCOLVALUES
+ // : : :- TOK_TABCOLVALUE
+ // : : : :- '2008-08-08'
+ // : : : +- 'us'
+ // : :- TOK_TABCOLVALUES
+ // : : :- TOK_TABCOLVALUE
+ // : : : :- '2009-09-09'
+ // : : : +- 'uk'
+ // +- TOK_STOREASDIR
+ val names = colNames.map { n => cleanAndUnquoteString(n.text) }
+ val values = colValues match {
+ case Token("TOK_TABCOLVALUE", vals) =>
+ Seq(vals.map { n => cleanAndUnquoteString(n.text) })
+ case Token("TOK_TABCOLVALUE_PAIR", pairs) =>
+ pairs.map {
+ case Token("TOK_TABCOLVALUES", Token("TOK_TABCOLVALUE", vals) :: Nil) =>
+ vals.map { n => cleanAndUnquoteString(n.text) }
+ case _ =>
+ parseFailed("Invalid ALTER TABLE command", node)
+ }
+ case _ =>
+ parseFailed("Invalid ALTER TABLE command", node)
+ }
+ val storedAsDirs = rest match {
+ case Token("TOK_STOREDASDIRS", Nil) :: Nil => true
+ case _ => false
+ }
+ AlterTableSkewed(
+ tableIdent,
+ names,
+ values,
+ storedAsDirs)(node.source)
+
+ // ALTER TABLE table_name NOT SKEWED
+ case Token("TOK_ALTERTABLE_SKEWED", Nil) :: _ =>
+ AlterTableNotSkewed(tableIdent)(node.source)
+
+ // ALTER TABLE table_name NOT STORED AS DIRECTORIES
+ case Token("TOK_ALTERTABLE_SKEWED", Token("TOK_STOREDASDIRS", Nil) :: Nil) :: _ =>
+ AlterTableNotStoredAsDirs(tableIdent)(node.source)
+
+ // ALTER TABLE table_name SET SKEWED LOCATION (col1="loc1" [, (col2, col3)="loc2", ...] );
+ case Token("TOK_ALTERTABLE_SKEWED_LOCATION",
+ Token("TOK_SKEWED_LOCATIONS",
+ Token("TOK_SKEWED_LOCATION_LIST", locationMaps) :: Nil) :: Nil) :: _ =>
+ // Expected format:
+ //
+ // +- TOK_ALTERTABLE_SKEWED_LOCATION
+ // +- TOK_SKEWED_LOCATIONS
+ // +- TOK_SKEWED_LOCATION_LIST
+ // :- TOK_SKEWED_LOCATION_MAP
+ // : :- 'col1'
+ // : +- 'loc1'
+ // +- TOK_SKEWED_LOCATION_MAP
+ // :- TOK_TABCOLVALUES
+ // : +- TOK_TABCOLVALUE
+ // : :- 'col2'
+ // : +- 'col3'
+ // +- 'loc2'
+ val skewedMaps = locationMaps.flatMap {
+ case Token("TOK_SKEWED_LOCATION_MAP", col :: loc :: Nil) =>
+ col match {
+ case Token(const, Nil) =>
+ Seq((cleanAndUnquoteString(const), cleanAndUnquoteString(loc.text)))
+ case Token("TOK_TABCOLVALUES", Token("TOK_TABCOLVALUE", keys) :: Nil) =>
+ keys.map { k => (cleanAndUnquoteString(k.text), cleanAndUnquoteString(loc.text)) }
+ }
+ case _ =>
+ parseFailed("Invalid ALTER TABLE command", node)
+ }.toMap
+ AlterTableSkewedLocation(tableIdent, skewedMaps)(node.source)
+
+ // ALTER TABLE table_name ADD [IF NOT EXISTS] PARTITION spec [LOCATION 'loc1']
+ // spec [LOCATION 'loc2'] ...;
+ case Token("TOK_ALTERTABLE_ADDPARTS", args) :: _ =>
+ val (ifNotExists, parts) = args.head match {
+ case Token("TOK_IFNOTEXISTS", Nil) => (true, args.tail)
+ case _ => (false, args)
+ }
+ // List of (spec, location) to describe partitions to add
+ // Each partition spec may or may not be followed by a location
+ val parsedParts = new ArrayBuffer[(TablePartitionSpec, Option[String])]
+ parts.foreach {
+ case t @ Token("TOK_PARTSPEC", _) =>
+ parsedParts += ((parsePartitionSpec(t), None))
+ case Token("TOK_PARTITIONLOCATION", loc :: Nil) =>
+ // Update the location of the last partition we just added
+ if (parsedParts.nonEmpty) {
+ val (spec, _) = parsedParts.remove(parsedParts.length - 1)
+ parsedParts += ((spec, Some(unquoteString(loc.text))))
+ }
+ case _ =>
+ parseFailed("Invalid ALTER TABLE command", node)
+ }
+ AlterTableAddPartition(tableIdent, parsedParts, ifNotExists)(node.source)
+
+ // ALTER TABLE table_name PARTITION spec1 RENAME TO PARTITION spec2;
+ case Token("TOK_ALTERTABLE_RENAMEPART", spec :: Nil) :: _ =>
+ val newPartition = parsePartitionSpec(spec)
+ val oldPartition = partition.getOrElse {
+ parseFailed("Expected old partition spec in ALTER TABLE rename partition command", node)
+ }
+ AlterTableRenamePartition(tableIdent, oldPartition, newPartition)(node.source)
+
+ // ALTER TABLE table_name_1 EXCHANGE PARTITION spec WITH TABLE table_name_2;
+ case Token("TOK_ALTERTABLE_EXCHANGEPARTITION", spec :: newTable :: Nil) :: _ =>
+ val parsedSpec = parsePartitionSpec(spec)
+ val newTableIdent = extractTableIdent(newTable)
+ AlterTableExchangePartition(tableIdent, newTableIdent, parsedSpec)(node.source)
+
+ // ALTER TABLE table_name DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE];
+ case Token("TOK_ALTERTABLE_DROPPARTS", args) :: _ =>
+ val parts = args.collect { case p @ Token("TOK_PARTSPEC", _) => parsePartitionSpec(p) }
+ val ifExists = getClauseOption("TOK_IFEXISTS", args).isDefined
+ val purge = getClauseOption("PURGE", args).isDefined
+ AlterTableDropPartition(tableIdent, parts, ifExists, purge)(node.source)
+
+ // ALTER TABLE table_name ARCHIVE PARTITION spec;
+ case Token("TOK_ALTERTABLE_ARCHIVE", spec :: Nil) :: _ =>
+ AlterTableArchivePartition(tableIdent, parsePartitionSpec(spec))(node.source)
+
+ // ALTER TABLE table_name UNARCHIVE PARTITION spec;
+ case Token("TOK_ALTERTABLE_UNARCHIVE", spec :: Nil) :: _ =>
+ AlterTableUnarchivePartition(tableIdent, parsePartitionSpec(spec))(node.source)
+
+ // ALTER TABLE table_name [PARTITION spec] SET FILEFORMAT file_format;
+ case Token("TOK_ALTERTABLE_FILEFORMAT", args) :: _ =>
+ val Seq(fileFormat, genericFormat) =
+ getClauses(Seq("TOK_TABLEFILEFORMAT", "TOK_FILEFORMAT_GENERIC"), args)
+ // Note: the AST doesn't contain information about which file format is being set here.
+ // E.g. we can't differentiate between INPUTFORMAT and OUTPUTFORMAT if either is set.
+ // Right now this just stores the values, but we should figure out how to get the keys.
+ val fFormat = fileFormat
+ .map { _.children.map { n => cleanAndUnquoteString(n.text) }}
+ .getOrElse(Seq())
+ val gFormat = genericFormat.map { f => cleanAndUnquoteString(f.children(0).text) }
+ AlterTableSetFileFormat(tableIdent, partition, fFormat, gFormat)(node.source)
+
+ // ALTER TABLE table_name [PARTITION spec] SET LOCATION "loc";
+ case Token("TOK_ALTERTABLE_LOCATION", Token(loc, Nil) :: Nil) :: _ =>
+ AlterTableSetLocation(tableIdent, partition, cleanAndUnquoteString(loc))(node.source)
+
+ // ALTER TABLE table_name TOUCH [PARTITION spec];
+ case Token("TOK_ALTERTABLE_TOUCH", args) :: _ =>
+ // Note: the partition spec, if it exists, comes after TOUCH, so `partition` should
+ // always be None here. Instead, we need to parse it from the TOUCH node's children.
+ val part = getClauseOption("TOK_PARTSPEC", args).map(parsePartitionSpec)
+ AlterTableTouch(tableIdent, part)(node.source)
+
+ // ALTER TABLE table_name [PARTITION spec] COMPACT 'compaction_type';
+ case Token("TOK_ALTERTABLE_COMPACT", Token(compactType, Nil) :: Nil) :: _ =>
+ AlterTableCompact(tableIdent, partition, cleanAndUnquoteString(compactType))(node.source)
+
+ // ALTER TABLE table_name [PARTITION spec] CONCATENATE;
+ case Token("TOK_ALTERTABLE_MERGEFILES", _) :: _ =>
+ AlterTableMerge(tableIdent, partition)(node.source)
+
+ // ALTER TABLE table_name [PARTITION spec] CHANGE [COLUMN] col_old_name col_new_name
+ // column_type [COMMENT col_comment] [FIRST|AFTER column_name] [CASCADE|RESTRICT];
+ case Token("TOK_ALTERTABLE_RENAMECOL", oldName :: newName :: dataType :: args) :: _ =>
+ val afterColName: Option[String] =
+ getClauseOption("TOK_ALTERTABLE_CHANGECOL_AFTER_POSITION", args).map { ap =>
+ ap.children match {
+ case Token(col, Nil) :: Nil => col
+ case _ => parseFailed("Invalid ALTER TABLE command", node)
+ }
+ }
+ val restrict = getClauseOption("TOK_RESTRICT", args).isDefined
+ val cascade = getClauseOption("TOK_CASCADE", args).isDefined
+ val comment = args.headOption.map {
+ case Token("TOK_ALTERTABLE_CHANGECOL_AFTER_POSITION", _) => null
+ case Token("TOK_RESTRICT", _) => null
+ case Token("TOK_CASCADE", _) => null
+ case Token(commentStr, Nil) => cleanAndUnquoteString(commentStr)
+ case _ => parseFailed("Invalid ALTER TABLE command", node)
+ }
+ AlterTableChangeCol(
+ tableIdent,
+ partition,
+ oldName.text,
+ newName.text,
+ nodeToDataType(dataType),
+ comment,
+ afterColName,
+ restrict,
+ cascade)(node.source)
+
+ // ALTER TABLE table_name [PARTITION spec] ADD COLUMNS (name type [COMMENT comment], ...)
+ // [CASCADE|RESTRICT]
+ case Token("TOK_ALTERTABLE_ADDCOLS", args) :: _ =>
+ val columnNodes = getClause("TOK_TABCOLLIST", args).children
+ val columns = StructType(columnNodes.map(nodeToStructField))
+ val restrict = getClauseOption("TOK_RESTRICT", args).isDefined
+ val cascade = getClauseOption("TOK_CASCADE", args).isDefined
+ AlterTableAddCol(tableIdent, partition, columns, restrict, cascade)(node.source)
+
+ // ALTER TABLE table_name [PARTITION spec] REPLACE COLUMNS (name type [COMMENT comment], ...)
+ // [CASCADE|RESTRICT]
+ case Token("TOK_ALTERTABLE_REPLACECOLS", args) :: _ =>
+ val columnNodes = getClause("TOK_TABCOLLIST", args).children
+ val columns = StructType(columnNodes.map(nodeToStructField))
+ val restrict = getClauseOption("TOK_RESTRICT", args).isDefined
+ val cascade = getClauseOption("TOK_CASCADE", args).isDefined
+ AlterTableReplaceCol(tableIdent, partition, columns, restrict, cascade)(node.source)
+
+ case _ =>
+ parseFailed("Unsupported ALTER TABLE command", node)
+ }
+ }
+
+}
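As a worked example of the dispatch above (a sketch patterned on the new `DDLCommandSuite`; `boxes`, `width`, and `my.serde` are illustrative names): `parse` peels off the `TOK_TABNAME` and optional `TOK_PARTSPEC` children, then `matchAlterTableCommands` matches the remaining `TOK_ALTERTABLE_SERIALIZER` token.

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.SparkQl
import org.apache.spark.sql.execution.command.AlterTableSerDeProperties

val parser = new SparkQl()
val sql = "ALTER TABLE boxes PARTITION (width='3') SET SERDE 'my.serde'"

// parsePartitionSpec turns the PARTITION clause into Map("width" -> "3");
// the serde properties are None because no WITH SERDEPROPERTIES was given.
val expected = AlterTableSerDeProperties(
  TableIdentifier("boxes", None),
  Some("my.serde"),
  None,
  Some(Map("width" -> "3")))(sql)

assert(parser.parsePlan(sql) == expected)
```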
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
new file mode 100644
index 0000000000..9df58d214a
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.ExternalCatalog.TablePartitionSpec
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.execution.datasources.BucketSpec
+import org.apache.spark.sql.types._
+
+
+// Note: The definition of these commands are based on the ones described in
+// https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
+
+/**
+ * A DDL command expected to be parsed and run in an underlying system instead of in Spark.
+ */
+abstract class NativeDDLCommand(val sql: String) extends RunnableCommand {
+
+ override def run(sqlContext: SQLContext): Seq[Row] = {
+ sqlContext.runNativeSql(sql)
+ }
+
+ override val output: Seq[Attribute] = {
+ Seq(AttributeReference("result", StringType, nullable = false)())
+ }
+
+}
+
+case class CreateDatabase(
+ databaseName: String,
+ ifNotExists: Boolean,
+ path: Option[String],
+ comment: Option[String],
+ props: Map[String, String])(sql: String)
+ extends NativeDDLCommand(sql) with Logging
+
+case class CreateFunction(
+ functionName: String,
+ alias: String,
+ resourcesMap: Map[String, String],
+ isTemp: Boolean)(sql: String)
+ extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableRename(
+ oldName: TableIdentifier,
+ newName: TableIdentifier)(sql: String)
+ extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableSetProperties(
+ tableName: TableIdentifier,
+ properties: Map[String, String])(sql: String)
+ extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableUnsetProperties(
+ tableName: TableIdentifier,
+ properties: Map[String, String],
+ ifExists: Boolean)(sql: String)
+ extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableSerDeProperties(
+ tableName: TableIdentifier,
+ serdeClassName: Option[String],
+ serdeProperties: Option[Map[String, String]],
+ partition: Option[Map[String, String]])(sql: String)
+ extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableStorageProperties(
+ tableName: TableIdentifier,
+ buckets: BucketSpec)(sql: String)
+ extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableNotClustered(
+ tableName: TableIdentifier)(sql: String) extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableNotSorted(
+ tableName: TableIdentifier)(sql: String) extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableSkewed(
+ tableName: TableIdentifier,
+ // e.g. (dt, country)
+ skewedCols: Seq[String],
+    // e.g. ('2008-08-08', 'us'), ('2009-09-09', 'uk')
+ skewedValues: Seq[Seq[String]],
+ storedAsDirs: Boolean)(sql: String)
+ extends NativeDDLCommand(sql) with Logging {
+
+ require(skewedValues.forall(_.size == skewedCols.size),
+ "number of columns in skewed values do not match number of skewed columns provided")
+}
+
+case class AlterTableNotSkewed(
+ tableName: TableIdentifier)(sql: String) extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableNotStoredAsDirs(
+ tableName: TableIdentifier)(sql: String) extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableSkewedLocation(
+ tableName: TableIdentifier,
+ skewedMap: Map[String, String])(sql: String)
+ extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableAddPartition(
+ tableName: TableIdentifier,
+ partitionSpecsAndLocs: Seq[(TablePartitionSpec, Option[String])],
+ ifNotExists: Boolean)(sql: String)
+ extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableRenamePartition(
+ tableName: TableIdentifier,
+ oldPartition: TablePartitionSpec,
+ newPartition: TablePartitionSpec)(sql: String)
+ extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableExchangePartition(
+ fromTableName: TableIdentifier,
+ toTableName: TableIdentifier,
+ spec: TablePartitionSpec)(sql: String)
+ extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableDropPartition(
+ tableName: TableIdentifier,
+ specs: Seq[TablePartitionSpec],
+ ifExists: Boolean,
+ purge: Boolean)(sql: String)
+ extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableArchivePartition(
+ tableName: TableIdentifier,
+ spec: TablePartitionSpec)(sql: String)
+ extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableUnarchivePartition(
+ tableName: TableIdentifier,
+ spec: TablePartitionSpec)(sql: String)
+ extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableSetFileFormat(
+ tableName: TableIdentifier,
+ partitionSpec: Option[TablePartitionSpec],
+ fileFormat: Seq[String],
+ genericFormat: Option[String])(sql: String)
+ extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableSetLocation(
+ tableName: TableIdentifier,
+ partitionSpec: Option[TablePartitionSpec],
+ location: String)(sql: String)
+ extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableTouch(
+ tableName: TableIdentifier,
+ partitionSpec: Option[TablePartitionSpec])(sql: String)
+ extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableCompact(
+ tableName: TableIdentifier,
+ partitionSpec: Option[TablePartitionSpec],
+ compactType: String)(sql: String)
+ extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableMerge(
+ tableName: TableIdentifier,
+ partitionSpec: Option[TablePartitionSpec])(sql: String)
+ extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableChangeCol(
+ tableName: TableIdentifier,
+ partitionSpec: Option[TablePartitionSpec],
+ oldColName: String,
+ newColName: String,
+ dataType: DataType,
+ comment: Option[String],
+ afterColName: Option[String],
+ restrict: Boolean,
+ cascade: Boolean)(sql: String)
+ extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableAddCol(
+ tableName: TableIdentifier,
+ partitionSpec: Option[TablePartitionSpec],
+ columns: StructType,
+ restrict: Boolean,
+ cascade: Boolean)(sql: String)
+ extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableReplaceCol(
+ tableName: TableIdentifier,
+ partitionSpec: Option[TablePartitionSpec],
+ columns: StructType,
+ restrict: Boolean,
+ cascade: Boolean)(sql: String)
+ extends NativeDDLCommand(sql) with Logging
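Note the curried shape of every command above: the raw SQL text sits in a second parameter list. Scala generates case-class `equals`/`hashCode` from the first parameter list only, so plans compare structurally while the original text stays available for `runNativeSql`. A standalone sketch (not Spark code) of that behavior:

```scala
// The second parameter list is excluded from the generated equals/hashCode.
case class DemoCommand(table: String)(val sql: String)

val a = DemoCommand("t")("ALTER TABLE t NOT SKEWED")
val b = DemoCommand("t")("alter table t not skewed")
assert(a == b)         // structurally equal despite different raw text
assert(a.sql != b.sql) // yet each instance still carries its original SQL
```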
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
new file mode 100644
index 0000000000..0d632a8a13
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -0,0 +1,544 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Descending}
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.execution.SparkQl
+import org.apache.spark.sql.execution.datasources.BucketSpec
+import org.apache.spark.sql.types._
+
+class DDLCommandSuite extends PlanTest {
+ private val parser = new SparkQl
+
+ test("create database") {
+ val sql =
+ """
+ |CREATE DATABASE IF NOT EXISTS database_name
+ |COMMENT 'database_comment' LOCATION '/home/user/db'
+ |WITH DBPROPERTIES ('a'='a', 'b'='b', 'c'='c')
+ """.stripMargin
+ val parsed = parser.parsePlan(sql)
+ val expected = CreateDatabase(
+ "database_name",
+ ifNotExists = true,
+ Some("/home/user/db"),
+ Some("database_comment"),
+ Map("a" -> "a", "b" -> "b", "c" -> "c"))(sql)
+ comparePlans(parsed, expected)
+ }
+
+ test("create function") {
+ val sql1 =
+ """
+ |CREATE TEMPORARY FUNCTION helloworld as
+ |'com.matthewrathbone.example.SimpleUDFExample' USING JAR '/path/to/jar',
+ |FILE 'path/to/file'
+ """.stripMargin
+ val sql2 =
+ """
+ |CREATE FUNCTION hello.world as
+ |'com.matthewrathbone.example.SimpleUDFExample' USING ARCHIVE '/path/to/archive',
+ |FILE 'path/to/file'
+ """.stripMargin
+ val parsed1 = parser.parsePlan(sql1)
+ val parsed2 = parser.parsePlan(sql2)
+ val expected1 = CreateFunction(
+ "helloworld",
+ "com.matthewrathbone.example.SimpleUDFExample",
+ Map("jar" -> "/path/to/jar", "file" -> "path/to/file"),
+ isTemp = true)(sql1)
+ val expected2 = CreateFunction(
+ "hello.world",
+ "com.matthewrathbone.example.SimpleUDFExample",
+ Map("archive" -> "/path/to/archive", "file" -> "path/to/file"),
+ isTemp = false)(sql2)
+ comparePlans(parsed1, expected1)
+ comparePlans(parsed2, expected2)
+ }
+
+ test("alter table: rename table") {
+ val sql = "ALTER TABLE table_name RENAME TO new_table_name"
+ val parsed = parser.parsePlan(sql)
+ val expected = AlterTableRename(
+ TableIdentifier("table_name", None),
+ TableIdentifier("new_table_name", None))(sql)
+ comparePlans(parsed, expected)
+ }
+
+ test("alter table: alter table properties") {
+ val sql1 = "ALTER TABLE table_name SET TBLPROPERTIES ('test' = 'test', " +
+ "'comment' = 'new_comment')"
+ val sql2 = "ALTER TABLE table_name UNSET TBLPROPERTIES ('comment', 'test')"
+ val sql3 = "ALTER TABLE table_name UNSET TBLPROPERTIES IF EXISTS ('comment', 'test')"
+ val parsed1 = parser.parsePlan(sql1)
+ val parsed2 = parser.parsePlan(sql2)
+ val parsed3 = parser.parsePlan(sql3)
+ val tableIdent = TableIdentifier("table_name", None)
+ val expected1 = AlterTableSetProperties(
+ tableIdent, Map("test" -> "test", "comment" -> "new_comment"))(sql1)
+ val expected2 = AlterTableUnsetProperties(
+ tableIdent, Map("comment" -> null, "test" -> null), ifExists = false)(sql2)
+ val expected3 = AlterTableUnsetProperties(
+ tableIdent, Map("comment" -> null, "test" -> null), ifExists = true)(sql3)
+ comparePlans(parsed1, expected1)
+ comparePlans(parsed2, expected2)
+ comparePlans(parsed3, expected3)
+ }
+
+ test("alter table: SerDe properties") {
+ val sql1 = "ALTER TABLE table_name SET SERDE 'org.apache.class'"
+ val sql2 =
+ """
+ |ALTER TABLE table_name SET SERDE 'org.apache.class'
+ |WITH SERDEPROPERTIES ('columns'='foo,bar', 'field.delim' = ',')
+ """.stripMargin
+ val sql3 =
+ """
+ |ALTER TABLE table_name SET SERDEPROPERTIES ('columns'='foo,bar',
+ |'field.delim' = ',')
+ """.stripMargin
+ val sql4 =
+ """
+ |ALTER TABLE table_name PARTITION (test, dt='2008-08-08',
+ |country='us') SET SERDE 'org.apache.class' WITH SERDEPROPERTIES ('columns'='foo,bar',
+ |'field.delim' = ',')
+ """.stripMargin
+ val sql5 =
+ """
+ |ALTER TABLE table_name PARTITION (test, dt='2008-08-08',
+ |country='us') SET SERDEPROPERTIES ('columns'='foo,bar', 'field.delim' = ',')
+ """.stripMargin
+ val parsed1 = parser.parsePlan(sql1)
+ val parsed2 = parser.parsePlan(sql2)
+ val parsed3 = parser.parsePlan(sql3)
+ val parsed4 = parser.parsePlan(sql4)
+ val parsed5 = parser.parsePlan(sql5)
+ val tableIdent = TableIdentifier("table_name", None)
+ val expected1 = AlterTableSerDeProperties(
+ tableIdent, Some("org.apache.class"), None, None)(sql1)
+ val expected2 = AlterTableSerDeProperties(
+ tableIdent,
+ Some("org.apache.class"),
+ Some(Map("columns" -> "foo,bar", "field.delim" -> ",")),
+ None)(sql2)
+ val expected3 = AlterTableSerDeProperties(
+ tableIdent, None, Some(Map("columns" -> "foo,bar", "field.delim" -> ",")), None)(sql3)
+ val expected4 = AlterTableSerDeProperties(
+ tableIdent,
+ Some("org.apache.class"),
+ Some(Map("columns" -> "foo,bar", "field.delim" -> ",")),
+ Some(Map("test" -> null, "dt" -> "2008-08-08", "country" -> "us")))(sql4)
+ val expected5 = AlterTableSerDeProperties(
+ tableIdent,
+ None,
+ Some(Map("columns" -> "foo,bar", "field.delim" -> ",")),
+ Some(Map("test" -> null, "dt" -> "2008-08-08", "country" -> "us")))(sql5)
+ comparePlans(parsed1, expected1)
+ comparePlans(parsed2, expected2)
+ comparePlans(parsed3, expected3)
+ comparePlans(parsed4, expected4)
+ comparePlans(parsed5, expected5)
+ }
+
+ test("alter table: storage properties") {
+ val sql1 = "ALTER TABLE table_name CLUSTERED BY (dt, country) INTO 10 BUCKETS"
+ val sql2 = "ALTER TABLE table_name CLUSTERED BY (dt, country) SORTED BY " +
+ "(dt, country DESC) INTO 10 BUCKETS"
+ val sql3 = "ALTER TABLE table_name NOT CLUSTERED"
+ val sql4 = "ALTER TABLE table_name NOT SORTED"
+ val parsed1 = parser.parsePlan(sql1)
+ val parsed2 = parser.parsePlan(sql2)
+ val parsed3 = parser.parsePlan(sql3)
+ val parsed4 = parser.parsePlan(sql4)
+ val tableIdent = TableIdentifier("table_name", None)
+ val cols = List("dt", "country")
+ // TODO: also test the sort directions once we keep track of that
+ val expected1 = AlterTableStorageProperties(
+ tableIdent, BucketSpec(10, cols, Nil))(sql1)
+ val expected2 = AlterTableStorageProperties(
+ tableIdent, BucketSpec(10, cols, cols))(sql2)
+ val expected3 = AlterTableNotClustered(tableIdent)(sql3)
+ val expected4 = AlterTableNotSorted(tableIdent)(sql4)
+ comparePlans(parsed1, expected1)
+ comparePlans(parsed2, expected2)
+ comparePlans(parsed3, expected3)
+ comparePlans(parsed4, expected4)
+ }
+
+ test("alter table: skewed") {
+ val sql1 =
+ """
+ |ALTER TABLE table_name SKEWED BY (dt, country) ON
+ |(('2008-08-08', 'us'), ('2009-09-09', 'uk'), ('2010-10-10', 'cn')) STORED AS DIRECTORIES
+ """.stripMargin
+ val sql2 =
+ """
+ |ALTER TABLE table_name SKEWED BY (dt, country) ON
+ |('2008-08-08', 'us') STORED AS DIRECTORIES
+ """.stripMargin
+ val sql3 =
+ """
+ |ALTER TABLE table_name SKEWED BY (dt, country) ON
+ |(('2008-08-08', 'us'), ('2009-09-09', 'uk'))
+ """.stripMargin
+ val sql4 = "ALTER TABLE table_name NOT SKEWED"
+ val sql5 = "ALTER TABLE table_name NOT STORED AS DIRECTORIES"
+ val parsed1 = parser.parsePlan(sql1)
+ val parsed2 = parser.parsePlan(sql2)
+ val parsed3 = parser.parsePlan(sql3)
+ val parsed4 = parser.parsePlan(sql4)
+ val parsed5 = parser.parsePlan(sql5)
+ val tableIdent = TableIdentifier("table_name", None)
+ val expected1 = AlterTableSkewed(
+ tableIdent,
+ Seq("dt", "country"),
+ Seq(List("2008-08-08", "us"), List("2009-09-09", "uk"), List("2010-10-10", "cn")),
+ storedAsDirs = true)(sql1)
+ val expected2 = AlterTableSkewed(
+ tableIdent,
+ Seq("dt", "country"),
+ Seq(List("2008-08-08", "us")),
+ storedAsDirs = true)(sql2)
+ val expected3 = AlterTableSkewed(
+ tableIdent,
+ Seq("dt", "country"),
+ Seq(List("2008-08-08", "us"), List("2009-09-09", "uk")),
+ storedAsDirs = false)(sql3)
+ val expected4 = AlterTableNotSkewed(tableIdent)(sql4)
+ val expected5 = AlterTableNotStoredAsDirs(tableIdent)(sql5)
+ comparePlans(parsed1, expected1)
+ comparePlans(parsed2, expected2)
+ comparePlans(parsed3, expected3)
+ comparePlans(parsed4, expected4)
+ comparePlans(parsed5, expected5)
+ }
+
+ test("alter table: skewed location") {
+ val sql1 =
+ """
+ |ALTER TABLE table_name SET SKEWED LOCATION
+ |('123'='location1', 'test'='location2')
+ """.stripMargin
+ val sql2 =
+ """
+ |ALTER TABLE table_name SET SKEWED LOCATION
+ |(('2008-08-08', 'us')='location1', 'test'='location2')
+ """.stripMargin
+ val parsed1 = parser.parsePlan(sql1)
+ val parsed2 = parser.parsePlan(sql2)
+ val tableIdent = TableIdentifier("table_name", None)
+ val expected1 = AlterTableSkewedLocation(
+ tableIdent,
+ Map("123" -> "location1", "test" -> "location2"))(sql1)
+ val expected2 = AlterTableSkewedLocation(
+ tableIdent,
+ Map("2008-08-08" -> "location1", "us" -> "location1", "test" -> "location2"))(sql2)
+ comparePlans(parsed1, expected1)
+ comparePlans(parsed2, expected2)
+ }
+
+ test("alter table: add partition") {
+ val sql =
+ """
+ |ALTER TABLE table_name ADD IF NOT EXISTS PARTITION
+ |(dt='2008-08-08', country='us') LOCATION 'location1' PARTITION
+ |(dt='2009-09-09', country='uk')
+ """.stripMargin
+ val parsed = parser.parsePlan(sql)
+ val expected = AlterTableAddPartition(
+ TableIdentifier("table_name", None),
+ Seq(
+ (Map("dt" -> "2008-08-08", "country" -> "us"), Some("location1")),
+ (Map("dt" -> "2009-09-09", "country" -> "uk"), None)),
+ ifNotExists = true)(sql)
+ comparePlans(parsed, expected)
+ }
+
+ test("alter table: rename partition") {
+ val sql =
+ """
+ |ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us')
+ |RENAME TO PARTITION (dt='2008-09-09', country='uk')
+ """.stripMargin
+ val parsed = parser.parsePlan(sql)
+ val expected = AlterTableRenamePartition(
+ TableIdentifier("table_name", None),
+ Map("dt" -> "2008-08-08", "country" -> "us"),
+ Map("dt" -> "2008-09-09", "country" -> "uk"))(sql)
+ comparePlans(parsed, expected)
+ }
+
+ test("alter table: exchange partition") {
+ val sql =
+ """
+ |ALTER TABLE table_name_1 EXCHANGE PARTITION
+ |(dt='2008-08-08', country='us') WITH TABLE table_name_2
+ """.stripMargin
+ val parsed = parser.parsePlan(sql)
+ val expected = AlterTableExchangePartition(
+ TableIdentifier("table_name_1", None),
+ TableIdentifier("table_name_2", None),
+ Map("dt" -> "2008-08-08", "country" -> "us"))(sql)
+ comparePlans(parsed, expected)
+ }
+
+ test("alter table: drop partitions") {
+ val sql1 =
+ """
+ |ALTER TABLE table_name DROP IF EXISTS PARTITION
+ |(dt='2008-08-08', country='us'), PARTITION (dt='2009-09-09', country='uk')
+ """.stripMargin
+ val sql2 =
+ """
+ |ALTER TABLE table_name DROP PARTITION
+ |(dt='2008-08-08', country='us'), PARTITION (dt='2009-09-09', country='uk') PURGE
+ """.stripMargin
+ val parsed1 = parser.parsePlan(sql1)
+ val parsed2 = parser.parsePlan(sql2)
+ val tableIdent = TableIdentifier("table_name", None)
+ val expected1 = AlterTableDropPartition(
+ tableIdent,
+ Seq(
+ Map("dt" -> "2008-08-08", "country" -> "us"),
+ Map("dt" -> "2009-09-09", "country" -> "uk")),
+ ifExists = true,
+ purge = false)(sql1)
+ val expected2 = AlterTableDropPartition(
+ tableIdent,
+ Seq(
+ Map("dt" -> "2008-08-08", "country" -> "us"),
+ Map("dt" -> "2009-09-09", "country" -> "uk")),
+ ifExists = false,
+ purge = true)(sql2)
+ comparePlans(parsed1, expected1)
+ comparePlans(parsed2, expected2)
+ }
+
+ test("alter table: archive partition") {
+ val sql = "ALTER TABLE table_name ARCHIVE PARTITION (dt='2008-08-08', country='us')"
+ val parsed = parser.parsePlan(sql)
+ val expected = AlterTableArchivePartition(
+ TableIdentifier("table_name", None),
+ Map("dt" -> "2008-08-08", "country" -> "us"))(sql)
+ comparePlans(parsed, expected)
+ }
+
+ test("alter table: unarchive partition") {
+ val sql = "ALTER TABLE table_name UNARCHIVE PARTITION (dt='2008-08-08', country='us')"
+ val parsed = parser.parsePlan(sql)
+ val expected = AlterTableUnarchivePartition(
+ TableIdentifier("table_name", None),
+ Map("dt" -> "2008-08-08", "country" -> "us"))(sql)
+ comparePlans(parsed, expected)
+ }
+
+ test("alter table: set file format") {
+ val sql1 =
+ """
+ |ALTER TABLE table_name SET FILEFORMAT INPUTFORMAT 'test'
+ |OUTPUTFORMAT 'test' SERDE 'test' INPUTDRIVER 'test' OUTPUTDRIVER 'test'
+ """.stripMargin
+ val sql2 = "ALTER TABLE table_name SET FILEFORMAT INPUTFORMAT 'test' " +
+ "OUTPUTFORMAT 'test' SERDE 'test'"
+ val sql3 = "ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us') " +
+ "SET FILEFORMAT PARQUET"
+ val parsed1 = parser.parsePlan(sql1)
+ val parsed2 = parser.parsePlan(sql2)
+ val parsed3 = parser.parsePlan(sql3)
+ val tableIdent = TableIdentifier("table_name", None)
+ val expected1 = AlterTableSetFileFormat(
+ tableIdent,
+ None,
+ List("test", "test", "test", "test", "test"),
+ None)(sql1)
+ val expected2 = AlterTableSetFileFormat(
+ tableIdent,
+ None,
+ List("test", "test", "test"),
+ None)(sql2)
+ val expected3 = AlterTableSetFileFormat(
+ tableIdent,
+ Some(Map("dt" -> "2008-08-08", "country" -> "us")),
+ Seq(),
+ Some("PARQUET"))(sql3)
+ comparePlans(parsed1, expected1)
+ comparePlans(parsed2, expected2)
+ comparePlans(parsed3, expected3)
+ }
+
+ test("alter table: set location") {
+ val sql1 = "ALTER TABLE table_name SET LOCATION 'new location'"
+ val sql2 = "ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us') " +
+ "SET LOCATION 'new location'"
+ val parsed1 = parser.parsePlan(sql1)
+ val parsed2 = parser.parsePlan(sql2)
+ val tableIdent = TableIdentifier("table_name", None)
+ val expected1 = AlterTableSetLocation(
+ tableIdent,
+ None,
+ "new location")(sql1)
+ val expected2 = AlterTableSetLocation(
+ tableIdent,
+ Some(Map("dt" -> "2008-08-08", "country" -> "us")),
+ "new location")(sql2)
+ comparePlans(parsed1, expected1)
+ comparePlans(parsed2, expected2)
+ }
+
+ test("alter table: touch") {
+ val sql1 = "ALTER TABLE table_name TOUCH"
+ val sql2 = "ALTER TABLE table_name TOUCH PARTITION (dt='2008-08-08', country='us')"
+ val parsed1 = parser.parsePlan(sql1)
+ val parsed2 = parser.parsePlan(sql2)
+ val tableIdent = TableIdentifier("table_name", None)
+ val expected1 = AlterTableTouch(
+ tableIdent,
+ None)(sql1)
+ val expected2 = AlterTableTouch(
+ tableIdent,
+ Some(Map("dt" -> "2008-08-08", "country" -> "us")))(sql2)
+ comparePlans(parsed1, expected1)
+ comparePlans(parsed2, expected2)
+ }
+
+ test("alter table: compact") {
+ val sql1 = "ALTER TABLE table_name COMPACT 'compaction_type'"
+ val sql2 =
+ """
+ |ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us')
+ |COMPACT 'MAJOR'
+ """.stripMargin
+ val parsed1 = parser.parsePlan(sql1)
+ val parsed2 = parser.parsePlan(sql2)
+ val tableIdent = TableIdentifier("table_name", None)
+ val expected1 = AlterTableCompact(
+ tableIdent,
+ None,
+ "compaction_type")(sql1)
+ val expected2 = AlterTableCompact(
+ tableIdent,
+ Some(Map("dt" -> "2008-08-08", "country" -> "us")),
+ "MAJOR")(sql2)
+ comparePlans(parsed1, expected1)
+ comparePlans(parsed2, expected2)
+ }
+
+ test("alter table: concatenate") {
+ val sql1 = "ALTER TABLE table_name CONCATENATE"
+ val sql2 = "ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us') CONCATENATE"
+ val parsed1 = parser.parsePlan(sql1)
+ val parsed2 = parser.parsePlan(sql2)
+ val tableIdent = TableIdentifier("table_name", None)
+ val expected1 = AlterTableMerge(tableIdent, None)(sql1)
+ val expected2 = AlterTableMerge(
+ tableIdent, Some(Map("dt" -> "2008-08-08", "country" -> "us")))(sql2)
+ comparePlans(parsed1, expected1)
+ comparePlans(parsed2, expected2)
+ }
+
+ test("alter table: change column name/type/position/comment") {
+ val sql1 = "ALTER TABLE table_name CHANGE col_old_name col_new_name INT"
+ val sql2 =
+ """
+ |ALTER TABLE table_name CHANGE COLUMN col_old_name col_new_name INT
+ |COMMENT 'col_comment' FIRST CASCADE
+ """.stripMargin
+ val sql3 =
+ """
+ |ALTER TABLE table_name CHANGE COLUMN col_old_name col_new_name INT
+ |COMMENT 'col_comment' AFTER column_name RESTRICT
+ """.stripMargin
+ val parsed1 = parser.parsePlan(sql1)
+ val parsed2 = parser.parsePlan(sql2)
+ val parsed3 = parser.parsePlan(sql3)
+ val tableIdent = TableIdentifier("table_name", None)
+ val expected1 = AlterTableChangeCol(
+ tableName = tableIdent,
+ partitionSpec = None,
+ oldColName = "col_old_name",
+ newColName = "col_new_name",
+ dataType = IntegerType,
+ comment = None,
+ afterColName = None,
+ restrict = false,
+ cascade = false)(sql1)
+ val expected2 = AlterTableChangeCol(
+ tableName = tableIdent,
+ partitionSpec = None,
+ oldColName = "col_old_name",
+ newColName = "col_new_name",
+ dataType = IntegerType,
+ comment = Some("col_comment"),
+ afterColName = None,
+ restrict = false,
+ cascade = true)(sql2)
+ val expected3 = AlterTableChangeCol(
+ tableName = tableIdent,
+ partitionSpec = None,
+ oldColName = "col_old_name",
+ newColName = "col_new_name",
+ dataType = IntegerType,
+ comment = Some("col_comment"),
+ afterColName = Some("column_name"),
+ restrict = true,
+ cascade = false)(sql3)
+ comparePlans(parsed1, expected1)
+ comparePlans(parsed2, expected2)
+ comparePlans(parsed3, expected3)
+ }
+
+ test("alter table: add/replace columns") {
+ val sql1 =
+ """
+ |ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us')
+ |ADD COLUMNS (new_col1 INT COMMENT 'test_comment', new_col2 LONG
+ |COMMENT 'test_comment2') CASCADE
+ """.stripMargin
+ val sql2 =
+ """
+ |ALTER TABLE table_name REPLACE COLUMNS (new_col1 INT
+ |COMMENT 'test_comment', new_col2 LONG COMMENT 'test_comment2') RESTRICT
+ """.stripMargin
+ val parsed1 = parser.parsePlan(sql1)
+ val parsed2 = parser.parsePlan(sql2)
+ val meta1 = new MetadataBuilder().putString("comment", "test_comment").build()
+ val meta2 = new MetadataBuilder().putString("comment", "test_comment2").build()
+ val tableIdent = TableIdentifier("table_name", None)
+ val expected1 = AlterTableAddCol(
+ tableIdent,
+ Some(Map("dt" -> "2008-08-08", "country" -> "us")),
+ StructType(Seq(
+ StructField("new_col1", IntegerType, nullable = true, meta1),
+ StructField("new_col2", LongType, nullable = true, meta2))),
+ restrict = false,
+ cascade = true)(sql1)
+ val expected2 = AlterTableReplaceCol(
+ tableIdent,
+ None,
+ StructType(Seq(
+ StructField("new_col1", IntegerType, nullable = true, meta1),
+ StructField("new_col2", LongType, nullable = true, meta2))),
+ restrict = true,
+ cascade = false)(sql2)
+ comparePlans(parsed1, expected1)
+ comparePlans(parsed2, expected2)
+ }
+
+}