Diffstat (limited to 'sql/core/src')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala                                   9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala                          125
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommandParser.scala   428
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala                       210
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala           544
5 files changed, 1297 insertions, 19 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 49a70a7c5f..36fe57f78b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -785,6 +785,15 @@ class SQLContext private[sql](
}
/**
+ * Executes a SQL query without parsing it, but instead passing it directly to an underlying
+ * system to process. This is currently only used for Hive DDLs and will be removed as soon
+ * as Spark can parse all supported Hive DDLs itself.
+ */
+ private[sql] def runNativeSql(sqlText: String): Seq[Row] = {
+ throw new UnsupportedOperationException
+ }
+
+ /**
* Returns the specified table as a [[DataFrame]].
*
* @group ddl_ops
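
The runNativeSql hook added above is only a stub at this level; a concrete context is expected to override it and forward the statement to the underlying system (Hive, per the scaladoc). A purely illustrative sketch of such an override follows; the subclass name, its placement in the org.apache.spark.sql package (needed to reach the private[sql] member), and the delegation stand-in are assumptions, not part of this patch:

package org.apache.spark.sql

import org.apache.spark.SparkContext

// Hypothetical context that handles "native" SQL itself instead of throwing.
class NativeSqlContext(sc: SparkContext) extends SQLContext(sc) {
  override private[sql] def runNativeSql(sqlText: String): Seq[Row] = {
    // Stand-in for handing the statement to Hive or another engine.
    Seq(Row(s"native: $sqlText"))
  }
}
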
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala
index 471a5e436c..d12dab567b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala
@@ -29,7 +29,26 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends Cataly
import ParserUtils._
/** Check if a command should not be explained. */
- protected def isNoExplainCommand(command: String): Boolean = "TOK_DESCTABLE" == command
+ protected def isNoExplainCommand(command: String): Boolean = {
+ "TOK_DESCTABLE" == command || "TOK_ALTERTABLE" == command
+ }
+
+ /**
+ * For each node, extract properties in the form of a list ['key1', 'key2', 'key3', 'value']
+ * into a pair (key1.key2.key3, value).
+ */
+ private def extractProps(
+ props: Seq[ASTNode],
+ expectedNodeText: String): Seq[(String, String)] = {
+ props.map {
+ case Token(x, keysAndValue) if x == expectedNodeText =>
+ val key = keysAndValue.init.map { x => unquoteString(x.text) }.mkString(".")
+ val value = unquoteString(keysAndValue.last.text)
+ (key, value)
+ case p =>
+ parseFailed(s"Expected property '$expectedNodeText' in command", p)
+ }
+ }
protected override def nodeToPlan(node: ASTNode): LogicalPlan = {
node match {
@@ -64,10 +83,86 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends Cataly
val tableIdent = extractTableIdent(nameParts)
RefreshTable(tableIdent)
+ // CREATE DATABASE [IF NOT EXISTS] database_name [COMMENT database_comment]
+ // [LOCATION path] [WITH DBPROPERTIES (key1=val1, key2=val2, ...)];
+ case Token("TOK_CREATEDATABASE", Token(databaseName, Nil) :: args) =>
+ val Seq(ifNotExists, dbLocation, databaseComment, dbprops) = getClauses(Seq(
+ "TOK_IFNOTEXISTS",
+ "TOK_DATABASELOCATION",
+ "TOK_DATABASECOMMENT",
+ "TOK_DATABASEPROPERTIES"), args)
+ val location = dbLocation.map {
+ case Token("TOK_DATABASELOCATION", Token(loc, Nil) :: Nil) => unquoteString(loc)
+ case _ => parseFailed("Invalid CREATE DATABASE command", node)
+ }
+ val comment = databaseComment.map {
+ case Token("TOK_DATABASECOMMENT", Token(com, Nil) :: Nil) => unquoteString(com)
+ case _ => parseFailed("Invalid CREATE DATABASE command", node)
+ }
+ val props = dbprops.toSeq.flatMap {
+ case Token("TOK_DATABASEPROPERTIES", Token("TOK_DBPROPLIST", propList) :: Nil) =>
+ extractProps(propList, "TOK_TABLEPROPERTY")
+ case _ => parseFailed("Invalid CREATE DATABASE command", node)
+ }.toMap
+ CreateDatabase(databaseName, ifNotExists.isDefined, location, comment, props)(node.source)
+
+ // CREATE [TEMPORARY] FUNCTION [db_name.]function_name AS class_name
+ // [USING JAR|FILE|ARCHIVE 'file_uri' [, JAR|FILE|ARCHIVE 'file_uri'] ];
+ case Token("TOK_CREATEFUNCTION", args) =>
+ // Example format:
+ //
+ // TOK_CREATEFUNCTION
+ // :- db_name
+ // :- func_name
+ // :- alias
+ // +- TOK_RESOURCE_LIST
+ // :- TOK_RESOURCE_URI
+ // : :- TOK_JAR
+ // : +- '/path/to/jar'
+ // +- TOK_RESOURCE_URI
+ // :- TOK_FILE
+ // +- 'path/to/file'
+ val (funcNameArgs, otherArgs) = args.partition {
+ case Token("TOK_RESOURCE_LIST", _) => false
+ case Token("TOK_TEMPORARY", _) => false
+ case Token(_, Nil) => true
+ case _ => parseFailed("Invalid CREATE FUNCTION command", node)
+ }
+ // If database name is specified, there are 3 tokens, otherwise 2.
+ val (funcName, alias) = funcNameArgs match {
+ case Token(dbName, Nil) :: Token(fname, Nil) :: Token(aname, Nil) :: Nil =>
+ (unquoteString(dbName) + "." + unquoteString(fname), unquoteString(aname))
+ case Token(fname, Nil) :: Token(aname, Nil) :: Nil =>
+ (unquoteString(fname), unquoteString(aname))
+ case _ =>
+ parseFailed("Invalid CREATE FUNCTION command", node)
+ }
+ // Extract other keywords, if they exist
+ val Seq(rList, temp) = getClauses(Seq("TOK_RESOURCE_LIST", "TOK_TEMPORARY"), otherArgs)
+ val resourcesMap = rList.toSeq.flatMap {
+ case Token("TOK_RESOURCE_LIST", resources) =>
+ resources.map {
+ case Token("TOK_RESOURCE_URI", rType :: Token(rPath, Nil) :: Nil) =>
+ val resourceType = rType match {
+ case Token("TOK_JAR", Nil) => "jar"
+ case Token("TOK_FILE", Nil) => "file"
+ case Token("TOK_ARCHIVE", Nil) => "archive"
+ case Token(f, _) => parseFailed(s"Unexpected resource format '$f'", node)
+ }
+ (resourceType, unquoteString(rPath))
+ case _ => parseFailed("Invalid CREATE FUNCTION command", node)
+ }
+ case _ => parseFailed("Invalid CREATE FUNCTION command", node)
+ }.toMap
+ CreateFunction(funcName, alias, resourcesMap, temp.isDefined)(node.source)
+
+ case Token("TOK_ALTERTABLE", alterTableArgs) =>
+ AlterTableCommandParser.parse(node)
+
case Token("TOK_CREATETABLEUSING", createTableArgs) =>
val Seq(
temp,
- allowExisting,
+ ifNotExists,
Some(tabName),
tableCols,
Some(Token("TOK_TABLEPROVIDER", providerNameParts)),
@@ -79,30 +174,22 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends Cataly
"TOK_TABLEPROVIDER",
"TOK_TABLEOPTIONS",
"TOK_QUERY"), createTableArgs)
-
val tableIdent: TableIdentifier = extractTableIdent(tabName)
-
val columns = tableCols.map {
case Token("TOK_TABCOLLIST", fields) => StructType(fields.map(nodeToStructField))
+ case _ => parseFailed("Invalid CREATE TABLE command", node)
}
-
val provider = providerNameParts.map {
case Token(name, Nil) => name
+ case _ => parseFailed("Invalid CREATE TABLE command", node)
}.mkString(".")
-
- val options: Map[String, String] = tableOpts.toSeq.flatMap {
- case Token("TOK_TABLEOPTIONS", options) =>
- options.map {
- case Token("TOK_TABLEOPTION", keysAndValue) =>
- val key = keysAndValue.init.map(_.text).mkString(".")
- val value = unquoteString(keysAndValue.last.text)
- (key, value)
- }
+ val options = tableOpts.toSeq.flatMap {
+ case Token("TOK_TABLEOPTIONS", opts) => extractProps(opts, "TOK_TABLEOPTION")
+ case _ => parseFailed("Invalid CREATE TABLE command", node)
}.toMap
+ val asClause = tableAs.map(nodeToPlan)
- val asClause = tableAs.map(nodeToPlan(_))
-
- if (temp.isDefined && allowExisting.isDefined) {
+ if (temp.isDefined && ifNotExists.isDefined) {
throw new AnalysisException(
"a CREATE TEMPORARY TABLE statement does not allow IF NOT EXISTS clause.")
}
@@ -113,7 +200,7 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends Cataly
"a CREATE TABLE AS SELECT statement does not allow column definitions.")
}
- val mode = if (allowExisting.isDefined) {
+ val mode = if (ifNotExists.isDefined) {
SaveMode.Ignore
} else if (temp.isDefined) {
SaveMode.Overwrite
@@ -136,7 +223,7 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends Cataly
provider,
temp.isDefined,
options,
- allowExisting.isDefined,
+ ifNotExists.isDefined,
managedIfNoPath = false)
}
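
For orientation, the new TOK_CREATEDATABASE / TOK_CREATEFUNCTION / TOK_ALTERTABLE branches all surface through the same parsePlan entry point exercised by DDLCommandSuite below. A minimal sketch, using only pieces visible in this patch (the statement text and wrapper object are illustrative):

import org.apache.spark.sql.execution.SparkQl
import org.apache.spark.sql.execution.command.CreateDatabase

object CreateDatabaseParseSketch {
  def main(args: Array[String]): Unit = {
    val parser = new SparkQl()
    val sql = "CREATE DATABASE IF NOT EXISTS db1 COMMENT 'demo' LOCATION '/tmp/db1' " +
      "WITH DBPROPERTIES ('owner'='me')"
    // TOK_CREATEDATABASE is handled by the new branch in nodeToPlan, so the result is a
    // CreateDatabase command node; unapply covers only the first parameter list, while the
    // raw SQL text travels in the second, curried list.
    parser.parsePlan(sql) match {
      case CreateDatabase(name, ifNotExists, location, comment, props) =>
        println(s"$name ifNotExists=$ifNotExists location=$location comment=$comment props=$props")
      case other =>
        sys.error(s"unexpected plan: $other")
    }
  }
}
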
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommandParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommandParser.scala
new file mode 100644
index 0000000000..58639275c1
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommandParser.scala
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.ExternalCatalog.TablePartitionSpec
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Descending, SortDirection}
+import org.apache.spark.sql.catalyst.parser._
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.types.StructType
+
+
+/**
+ * Helper object to parse alter table commands.
+ */
+object AlterTableCommandParser {
+ import ParserUtils._
+
+ /**
+ * Parse the given node assuming it is an alter table command.
+ */
+ def parse(node: ASTNode): LogicalPlan = {
+ node.children match {
+ case (tabName @ Token("TOK_TABNAME", _)) :: otherNodes =>
+ val tableIdent = extractTableIdent(tabName)
+ val partSpec = getClauseOption("TOK_PARTSPEC", node.children).map(parsePartitionSpec)
+ matchAlterTableCommands(node, otherNodes, tableIdent, partSpec)
+ case _ =>
+ parseFailed("Could not parse ALTER TABLE command", node)
+ }
+ }
+
+ private def cleanAndUnquoteString(s: String): String = {
+ cleanIdentifier(unquoteString(s))
+ }
+
+ /**
+ * Extract partition spec from the given [[ASTNode]] as a map, assuming it exists.
+ *
+ * Expected format:
+ * +- TOK_PARTSPEC
+ * :- TOK_PARTVAL
+ * : :- dt
+ * : +- '2008-08-08'
+ * +- TOK_PARTVAL
+ * :- country
+ * +- 'us'
+ */
+ private def parsePartitionSpec(node: ASTNode): Map[String, String] = {
+ node match {
+ case Token("TOK_PARTSPEC", partitions) =>
+ partitions.map {
+ // Note: sometimes there's a "=", "<" or ">" between the key and the value
+ case Token("TOK_PARTVAL", ident :: conj :: constant :: Nil) =>
+ (cleanAndUnquoteString(ident.text), cleanAndUnquoteString(constant.text))
+ case Token("TOK_PARTVAL", ident :: constant :: Nil) =>
+ (cleanAndUnquoteString(ident.text), cleanAndUnquoteString(constant.text))
+ case Token("TOK_PARTVAL", ident :: Nil) =>
+ (cleanAndUnquoteString(ident.text), null)
+ case _ =>
+ parseFailed("Invalid ALTER TABLE command", node)
+ }.toMap
+ case _ =>
+ parseFailed("Expected partition spec in ALTER TABLE command", node)
+ }
+ }
+
+ /**
+ * Extract table properties from the given [[ASTNode]] as a map, assuming it exists.
+ *
+ * Expected format:
+ * +- TOK_TABLEPROPERTIES
+ * +- TOK_TABLEPROPLIST
+ * :- TOK_TABLEPROPERTY
+ * : :- 'test'
+ * : +- 'value'
+ * +- TOK_TABLEPROPERTY
+ * :- 'comment'
+ * +- 'new_comment'
+ */
+ private def extractTableProps(node: ASTNode): Map[String, String] = {
+ node match {
+ case Token("TOK_TABLEPROPERTIES", propsList) =>
+ propsList.flatMap {
+ case Token("TOK_TABLEPROPLIST", props) =>
+ props.map { case Token("TOK_TABLEPROPERTY", key :: value :: Nil) =>
+ val k = cleanAndUnquoteString(key.text)
+ val v = value match {
+ case Token("TOK_NULL", Nil) => null
+ case _ => cleanAndUnquoteString(value.text)
+ }
+ (k, v)
+ }
+ case _ =>
+ parseFailed("Invalid ALTER TABLE command", node)
+ }.toMap
+ case _ =>
+ parseFailed("Expected table properties in ALTER TABLE command", node)
+ }
+ }
+
+ /**
+ * Parse an alter table command from a [[ASTNode]] into a [[LogicalPlan]].
+ * This follows https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL.
+ *
+ * @param node the original [[ASTNode]] to parse.
+ * @param otherNodes the other [[ASTNode]]s after the first one containing the table name.
+ * @param tableIdent identifier of the table, parsed from the first [[ASTNode]].
+ * @param partition spec identifying the partition this command is concerned with, if any.
+ */
+ // TODO: This method is massive. Break it down.
+ private def matchAlterTableCommands(
+ node: ASTNode,
+ otherNodes: Seq[ASTNode],
+ tableIdent: TableIdentifier,
+ partition: Option[TablePartitionSpec]): LogicalPlan = {
+ otherNodes match {
+ // ALTER TABLE table_name RENAME TO new_table_name;
+ case Token("TOK_ALTERTABLE_RENAME", renameArgs) :: _ =>
+ val tableNameClause = getClause("TOK_TABNAME", renameArgs)
+ val newTableIdent = extractTableIdent(tableNameClause)
+ AlterTableRename(tableIdent, newTableIdent)(node.source)
+
+ // ALTER TABLE table_name SET TBLPROPERTIES ('comment' = new_comment);
+ case Token("TOK_ALTERTABLE_PROPERTIES", args) :: _ =>
+ val properties = extractTableProps(args.head)
+ AlterTableSetProperties(tableIdent, properties)(node.source)
+
+ // ALTER TABLE table_name UNSET TBLPROPERTIES IF EXISTS ('comment', 'key');
+ case Token("TOK_ALTERTABLE_DROPPROPERTIES", args) :: _ =>
+ val properties = extractTableProps(args.head)
+ val ifExists = getClauseOption("TOK_IFEXISTS", args).isDefined
+ AlterTableUnsetProperties(tableIdent, properties, ifExists)(node.source)
+
+ // ALTER TABLE table_name [PARTITION spec] SET SERDE serde_name [WITH SERDEPROPERTIES props];
+ case Token("TOK_ALTERTABLE_SERIALIZER", Token(serdeClassName, Nil) :: serdeArgs) :: _ =>
+ AlterTableSerDeProperties(
+ tableIdent,
+ Some(cleanAndUnquoteString(serdeClassName)),
+ serdeArgs.headOption.map(extractTableProps),
+ partition)(node.source)
+
+ // ALTER TABLE table_name [PARTITION spec] SET SERDEPROPERTIES serde_properties;
+ case Token("TOK_ALTERTABLE_SERDEPROPERTIES", args) :: _ =>
+ AlterTableSerDeProperties(
+ tableIdent,
+ None,
+ Some(extractTableProps(args.head)),
+ partition)(node.source)
+
+ // ALTER TABLE table_name CLUSTERED BY (col, ...) [SORTED BY (col, ...)] INTO n BUCKETS;
+ case Token("TOK_ALTERTABLE_CLUSTER_SORT", Token("TOK_ALTERTABLE_BUCKETS", b) :: Nil) :: _ =>
+ val clusterCols: Seq[String] = b.head match {
+ case Token("TOK_TABCOLNAME", children) => children.map(_.text)
+ case _ => parseFailed("Invalid ALTER TABLE command", node)
+ }
+ // If sort columns are specified, num buckets should be the third arg.
+ // If sort columns are not specified, num buckets should be the second arg.
+ // TODO: actually use `sortDirections` once we actually store that in the metastore
+ val (sortCols: Seq[String], sortDirections: Seq[SortDirection], numBuckets: Int) = {
+ b.tail match {
+ case Token("TOK_TABCOLNAME", children) :: numBucketsNode :: Nil =>
+ val (cols, directions) = children.map {
+ case Token("TOK_TABSORTCOLNAMEASC", Token(col, Nil) :: Nil) => (col, Ascending)
+ case Token("TOK_TABSORTCOLNAMEDESC", Token(col, Nil) :: Nil) => (col, Descending)
+ }.unzip
+ (cols, directions, numBucketsNode.text.toInt)
+ case numBucketsNode :: Nil =>
+ (Nil, Nil, numBucketsNode.text.toInt)
+ case _ =>
+ parseFailed("Invalid ALTER TABLE command", node)
+ }
+ }
+ AlterTableStorageProperties(
+ tableIdent,
+ BucketSpec(numBuckets, clusterCols, sortCols))(node.source)
+
+ // ALTER TABLE table_name NOT CLUSTERED
+ case Token("TOK_ALTERTABLE_CLUSTER_SORT", Token("TOK_NOT_CLUSTERED", Nil) :: Nil) :: _ =>
+ AlterTableNotClustered(tableIdent)(node.source)
+
+ // ALTER TABLE table_name NOT SORTED
+ case Token("TOK_ALTERTABLE_CLUSTER_SORT", Token("TOK_NOT_SORTED", Nil) :: Nil) :: _ =>
+ AlterTableNotSorted(tableIdent)(node.source)
+
+ // ALTER TABLE table_name SKEWED BY (col1, col2)
+ // ON ((col1_value, col2_value) [, (col1_value, col2_value), ...])
+ // [STORED AS DIRECTORIES];
+ case Token("TOK_ALTERTABLE_SKEWED",
+ Token("TOK_TABLESKEWED",
+ Token("TOK_TABCOLNAME", colNames) :: colValues :: rest) :: Nil) :: _ =>
+ // Example format:
+ //
+ // +- TOK_ALTERTABLE_SKEWED
+ // :- TOK_TABLESKEWED
+ // : :- TOK_TABCOLNAME
+ // : : :- dt
+ // : : +- country
+ // :- TOK_TABCOLVALUE_PAIR
+ // : :- TOK_TABCOLVALUES
+ // : : :- TOK_TABCOLVALUE
+ // : : : :- '2008-08-08'
+ // : : : +- 'us'
+ // : :- TOK_TABCOLVALUES
+ // : : :- TOK_TABCOLVALUE
+ // : : : :- '2009-09-09'
+ // : : : +- 'uk'
+ // +- TOK_STOREASDIR
+ val names = colNames.map { n => cleanAndUnquoteString(n.text) }
+ val values = colValues match {
+ case Token("TOK_TABCOLVALUE", vals) =>
+ Seq(vals.map { n => cleanAndUnquoteString(n.text) })
+ case Token("TOK_TABCOLVALUE_PAIR", pairs) =>
+ pairs.map {
+ case Token("TOK_TABCOLVALUES", Token("TOK_TABCOLVALUE", vals) :: Nil) =>
+ vals.map { n => cleanAndUnquoteString(n.text) }
+ case _ =>
+ parseFailed("Invalid ALTER TABLE command", node)
+ }
+ case _ =>
+ parseFailed("Invalid ALTER TABLE command", node)
+ }
+ val storedAsDirs = rest match {
+ case Token("TOK_STOREDASDIRS", Nil) :: Nil => true
+ case _ => false
+ }
+ AlterTableSkewed(
+ tableIdent,
+ names,
+ values,
+ storedAsDirs)(node.source)
+
+ // ALTER TABLE table_name NOT SKEWED
+ case Token("TOK_ALTERTABLE_SKEWED", Nil) :: _ =>
+ AlterTableNotSkewed(tableIdent)(node.source)
+
+ // ALTER TABLE table_name NOT STORED AS DIRECTORIES
+ case Token("TOK_ALTERTABLE_SKEWED", Token("TOK_STOREDASDIRS", Nil) :: Nil) :: _ =>
+ AlterTableNotStoredAsDirs(tableIdent)(node.source)
+
+ // ALTER TABLE table_name SET SKEWED LOCATION (col1="loc1" [, (col2, col3)="loc2", ...] );
+ case Token("TOK_ALTERTABLE_SKEWED_LOCATION",
+ Token("TOK_SKEWED_LOCATIONS",
+ Token("TOK_SKEWED_LOCATION_LIST", locationMaps) :: Nil) :: Nil) :: _ =>
+ // Expected format:
+ //
+ // +- TOK_ALTERTABLE_SKEWED_LOCATION
+ // +- TOK_SKEWED_LOCATIONS
+ // +- TOK_SKEWED_LOCATION_LIST
+ // :- TOK_SKEWED_LOCATION_MAP
+ // : :- 'col1'
+ // : +- 'loc1'
+ // +- TOK_SKEWED_LOCATION_MAP
+ // :- TOK_TABCOLVALUES
+ // : +- TOK_TABCOLVALUE
+ // : :- 'col2'
+ // : +- 'col3'
+ // +- 'loc2'
+ val skewedMaps = locationMaps.flatMap {
+ case Token("TOK_SKEWED_LOCATION_MAP", col :: loc :: Nil) =>
+ col match {
+ case Token(const, Nil) =>
+ Seq((cleanAndUnquoteString(const), cleanAndUnquoteString(loc.text)))
+ case Token("TOK_TABCOLVALUES", Token("TOK_TABCOLVALUE", keys) :: Nil) =>
+ keys.map { k => (cleanAndUnquoteString(k.text), cleanAndUnquoteString(loc.text)) }
+ }
+ case _ =>
+ parseFailed("Invalid ALTER TABLE command", node)
+ }.toMap
+ AlterTableSkewedLocation(tableIdent, skewedMaps)(node.source)
+
+ // ALTER TABLE table_name ADD [IF NOT EXISTS] PARTITION spec [LOCATION 'loc1']
+ // spec [LOCATION 'loc2'] ...;
+ case Token("TOK_ALTERTABLE_ADDPARTS", args) :: _ =>
+ val (ifNotExists, parts) = args.head match {
+ case Token("TOK_IFNOTEXISTS", Nil) => (true, args.tail)
+ case _ => (false, args)
+ }
+ // List of (spec, location) to describe partitions to add
+ // Each partition spec may or may not be followed by a location
+ val parsedParts = new ArrayBuffer[(TablePartitionSpec, Option[String])]
+ parts.foreach {
+ case t @ Token("TOK_PARTSPEC", _) =>
+ parsedParts += ((parsePartitionSpec(t), None))
+ case Token("TOK_PARTITIONLOCATION", loc :: Nil) =>
+ // Update the location of the last partition we just added
+ if (parsedParts.nonEmpty) {
+ val (spec, _) = parsedParts.remove(parsedParts.length - 1)
+ parsedParts += ((spec, Some(unquoteString(loc.text))))
+ }
+ case _ =>
+ parseFailed("Invalid ALTER TABLE command", node)
+ }
+ AlterTableAddPartition(tableIdent, parsedParts, ifNotExists)(node.source)
+
+ // ALTER TABLE table_name PARTITION spec1 RENAME TO PARTITION spec2;
+ case Token("TOK_ALTERTABLE_RENAMEPART", spec :: Nil) :: _ =>
+ val newPartition = parsePartitionSpec(spec)
+ val oldPartition = partition.getOrElse {
+ parseFailed("Expected old partition spec in ALTER TABLE rename partition command", node)
+ }
+ AlterTableRenamePartition(tableIdent, oldPartition, newPartition)(node.source)
+
+ // ALTER TABLE table_name_1 EXCHANGE PARTITION spec WITH TABLE table_name_2;
+ case Token("TOK_ALTERTABLE_EXCHANGEPARTITION", spec :: newTable :: Nil) :: _ =>
+ val parsedSpec = parsePartitionSpec(spec)
+ val newTableIdent = extractTableIdent(newTable)
+ AlterTableExchangePartition(tableIdent, newTableIdent, parsedSpec)(node.source)
+
+ // ALTER TABLE table_name DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE];
+ case Token("TOK_ALTERTABLE_DROPPARTS", args) :: _ =>
+ val parts = args.collect { case p @ Token("TOK_PARTSPEC", _) => parsePartitionSpec(p) }
+ val ifExists = getClauseOption("TOK_IFEXISTS", args).isDefined
+ val purge = getClauseOption("PURGE", args).isDefined
+ AlterTableDropPartition(tableIdent, parts, ifExists, purge)(node.source)
+
+ // ALTER TABLE table_name ARCHIVE PARTITION spec;
+ case Token("TOK_ALTERTABLE_ARCHIVE", spec :: Nil) :: _ =>
+ AlterTableArchivePartition(tableIdent, parsePartitionSpec(spec))(node.source)
+
+ // ALTER TABLE table_name UNARCHIVE PARTITION spec;
+ case Token("TOK_ALTERTABLE_UNARCHIVE", spec :: Nil) :: _ =>
+ AlterTableUnarchivePartition(tableIdent, parsePartitionSpec(spec))(node.source)
+
+ // ALTER TABLE table_name [PARTITION spec] SET FILEFORMAT file_format;
+ case Token("TOK_ALTERTABLE_FILEFORMAT", args) :: _ =>
+ val Seq(fileFormat, genericFormat) =
+ getClauses(Seq("TOK_TABLEFILEFORMAT", "TOK_FILEFORMAT_GENERIC"), args)
+ // Note: the AST doesn't contain information about which file format is being set here.
+ // E.g. we can't differentiate between INPUTFORMAT and OUTPUTFORMAT if either is set.
+ // Right now this just stores the values, but we should figure out how to get the keys.
+ val fFormat = fileFormat
+ .map { _.children.map { n => cleanAndUnquoteString(n.text) }}
+ .getOrElse(Seq())
+ val gFormat = genericFormat.map { f => cleanAndUnquoteString(f.children(0).text) }
+ AlterTableSetFileFormat(tableIdent, partition, fFormat, gFormat)(node.source)
+
+ // ALTER TABLE table_name [PARTITION spec] SET LOCATION "loc";
+ case Token("TOK_ALTERTABLE_LOCATION", Token(loc, Nil) :: Nil) :: _ =>
+ AlterTableSetLocation(tableIdent, partition, cleanAndUnquoteString(loc))(node.source)
+
+ // ALTER TABLE table_name TOUCH [PARTITION spec];
+ case Token("TOK_ALTERTABLE_TOUCH", args) :: _ =>
+ // Note: the partition spec, if it exists, comes after TOUCH, so `partition` should
+ // always be None here. Instead, we need to parse it from the TOUCH node's children.
+ val part = getClauseOption("TOK_PARTSPEC", args).map(parsePartitionSpec)
+ AlterTableTouch(tableIdent, part)(node.source)
+
+ // ALTER TABLE table_name [PARTITION spec] COMPACT 'compaction_type';
+ case Token("TOK_ALTERTABLE_COMPACT", Token(compactType, Nil) :: Nil) :: _ =>
+ AlterTableCompact(tableIdent, partition, cleanAndUnquoteString(compactType))(node.source)
+
+ // ALTER TABLE table_name [PARTITION spec] CONCATENATE;
+ case Token("TOK_ALTERTABLE_MERGEFILES", _) :: _ =>
+ AlterTableMerge(tableIdent, partition)(node.source)
+
+ // ALTER TABLE table_name [PARTITION spec] CHANGE [COLUMN] col_old_name col_new_name
+ // column_type [COMMENT col_comment] [FIRST|AFTER column_name] [CASCADE|RESTRICT];
+ case Token("TOK_ALTERTABLE_RENAMECOL", oldName :: newName :: dataType :: args) :: _ =>
+ val afterColName: Option[String] =
+ getClauseOption("TOK_ALTERTABLE_CHANGECOL_AFTER_POSITION", args).map { ap =>
+ ap.children match {
+ case Token(col, Nil) :: Nil => col
+ case _ => parseFailed("Invalid ALTER TABLE command", node)
+ }
+ }
+ val restrict = getClauseOption("TOK_RESTRICT", args).isDefined
+ val cascade = getClauseOption("TOK_CASCADE", args).isDefined
+ val comment = args.headOption.map {
+ case Token("TOK_ALTERTABLE_CHANGECOL_AFTER_POSITION", _) => null
+ case Token("TOK_RESTRICT", _) => null
+ case Token("TOK_CASCADE", _) => null
+ case Token(commentStr, Nil) => cleanAndUnquoteString(commentStr)
+ case _ => parseFailed("Invalid ALTER TABLE command", node)
+ }
+ AlterTableChangeCol(
+ tableIdent,
+ partition,
+ oldName.text,
+ newName.text,
+ nodeToDataType(dataType),
+ comment,
+ afterColName,
+ restrict,
+ cascade)(node.source)
+
+ // ALTER TABLE table_name [PARTITION spec] ADD COLUMNS (name type [COMMENT comment], ...)
+ // [CASCADE|RESTRICT]
+ case Token("TOK_ALTERTABLE_ADDCOLS", args) :: _ =>
+ val columnNodes = getClause("TOK_TABCOLLIST", args).children
+ val columns = StructType(columnNodes.map(nodeToStructField))
+ val restrict = getClauseOption("TOK_RESTRICT", args).isDefined
+ val cascade = getClauseOption("TOK_CASCADE", args).isDefined
+ AlterTableAddCol(tableIdent, partition, columns, restrict, cascade)(node.source)
+
+ // ALTER TABLE table_name [PARTITION spec] REPLACE COLUMNS (name type [COMMENT comment], ...)
+ // [CASCADE|RESTRICT]
+ case Token("TOK_ALTERTABLE_REPLACECOLS", args) :: _ =>
+ val columnNodes = getClause("TOK_TABCOLLIST", args).children
+ val columns = StructType(columnNodes.map(nodeToStructField))
+ val restrict = getClauseOption("TOK_RESTRICT", args).isDefined
+ val cascade = getClauseOption("TOK_CASCADE", args).isDefined
+ AlterTableReplaceCol(tableIdent, partition, columns, restrict, cascade)(node.source)
+
+ case _ =>
+ parseFailed("Unsupported ALTER TABLE command", node)
+ }
+ }
+
+}
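
Since SparkQl hands every TOK_ALTERTABLE node to AlterTableCommandParser.parse, the whole family above is reachable through parsePlan as well. A small illustrative check in the same spirit as DDLCommandSuite below (the table name and partition values are made up):

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.SparkQl
import org.apache.spark.sql.execution.command.{AlterTableRename, AlterTableTouch}

object AlterTableParseSketch {
  def main(args: Array[String]): Unit = {
    val parser = new SparkQl()

    // TOK_ALTERTABLE_RENAME is the first case in matchAlterTableCommands.
    parser.parsePlan("ALTER TABLE t1 RENAME TO t2") match {
      case AlterTableRename(oldName, newName) =>
        assert(oldName == TableIdentifier("t1", None))
        assert(newName == TableIdentifier("t2", None))
      case other => sys.error(s"unexpected plan: $other")
    }

    // For TOUCH, the partition spec is parsed from the TOUCH node's own children.
    parser.parsePlan("ALTER TABLE t1 TOUCH PARTITION (dt='2008-08-08')") match {
      case AlterTableTouch(table, Some(spec)) => println(s"touch $table at $spec")
      case other => sys.error(s"unexpected plan: $other")
    }
  }
}
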
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
new file mode 100644
index 0000000000..9df58d214a
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.ExternalCatalog.TablePartitionSpec
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.execution.datasources.BucketSpec
+import org.apache.spark.sql.types._
+
+
+// Note: The definition of these commands are based on the ones described in
+// https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
+
+/**
+ * A DDL command expected to be parsed and run in an underlying system instead of in Spark.
+ */
+abstract class NativeDDLCommand(val sql: String) extends RunnableCommand {
+
+ override def run(sqlContext: SQLContext): Seq[Row] = {
+ sqlContext.runNativeSql(sql)
+ }
+
+ override val output: Seq[Attribute] = {
+ Seq(AttributeReference("result", StringType, nullable = false)())
+ }
+
+}
+
+case class CreateDatabase(
+ databaseName: String,
+ ifNotExists: Boolean,
+ path: Option[String],
+ comment: Option[String],
+ props: Map[String, String])(sql: String)
+ extends NativeDDLCommand(sql) with Logging
+
+case class CreateFunction(
+ functionName: String,
+ alias: String,
+ resourcesMap: Map[String, String],
+ isTemp: Boolean)(sql: String)
+ extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableRename(
+ oldName: TableIdentifier,
+ newName: TableIdentifier)(sql: String)
+ extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableSetProperties(
+ tableName: TableIdentifier,
+ properties: Map[String, String])(sql: String)
+ extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableUnsetProperties(
+ tableName: TableIdentifier,
+ properties: Map[String, String],
+ ifExists: Boolean)(sql: String)
+ extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableSerDeProperties(
+ tableName: TableIdentifier,
+ serdeClassName: Option[String],
+ serdeProperties: Option[Map[String, String]],
+ partition: Option[Map[String, String]])(sql: String)
+ extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableStorageProperties(
+ tableName: TableIdentifier,
+ buckets: BucketSpec)(sql: String)
+ extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableNotClustered(
+ tableName: TableIdentifier)(sql: String) extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableNotSorted(
+ tableName: TableIdentifier)(sql: String) extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableSkewed(
+ tableName: TableIdentifier,
+ // e.g. (dt, country)
+ skewedCols: Seq[String],
+    // e.g. ('2008-08-08', 'us'), ('2009-09-09', 'uk')
+ skewedValues: Seq[Seq[String]],
+ storedAsDirs: Boolean)(sql: String)
+ extends NativeDDLCommand(sql) with Logging {
+
+ require(skewedValues.forall(_.size == skewedCols.size),
+    "number of columns in skewed values does not match number of skewed columns provided")
+}
+
+case class AlterTableNotSkewed(
+ tableName: TableIdentifier)(sql: String) extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableNotStoredAsDirs(
+ tableName: TableIdentifier)(sql: String) extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableSkewedLocation(
+ tableName: TableIdentifier,
+ skewedMap: Map[String, String])(sql: String)
+ extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableAddPartition(
+ tableName: TableIdentifier,
+ partitionSpecsAndLocs: Seq[(TablePartitionSpec, Option[String])],
+ ifNotExists: Boolean)(sql: String)
+ extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableRenamePartition(
+ tableName: TableIdentifier,
+ oldPartition: TablePartitionSpec,
+ newPartition: TablePartitionSpec)(sql: String)
+ extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableExchangePartition(
+ fromTableName: TableIdentifier,
+ toTableName: TableIdentifier,
+ spec: TablePartitionSpec)(sql: String)
+ extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableDropPartition(
+ tableName: TableIdentifier,
+ specs: Seq[TablePartitionSpec],
+ ifExists: Boolean,
+ purge: Boolean)(sql: String)
+ extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableArchivePartition(
+ tableName: TableIdentifier,
+ spec: TablePartitionSpec)(sql: String)
+ extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableUnarchivePartition(
+ tableName: TableIdentifier,
+ spec: TablePartitionSpec)(sql: String)
+ extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableSetFileFormat(
+ tableName: TableIdentifier,
+ partitionSpec: Option[TablePartitionSpec],
+ fileFormat: Seq[String],
+ genericFormat: Option[String])(sql: String)
+ extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableSetLocation(
+ tableName: TableIdentifier,
+ partitionSpec: Option[TablePartitionSpec],
+ location: String)(sql: String)
+ extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableTouch(
+ tableName: TableIdentifier,
+ partitionSpec: Option[TablePartitionSpec])(sql: String)
+ extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableCompact(
+ tableName: TableIdentifier,
+ partitionSpec: Option[TablePartitionSpec],
+ compactType: String)(sql: String)
+ extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableMerge(
+ tableName: TableIdentifier,
+ partitionSpec: Option[TablePartitionSpec])(sql: String)
+ extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableChangeCol(
+ tableName: TableIdentifier,
+ partitionSpec: Option[TablePartitionSpec],
+ oldColName: String,
+ newColName: String,
+ dataType: DataType,
+ comment: Option[String],
+ afterColName: Option[String],
+ restrict: Boolean,
+ cascade: Boolean)(sql: String)
+ extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableAddCol(
+ tableName: TableIdentifier,
+ partitionSpec: Option[TablePartitionSpec],
+ columns: StructType,
+ restrict: Boolean,
+ cascade: Boolean)(sql: String)
+ extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableReplaceCol(
+ tableName: TableIdentifier,
+ partitionSpec: Option[TablePartitionSpec],
+ columns: StructType,
+ restrict: Boolean,
+ cascade: Boolean)(sql: String)
+ extends NativeDDLCommand(sql) with Logging
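
Each of these nodes executes through the shared NativeDDLCommand path: run simply replays the original SQL text via sqlContext.runNativeSql, which the base SQLContext does not implement yet (see the stub added above). A brief sketch of that control flow; the helper object and statement text are illustrative only:

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.AlterTableSetProperties

object NativeDDLSketch {
  // The raw SQL is carried in the second (curried) parameter list, so run() can
  // forward it verbatim to the underlying system.
  def setCommentCommand(): AlterTableSetProperties = {
    val sqlText = "ALTER TABLE t1 SET TBLPROPERTIES ('comment' = 'new_comment')"
    AlterTableSetProperties(TableIdentifier("t1", None), Map("comment" -> "new_comment"))(sqlText)
  }

  def execute(ctx: SQLContext): Unit = {
    // Equivalent to ctx.runNativeSql(sqlText); this throws UnsupportedOperationException
    // on a plain SQLContext, so a Hive-aware context must override runNativeSql.
    setCommentCommand().run(ctx)
  }
}
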
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
new file mode 100644
index 0000000000..0d632a8a13
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -0,0 +1,544 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Descending}
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.execution.SparkQl
+import org.apache.spark.sql.execution.datasources.BucketSpec
+import org.apache.spark.sql.types._
+
+class DDLCommandSuite extends PlanTest {
+ private val parser = new SparkQl
+
+ test("create database") {
+ val sql =
+ """
+ |CREATE DATABASE IF NOT EXISTS database_name
+ |COMMENT 'database_comment' LOCATION '/home/user/db'
+ |WITH DBPROPERTIES ('a'='a', 'b'='b', 'c'='c')
+ """.stripMargin
+ val parsed = parser.parsePlan(sql)
+ val expected = CreateDatabase(
+ "database_name",
+ ifNotExists = true,
+ Some("/home/user/db"),
+ Some("database_comment"),
+ Map("a" -> "a", "b" -> "b", "c" -> "c"))(sql)
+ comparePlans(parsed, expected)
+ }
+
+ test("create function") {
+ val sql1 =
+ """
+ |CREATE TEMPORARY FUNCTION helloworld as
+ |'com.matthewrathbone.example.SimpleUDFExample' USING JAR '/path/to/jar',
+ |FILE 'path/to/file'
+ """.stripMargin
+ val sql2 =
+ """
+ |CREATE FUNCTION hello.world as
+ |'com.matthewrathbone.example.SimpleUDFExample' USING ARCHIVE '/path/to/archive',
+ |FILE 'path/to/file'
+ """.stripMargin
+ val parsed1 = parser.parsePlan(sql1)
+ val parsed2 = parser.parsePlan(sql2)
+ val expected1 = CreateFunction(
+ "helloworld",
+ "com.matthewrathbone.example.SimpleUDFExample",
+ Map("jar" -> "/path/to/jar", "file" -> "path/to/file"),
+ isTemp = true)(sql1)
+ val expected2 = CreateFunction(
+ "hello.world",
+ "com.matthewrathbone.example.SimpleUDFExample",
+ Map("archive" -> "/path/to/archive", "file" -> "path/to/file"),
+ isTemp = false)(sql2)
+ comparePlans(parsed1, expected1)
+ comparePlans(parsed2, expected2)
+ }
+
+ test("alter table: rename table") {
+ val sql = "ALTER TABLE table_name RENAME TO new_table_name"
+ val parsed = parser.parsePlan(sql)
+ val expected = AlterTableRename(
+ TableIdentifier("table_name", None),
+ TableIdentifier("new_table_name", None))(sql)
+ comparePlans(parsed, expected)
+ }
+
+ test("alter table: alter table properties") {
+ val sql1 = "ALTER TABLE table_name SET TBLPROPERTIES ('test' = 'test', " +
+ "'comment' = 'new_comment')"
+ val sql2 = "ALTER TABLE table_name UNSET TBLPROPERTIES ('comment', 'test')"
+ val sql3 = "ALTER TABLE table_name UNSET TBLPROPERTIES IF EXISTS ('comment', 'test')"
+ val parsed1 = parser.parsePlan(sql1)
+ val parsed2 = parser.parsePlan(sql2)
+ val parsed3 = parser.parsePlan(sql3)
+ val tableIdent = TableIdentifier("table_name", None)
+ val expected1 = AlterTableSetProperties(
+ tableIdent, Map("test" -> "test", "comment" -> "new_comment"))(sql1)
+ val expected2 = AlterTableUnsetProperties(
+ tableIdent, Map("comment" -> null, "test" -> null), ifExists = false)(sql2)
+ val expected3 = AlterTableUnsetProperties(
+ tableIdent, Map("comment" -> null, "test" -> null), ifExists = true)(sql3)
+ comparePlans(parsed1, expected1)
+ comparePlans(parsed2, expected2)
+ comparePlans(parsed3, expected3)
+ }
+
+ test("alter table: SerDe properties") {
+ val sql1 = "ALTER TABLE table_name SET SERDE 'org.apache.class'"
+ val sql2 =
+ """
+ |ALTER TABLE table_name SET SERDE 'org.apache.class'
+ |WITH SERDEPROPERTIES ('columns'='foo,bar', 'field.delim' = ',')
+ """.stripMargin
+ val sql3 =
+ """
+ |ALTER TABLE table_name SET SERDEPROPERTIES ('columns'='foo,bar',
+ |'field.delim' = ',')
+ """.stripMargin
+ val sql4 =
+ """
+ |ALTER TABLE table_name PARTITION (test, dt='2008-08-08',
+ |country='us') SET SERDE 'org.apache.class' WITH SERDEPROPERTIES ('columns'='foo,bar',
+ |'field.delim' = ',')
+ """.stripMargin
+ val sql5 =
+ """
+ |ALTER TABLE table_name PARTITION (test, dt='2008-08-08',
+ |country='us') SET SERDEPROPERTIES ('columns'='foo,bar', 'field.delim' = ',')
+ """.stripMargin
+ val parsed1 = parser.parsePlan(sql1)
+ val parsed2 = parser.parsePlan(sql2)
+ val parsed3 = parser.parsePlan(sql3)
+ val parsed4 = parser.parsePlan(sql4)
+ val parsed5 = parser.parsePlan(sql5)
+ val tableIdent = TableIdentifier("table_name", None)
+ val expected1 = AlterTableSerDeProperties(
+ tableIdent, Some("org.apache.class"), None, None)(sql1)
+ val expected2 = AlterTableSerDeProperties(
+ tableIdent,
+ Some("org.apache.class"),
+ Some(Map("columns" -> "foo,bar", "field.delim" -> ",")),
+ None)(sql2)
+ val expected3 = AlterTableSerDeProperties(
+ tableIdent, None, Some(Map("columns" -> "foo,bar", "field.delim" -> ",")), None)(sql3)
+ val expected4 = AlterTableSerDeProperties(
+ tableIdent,
+ Some("org.apache.class"),
+ Some(Map("columns" -> "foo,bar", "field.delim" -> ",")),
+ Some(Map("test" -> null, "dt" -> "2008-08-08", "country" -> "us")))(sql4)
+ val expected5 = AlterTableSerDeProperties(
+ tableIdent,
+ None,
+ Some(Map("columns" -> "foo,bar", "field.delim" -> ",")),
+ Some(Map("test" -> null, "dt" -> "2008-08-08", "country" -> "us")))(sql5)
+ comparePlans(parsed1, expected1)
+ comparePlans(parsed2, expected2)
+ comparePlans(parsed3, expected3)
+ comparePlans(parsed4, expected4)
+ comparePlans(parsed5, expected5)
+ }
+
+ test("alter table: storage properties") {
+ val sql1 = "ALTER TABLE table_name CLUSTERED BY (dt, country) INTO 10 BUCKETS"
+ val sql2 = "ALTER TABLE table_name CLUSTERED BY (dt, country) SORTED BY " +
+ "(dt, country DESC) INTO 10 BUCKETS"
+ val sql3 = "ALTER TABLE table_name NOT CLUSTERED"
+ val sql4 = "ALTER TABLE table_name NOT SORTED"
+ val parsed1 = parser.parsePlan(sql1)
+ val parsed2 = parser.parsePlan(sql2)
+ val parsed3 = parser.parsePlan(sql3)
+ val parsed4 = parser.parsePlan(sql4)
+ val tableIdent = TableIdentifier("table_name", None)
+ val cols = List("dt", "country")
+ // TODO: also test the sort directions once we keep track of that
+ val expected1 = AlterTableStorageProperties(
+ tableIdent, BucketSpec(10, cols, Nil))(sql1)
+ val expected2 = AlterTableStorageProperties(
+ tableIdent, BucketSpec(10, cols, cols))(sql2)
+ val expected3 = AlterTableNotClustered(tableIdent)(sql3)
+ val expected4 = AlterTableNotSorted(tableIdent)(sql4)
+ comparePlans(parsed1, expected1)
+ comparePlans(parsed2, expected2)
+ comparePlans(parsed3, expected3)
+ comparePlans(parsed4, expected4)
+ }
+
+ test("alter table: skewed") {
+ val sql1 =
+ """
+ |ALTER TABLE table_name SKEWED BY (dt, country) ON
+ |(('2008-08-08', 'us'), ('2009-09-09', 'uk'), ('2010-10-10', 'cn')) STORED AS DIRECTORIES
+ """.stripMargin
+ val sql2 =
+ """
+ |ALTER TABLE table_name SKEWED BY (dt, country) ON
+ |('2008-08-08', 'us') STORED AS DIRECTORIES
+ """.stripMargin
+ val sql3 =
+ """
+ |ALTER TABLE table_name SKEWED BY (dt, country) ON
+ |(('2008-08-08', 'us'), ('2009-09-09', 'uk'))
+ """.stripMargin
+ val sql4 = "ALTER TABLE table_name NOT SKEWED"
+ val sql5 = "ALTER TABLE table_name NOT STORED AS DIRECTORIES"
+ val parsed1 = parser.parsePlan(sql1)
+ val parsed2 = parser.parsePlan(sql2)
+ val parsed3 = parser.parsePlan(sql3)
+ val parsed4 = parser.parsePlan(sql4)
+ val parsed5 = parser.parsePlan(sql5)
+ val tableIdent = TableIdentifier("table_name", None)
+ val expected1 = AlterTableSkewed(
+ tableIdent,
+ Seq("dt", "country"),
+ Seq(List("2008-08-08", "us"), List("2009-09-09", "uk"), List("2010-10-10", "cn")),
+ storedAsDirs = true)(sql1)
+ val expected2 = AlterTableSkewed(
+ tableIdent,
+ Seq("dt", "country"),
+ Seq(List("2008-08-08", "us")),
+ storedAsDirs = true)(sql2)
+ val expected3 = AlterTableSkewed(
+ tableIdent,
+ Seq("dt", "country"),
+ Seq(List("2008-08-08", "us"), List("2009-09-09", "uk")),
+ storedAsDirs = false)(sql3)
+ val expected4 = AlterTableNotSkewed(tableIdent)(sql4)
+ val expected5 = AlterTableNotStoredAsDirs(tableIdent)(sql5)
+ comparePlans(parsed1, expected1)
+ comparePlans(parsed2, expected2)
+ comparePlans(parsed3, expected3)
+ comparePlans(parsed4, expected4)
+ comparePlans(parsed5, expected5)
+ }
+
+ test("alter table: skewed location") {
+ val sql1 =
+ """
+ |ALTER TABLE table_name SET SKEWED LOCATION
+ |('123'='location1', 'test'='location2')
+ """.stripMargin
+ val sql2 =
+ """
+ |ALTER TABLE table_name SET SKEWED LOCATION
+ |(('2008-08-08', 'us')='location1', 'test'='location2')
+ """.stripMargin
+ val parsed1 = parser.parsePlan(sql1)
+ val parsed2 = parser.parsePlan(sql2)
+ val tableIdent = TableIdentifier("table_name", None)
+ val expected1 = AlterTableSkewedLocation(
+ tableIdent,
+ Map("123" -> "location1", "test" -> "location2"))(sql1)
+ val expected2 = AlterTableSkewedLocation(
+ tableIdent,
+ Map("2008-08-08" -> "location1", "us" -> "location1", "test" -> "location2"))(sql2)
+ comparePlans(parsed1, expected1)
+ comparePlans(parsed2, expected2)
+ }
+
+ test("alter table: add partition") {
+ val sql =
+ """
+ |ALTER TABLE table_name ADD IF NOT EXISTS PARTITION
+ |(dt='2008-08-08', country='us') LOCATION 'location1' PARTITION
+ |(dt='2009-09-09', country='uk')
+ """.stripMargin
+ val parsed = parser.parsePlan(sql)
+ val expected = AlterTableAddPartition(
+ TableIdentifier("table_name", None),
+ Seq(
+ (Map("dt" -> "2008-08-08", "country" -> "us"), Some("location1")),
+ (Map("dt" -> "2009-09-09", "country" -> "uk"), None)),
+ ifNotExists = true)(sql)
+ comparePlans(parsed, expected)
+ }
+
+ test("alter table: rename partition") {
+ val sql =
+ """
+ |ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us')
+ |RENAME TO PARTITION (dt='2008-09-09', country='uk')
+ """.stripMargin
+ val parsed = parser.parsePlan(sql)
+ val expected = AlterTableRenamePartition(
+ TableIdentifier("table_name", None),
+ Map("dt" -> "2008-08-08", "country" -> "us"),
+ Map("dt" -> "2008-09-09", "country" -> "uk"))(sql)
+ comparePlans(parsed, expected)
+ }
+
+ test("alter table: exchange partition") {
+ val sql =
+ """
+ |ALTER TABLE table_name_1 EXCHANGE PARTITION
+ |(dt='2008-08-08', country='us') WITH TABLE table_name_2
+ """.stripMargin
+ val parsed = parser.parsePlan(sql)
+ val expected = AlterTableExchangePartition(
+ TableIdentifier("table_name_1", None),
+ TableIdentifier("table_name_2", None),
+ Map("dt" -> "2008-08-08", "country" -> "us"))(sql)
+ comparePlans(parsed, expected)
+ }
+
+ test("alter table: drop partitions") {
+ val sql1 =
+ """
+ |ALTER TABLE table_name DROP IF EXISTS PARTITION
+ |(dt='2008-08-08', country='us'), PARTITION (dt='2009-09-09', country='uk')
+ """.stripMargin
+ val sql2 =
+ """
+ |ALTER TABLE table_name DROP PARTITION
+ |(dt='2008-08-08', country='us'), PARTITION (dt='2009-09-09', country='uk') PURGE
+ """.stripMargin
+ val parsed1 = parser.parsePlan(sql1)
+ val parsed2 = parser.parsePlan(sql2)
+ val tableIdent = TableIdentifier("table_name", None)
+ val expected1 = AlterTableDropPartition(
+ tableIdent,
+ Seq(
+ Map("dt" -> "2008-08-08", "country" -> "us"),
+ Map("dt" -> "2009-09-09", "country" -> "uk")),
+ ifExists = true,
+ purge = false)(sql1)
+ val expected2 = AlterTableDropPartition(
+ tableIdent,
+ Seq(
+ Map("dt" -> "2008-08-08", "country" -> "us"),
+ Map("dt" -> "2009-09-09", "country" -> "uk")),
+ ifExists = false,
+ purge = true)(sql2)
+ comparePlans(parsed1, expected1)
+ comparePlans(parsed2, expected2)
+ }
+
+ test("alter table: archive partition") {
+ val sql = "ALTER TABLE table_name ARCHIVE PARTITION (dt='2008-08-08', country='us')"
+ val parsed = parser.parsePlan(sql)
+ val expected = AlterTableArchivePartition(
+ TableIdentifier("table_name", None),
+ Map("dt" -> "2008-08-08", "country" -> "us"))(sql)
+ comparePlans(parsed, expected)
+ }
+
+ test("alter table: unarchive partition") {
+ val sql = "ALTER TABLE table_name UNARCHIVE PARTITION (dt='2008-08-08', country='us')"
+ val parsed = parser.parsePlan(sql)
+ val expected = AlterTableUnarchivePartition(
+ TableIdentifier("table_name", None),
+ Map("dt" -> "2008-08-08", "country" -> "us"))(sql)
+ comparePlans(parsed, expected)
+ }
+
+ test("alter table: set file format") {
+ val sql1 =
+ """
+ |ALTER TABLE table_name SET FILEFORMAT INPUTFORMAT 'test'
+ |OUTPUTFORMAT 'test' SERDE 'test' INPUTDRIVER 'test' OUTPUTDRIVER 'test'
+ """.stripMargin
+ val sql2 = "ALTER TABLE table_name SET FILEFORMAT INPUTFORMAT 'test' " +
+ "OUTPUTFORMAT 'test' SERDE 'test'"
+ val sql3 = "ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us') " +
+ "SET FILEFORMAT PARQUET"
+ val parsed1 = parser.parsePlan(sql1)
+ val parsed2 = parser.parsePlan(sql2)
+ val parsed3 = parser.parsePlan(sql3)
+ val tableIdent = TableIdentifier("table_name", None)
+ val expected1 = AlterTableSetFileFormat(
+ tableIdent,
+ None,
+ List("test", "test", "test", "test", "test"),
+ None)(sql1)
+ val expected2 = AlterTableSetFileFormat(
+ tableIdent,
+ None,
+ List("test", "test", "test"),
+ None)(sql2)
+ val expected3 = AlterTableSetFileFormat(
+ tableIdent,
+ Some(Map("dt" -> "2008-08-08", "country" -> "us")),
+ Seq(),
+ Some("PARQUET"))(sql3)
+ comparePlans(parsed1, expected1)
+ comparePlans(parsed2, expected2)
+ comparePlans(parsed3, expected3)
+ }
+
+ test("alter table: set location") {
+ val sql1 = "ALTER TABLE table_name SET LOCATION 'new location'"
+ val sql2 = "ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us') " +
+ "SET LOCATION 'new location'"
+ val parsed1 = parser.parsePlan(sql1)
+ val parsed2 = parser.parsePlan(sql2)
+ val tableIdent = TableIdentifier("table_name", None)
+ val expected1 = AlterTableSetLocation(
+ tableIdent,
+ None,
+ "new location")(sql1)
+ val expected2 = AlterTableSetLocation(
+ tableIdent,
+ Some(Map("dt" -> "2008-08-08", "country" -> "us")),
+ "new location")(sql2)
+ comparePlans(parsed1, expected1)
+ comparePlans(parsed2, expected2)
+ }
+
+ test("alter table: touch") {
+ val sql1 = "ALTER TABLE table_name TOUCH"
+ val sql2 = "ALTER TABLE table_name TOUCH PARTITION (dt='2008-08-08', country='us')"
+ val parsed1 = parser.parsePlan(sql1)
+ val parsed2 = parser.parsePlan(sql2)
+ val tableIdent = TableIdentifier("table_name", None)
+ val expected1 = AlterTableTouch(
+ tableIdent,
+ None)(sql1)
+ val expected2 = AlterTableTouch(
+ tableIdent,
+ Some(Map("dt" -> "2008-08-08", "country" -> "us")))(sql2)
+ comparePlans(parsed1, expected1)
+ comparePlans(parsed2, expected2)
+ }
+
+ test("alter table: compact") {
+ val sql1 = "ALTER TABLE table_name COMPACT 'compaction_type'"
+ val sql2 =
+ """
+ |ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us')
+ |COMPACT 'MAJOR'
+ """.stripMargin
+ val parsed1 = parser.parsePlan(sql1)
+ val parsed2 = parser.parsePlan(sql2)
+ val tableIdent = TableIdentifier("table_name", None)
+ val expected1 = AlterTableCompact(
+ tableIdent,
+ None,
+ "compaction_type")(sql1)
+ val expected2 = AlterTableCompact(
+ tableIdent,
+ Some(Map("dt" -> "2008-08-08", "country" -> "us")),
+ "MAJOR")(sql2)
+ comparePlans(parsed1, expected1)
+ comparePlans(parsed2, expected2)
+ }
+
+ test("alter table: concatenate") {
+ val sql1 = "ALTER TABLE table_name CONCATENATE"
+ val sql2 = "ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us') CONCATENATE"
+ val parsed1 = parser.parsePlan(sql1)
+ val parsed2 = parser.parsePlan(sql2)
+ val tableIdent = TableIdentifier("table_name", None)
+ val expected1 = AlterTableMerge(tableIdent, None)(sql1)
+ val expected2 = AlterTableMerge(
+ tableIdent, Some(Map("dt" -> "2008-08-08", "country" -> "us")))(sql2)
+ comparePlans(parsed1, expected1)
+ comparePlans(parsed2, expected2)
+ }
+
+ test("alter table: change column name/type/position/comment") {
+ val sql1 = "ALTER TABLE table_name CHANGE col_old_name col_new_name INT"
+ val sql2 =
+ """
+ |ALTER TABLE table_name CHANGE COLUMN col_old_name col_new_name INT
+ |COMMENT 'col_comment' FIRST CASCADE
+ """.stripMargin
+ val sql3 =
+ """
+ |ALTER TABLE table_name CHANGE COLUMN col_old_name col_new_name INT
+ |COMMENT 'col_comment' AFTER column_name RESTRICT
+ """.stripMargin
+ val parsed1 = parser.parsePlan(sql1)
+ val parsed2 = parser.parsePlan(sql2)
+ val parsed3 = parser.parsePlan(sql3)
+ val tableIdent = TableIdentifier("table_name", None)
+ val expected1 = AlterTableChangeCol(
+ tableName = tableIdent,
+ partitionSpec = None,
+ oldColName = "col_old_name",
+ newColName = "col_new_name",
+ dataType = IntegerType,
+ comment = None,
+ afterColName = None,
+ restrict = false,
+ cascade = false)(sql1)
+ val expected2 = AlterTableChangeCol(
+ tableName = tableIdent,
+ partitionSpec = None,
+ oldColName = "col_old_name",
+ newColName = "col_new_name",
+ dataType = IntegerType,
+ comment = Some("col_comment"),
+ afterColName = None,
+ restrict = false,
+ cascade = true)(sql2)
+ val expected3 = AlterTableChangeCol(
+ tableName = tableIdent,
+ partitionSpec = None,
+ oldColName = "col_old_name",
+ newColName = "col_new_name",
+ dataType = IntegerType,
+ comment = Some("col_comment"),
+ afterColName = Some("column_name"),
+ restrict = true,
+ cascade = false)(sql3)
+ comparePlans(parsed1, expected1)
+ comparePlans(parsed2, expected2)
+ comparePlans(parsed3, expected3)
+ }
+
+ test("alter table: add/replace columns") {
+ val sql1 =
+ """
+ |ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us')
+ |ADD COLUMNS (new_col1 INT COMMENT 'test_comment', new_col2 LONG
+ |COMMENT 'test_comment2') CASCADE
+ """.stripMargin
+ val sql2 =
+ """
+ |ALTER TABLE table_name REPLACE COLUMNS (new_col1 INT
+ |COMMENT 'test_comment', new_col2 LONG COMMENT 'test_comment2') RESTRICT
+ """.stripMargin
+ val parsed1 = parser.parsePlan(sql1)
+ val parsed2 = parser.parsePlan(sql2)
+ val meta1 = new MetadataBuilder().putString("comment", "test_comment").build()
+ val meta2 = new MetadataBuilder().putString("comment", "test_comment2").build()
+ val tableIdent = TableIdentifier("table_name", None)
+ val expected1 = AlterTableAddCol(
+ tableIdent,
+ Some(Map("dt" -> "2008-08-08", "country" -> "us")),
+ StructType(Seq(
+ StructField("new_col1", IntegerType, nullable = true, meta1),
+ StructField("new_col2", LongType, nullable = true, meta2))),
+ restrict = false,
+ cascade = true)(sql1)
+ val expected2 = AlterTableReplaceCol(
+ tableIdent,
+ None,
+ StructType(Seq(
+ StructField("new_col1", IntegerType, nullable = true, meta1),
+ StructField("new_col2", LongType, nullable = true, meta2))),
+ restrict = true,
+ cascade = false)(sql2)
+ comparePlans(parsed1, expected1)
+ comparePlans(parsed2, expected2)
+ }
+
+}