aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
diff options
context:
space:
mode:
Diffstat (limited to 'sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala504
1 files changed, 408 insertions, 96 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 373b557683..fc37a142cd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -17,15 +17,19 @@
package org.apache.spark.sql.execution.command
+import scala.util.control.NonFatal
+
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, CatalogTableType, SessionCatalog}
import org.apache.spark.sql.catalyst.catalog.ExternalCatalog.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
-import org.apache.spark.sql.execution.datasources.BucketSpec
import org.apache.spark.sql.types._
+
// Note: The definition of these commands are based on the ones described in
// https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
@@ -44,131 +48,379 @@ abstract class NativeDDLCommand(val sql: String) extends RunnableCommand {
}
+/**
+ * A command for users to create a new database.
+ *
+ * It will issue an error message when a database with the same name already exists,
+ * unless 'ifNotExists' is true.
+ * The syntax of using this command in SQL is:
+ * {{{
+ * CREATE DATABASE|SCHEMA [IF NOT EXISTS] database_name
+ * }}}
+ */
case class CreateDatabase(
databaseName: String,
ifNotExists: Boolean,
path: Option[String],
comment: Option[String],
- props: Map[String, String])(sql: String)
- extends NativeDDLCommand(sql) with Logging
+ props: Map[String, String])
+ extends RunnableCommand {
+
+ override def run(sqlContext: SQLContext): Seq[Row] = {
+ val catalog = sqlContext.sessionState.catalog
+ catalog.createDatabase(
+ CatalogDatabase(
+ databaseName,
+ comment.getOrElse(""),
+ path.getOrElse(catalog.getDefaultDBPath(databaseName)),
+ props),
+ ifNotExists)
+ Seq.empty[Row]
+ }
+
+ override val output: Seq[Attribute] = Seq.empty
+}
+
/**
- * Drop Database: Removes a database from the system.
+ * A command for users to remove a database from the system.
*
* 'ifExists':
* - true, if database_name does't exist, no action
* - false (default), if database_name does't exist, a warning message will be issued
- * 'restric':
- * - true (default), the database cannot be dropped if it is not empty. The inclusive
- * tables must be dropped at first.
- * - false, it is in the Cascade mode. The dependent objects are automatically dropped
- * before dropping database.
+ * 'cascade':
+ * - true, the dependent objects are automatically dropped before dropping database.
+ * - false (default), it is in the Restrict mode. The database cannot be dropped if
+ * it is not empty. The inclusive tables must be dropped at first.
+ *
+ * The syntax of using this command in SQL is:
+ * {{{
+ * DROP DATABASE [IF EXISTS] database_name [RESTRICT|CASCADE];
+ * }}}
*/
case class DropDatabase(
databaseName: String,
ifExists: Boolean,
- restrict: Boolean)(sql: String)
- extends NativeDDLCommand(sql) with Logging
+ cascade: Boolean)
+ extends RunnableCommand {
-case class CreateFunction(
- functionName: String,
- alias: String,
- resources: Seq[(String, String)],
- isTemp: Boolean)(sql: String)
- extends NativeDDLCommand(sql) with Logging
+ override def run(sqlContext: SQLContext): Seq[Row] = {
+ sqlContext.sessionState.catalog.dropDatabase(databaseName, ifExists, cascade)
+ Seq.empty[Row]
+ }
-case class AlterTableRename(
- oldName: TableIdentifier,
- newName: TableIdentifier)(sql: String)
- extends NativeDDLCommand(sql) with Logging
+ override val output: Seq[Attribute] = Seq.empty
+}
-case class AlterTableSetProperties(
+/**
+ * A command for users to add new (key, value) pairs into DBPROPERTIES.
+ * If the database does not exist, an error message will be issued to indicate the database
+ * does not exist.
+ * The syntax of using this command in SQL is:
+ * {{{
+ * ALTER (DATABASE|SCHEMA) database_name SET DBPROPERTIES (property_name=property_value, ...)
+ * }}}
+ */
+case class AlterDatabaseProperties(
+ databaseName: String,
+ props: Map[String, String])
+ extends RunnableCommand {
+
+ override def run(sqlContext: SQLContext): Seq[Row] = {
+ val catalog = sqlContext.sessionState.catalog
+ val db: CatalogDatabase = catalog.getDatabaseMetadata(databaseName)
+ catalog.alterDatabase(db.copy(properties = db.properties ++ props))
+
+ Seq.empty[Row]
+ }
+
+ override val output: Seq[Attribute] = Seq.empty
+}
+
+/**
+ * A command for users to show the name of the database, its comment (if one has been set), and its
+ * root location on the filesystem. When extended is true, it also shows the database's properties.
+ * If the database does not exist, an error message will be issued to indicate the database
+ * does not exist.
+ * The syntax of using this command in SQL is:
+ * {{{
+ * DESCRIBE DATABASE [EXTENDED] db_name
+ * }}}
+ */
+case class DescribeDatabase(
+ databaseName: String,
+ extended: Boolean)
+ extends RunnableCommand {
+
+ override def run(sqlContext: SQLContext): Seq[Row] = {
+ val dbMetadata: CatalogDatabase =
+ sqlContext.sessionState.catalog.getDatabaseMetadata(databaseName)
+ val result =
+ Row("Database Name", dbMetadata.name) ::
+ Row("Description", dbMetadata.description) ::
+ Row("Location", dbMetadata.locationUri) :: Nil
+
+ if (extended) {
+ val properties =
+ if (dbMetadata.properties.isEmpty) {
+ ""
+ } else {
+ dbMetadata.properties.toSeq.mkString("(", ", ", ")")
+ }
+ result :+ Row("Properties", properties)
+ } else {
+ result
+ }
+ }
+
+ override val output: Seq[Attribute] = {
+ AttributeReference("database_description_item", StringType, nullable = false)() ::
+ AttributeReference("database_description_value", StringType, nullable = false)() :: Nil
+ }
+}
+
+/**
+ * Drops a table/view from the metastore and removes it if it is cached.
+ *
+ * The syntax of this command is:
+ * {{{
+ * DROP TABLE [IF EXISTS] table_name;
+ * DROP VIEW [IF EXISTS] [db_name.]view_name;
+ * }}}
+ */
+case class DropTable(
tableName: TableIdentifier,
- properties: Map[String, String])(sql: String)
- extends NativeDDLCommand(sql) with Logging
+ ifExists: Boolean,
+ isView: Boolean) extends RunnableCommand {
-case class AlterTableUnsetProperties(
+ override def run(sqlContext: SQLContext): Seq[Row] = {
+ val catalog = sqlContext.sessionState.catalog
+ if (!catalog.tableExists(tableName)) {
+ if (!ifExists) {
+ val objectName = if (isView) "View" else "Table"
+ logError(s"$objectName '${tableName.quotedString}' does not exist")
+ }
+ } else {
+ // If DROP VIEW is used on a table, or DROP TABLE is used on a view,
+ // throw an exception.
+ catalog.getTableMetadataOption(tableName).map(_.tableType match {
+ case CatalogTableType.VIRTUAL_VIEW if !isView =>
+ throw new AnalysisException(
+ "Cannot drop a view with DROP TABLE. Please use DROP VIEW instead")
+ case o if o != CatalogTableType.VIRTUAL_VIEW && isView =>
+ throw new AnalysisException(
+ s"Cannot drop a table with DROP VIEW. Please use DROP TABLE instead")
+ case _ =>
+ })
+ try {
+ sqlContext.cacheManager.tryUncacheQuery(sqlContext.table(tableName.quotedString))
+ } catch {
+ case NonFatal(e) => log.warn(s"${e.getMessage}", e)
+ }
+ catalog.invalidateTable(tableName)
+ catalog.dropTable(tableName, ifExists)
+ }
+ Seq.empty[Row]
+ }
+}
+
+/**
+ * A command that sets table/view properties.
+ *
+ * The syntax of this command is:
+ * {{{
+ * ALTER TABLE table1 SET TBLPROPERTIES ('key1' = 'val1', 'key2' = 'val2', ...);
+ * ALTER VIEW view1 SET TBLPROPERTIES ('key1' = 'val1', 'key2' = 'val2', ...);
+ * }}}
+ */
+case class AlterTableSetProperties(
tableName: TableIdentifier,
properties: Map[String, String],
- ifExists: Boolean)(sql: String)
- extends NativeDDLCommand(sql) with Logging
+ isView: Boolean)
+ extends RunnableCommand {
-case class AlterTableSerDeProperties(
- tableName: TableIdentifier,
- serdeClassName: Option[String],
- serdeProperties: Option[Map[String, String]],
- partition: Option[Map[String, String]])(sql: String)
- extends NativeDDLCommand(sql) with Logging
+ override def run(sqlContext: SQLContext): Seq[Row] = {
+ val catalog = sqlContext.sessionState.catalog
+ DDLUtils.verifyAlterTableType(catalog, tableName, isView)
+ val table = catalog.getTableMetadata(tableName)
+ val newProperties = table.properties ++ properties
+ if (DDLUtils.isDatasourceTable(newProperties)) {
+ throw new AnalysisException(
+ "alter table properties is not supported for tables defined using the datasource API")
+ }
+ val newTable = table.copy(properties = newProperties)
+ catalog.alterTable(newTable)
+ Seq.empty[Row]
+ }
-case class AlterTableStorageProperties(
+}
+
+/**
+ * A command that unsets table/view properties.
+ *
+ * The syntax of this command is:
+ * {{{
+ * ALTER TABLE table1 UNSET TBLPROPERTIES [IF EXISTS] ('key1', 'key2', ...);
+ * ALTER VIEW view1 UNSET TBLPROPERTIES [IF EXISTS] ('key1', 'key2', ...);
+ * }}}
+ */
+case class AlterTableUnsetProperties(
tableName: TableIdentifier,
- buckets: BucketSpec)(sql: String)
- extends NativeDDLCommand(sql) with Logging
+ propKeys: Seq[String],
+ ifExists: Boolean,
+ isView: Boolean)
+ extends RunnableCommand {
-case class AlterTableNotClustered(
- tableName: TableIdentifier)(sql: String) extends NativeDDLCommand(sql) with Logging
+ override def run(sqlContext: SQLContext): Seq[Row] = {
+ val catalog = sqlContext.sessionState.catalog
+ DDLUtils.verifyAlterTableType(catalog, tableName, isView)
+ val table = catalog.getTableMetadata(tableName)
+ if (DDLUtils.isDatasourceTable(table)) {
+ throw new AnalysisException(
+ "alter table properties is not supported for datasource tables")
+ }
+ if (!ifExists) {
+ propKeys.foreach { k =>
+ if (!table.properties.contains(k)) {
+ throw new AnalysisException(
+ s"attempted to unset non-existent property '$k' in table '$tableName'")
+ }
+ }
+ }
+ val newProperties = table.properties.filter { case (k, _) => !propKeys.contains(k) }
+ val newTable = table.copy(properties = newProperties)
+ catalog.alterTable(newTable)
+ Seq.empty[Row]
+ }
-case class AlterTableNotSorted(
- tableName: TableIdentifier)(sql: String) extends NativeDDLCommand(sql) with Logging
+}
-case class AlterTableSkewed(
+/**
+ * A command that sets the serde class and/or serde properties of a table/view.
+ *
+ * The syntax of this command is:
+ * {{{
+ * ALTER TABLE table [PARTITION spec] SET SERDE serde_name [WITH SERDEPROPERTIES props];
+ * ALTER TABLE table [PARTITION spec] SET SERDEPROPERTIES serde_properties;
+ * }}}
+ */
+case class AlterTableSerDeProperties(
tableName: TableIdentifier,
- // e.g. (dt, country)
- skewedCols: Seq[String],
- // e.g. ('2008-08-08', 'us), ('2009-09-09', 'uk')
- skewedValues: Seq[Seq[String]],
- storedAsDirs: Boolean)(sql: String)
- extends NativeDDLCommand(sql) with Logging {
-
- require(skewedValues.forall(_.size == skewedCols.size),
- "number of columns in skewed values do not match number of skewed columns provided")
-}
+ serdeClassName: Option[String],
+ serdeProperties: Option[Map[String, String]],
+ partition: Option[Map[String, String]])
+ extends RunnableCommand {
-case class AlterTableNotSkewed(
- tableName: TableIdentifier)(sql: String) extends NativeDDLCommand(sql) with Logging
+ // should never happen if we parsed things correctly
+ require(serdeClassName.isDefined || serdeProperties.isDefined,
+ "alter table attempted to set neither serde class name nor serde properties")
-case class AlterTableNotStoredAsDirs(
- tableName: TableIdentifier)(sql: String) extends NativeDDLCommand(sql) with Logging
+ override def run(sqlContext: SQLContext): Seq[Row] = {
+ val catalog = sqlContext.sessionState.catalog
+ val table = catalog.getTableMetadata(tableName)
+ // Do not support setting serde for datasource tables
+ if (serdeClassName.isDefined && DDLUtils.isDatasourceTable(table)) {
+ throw new AnalysisException(
+ "alter table serde is not supported for datasource tables")
+ }
+ val newTable = table.withNewStorage(
+ serde = serdeClassName.orElse(table.storage.serde),
+ serdeProperties = table.storage.serdeProperties ++ serdeProperties.getOrElse(Map()))
+ catalog.alterTable(newTable)
+ Seq.empty[Row]
+ }
-case class AlterTableSkewedLocation(
- tableName: TableIdentifier,
- skewedMap: Map[String, String])(sql: String)
- extends NativeDDLCommand(sql) with Logging
+}
+/**
+ * Add Partition in ALTER TABLE: add the table partitions.
+ *
+ * 'partitionSpecsAndLocs': the syntax of ALTER VIEW is identical to ALTER TABLE,
+ * EXCEPT that it is ILLEGAL to specify a LOCATION clause.
+ * An error message will be issued if the partition exists, unless 'ifNotExists' is true.
+ *
+ * The syntax of this command is:
+ * {{{
+ * ALTER TABLE table ADD [IF NOT EXISTS] PARTITION spec [LOCATION 'loc1']
+ * }}}
+ */
case class AlterTableAddPartition(
tableName: TableIdentifier,
partitionSpecsAndLocs: Seq[(TablePartitionSpec, Option[String])],
- ifNotExists: Boolean)(sql: String)
- extends NativeDDLCommand(sql) with Logging
+ ifNotExists: Boolean)
+ extends RunnableCommand {
+ override def run(sqlContext: SQLContext): Seq[Row] = {
+ val catalog = sqlContext.sessionState.catalog
+ val table = catalog.getTableMetadata(tableName)
+ if (DDLUtils.isDatasourceTable(table)) {
+ throw new AnalysisException(
+ "alter table add partition is not allowed for tables defined using the datasource API")
+ }
+ val parts = partitionSpecsAndLocs.map { case (spec, location) =>
+ // inherit table storage format (possibly except for location)
+ CatalogTablePartition(spec, table.storage.copy(locationUri = location))
+ }
+ catalog.createPartitions(tableName, parts, ignoreIfExists = ifNotExists)
+ Seq.empty[Row]
+ }
+
+}
+
+/**
+ * Alter a table partition's spec.
+ *
+ * The syntax of this command is:
+ * {{{
+ * ALTER TABLE table PARTITION spec1 RENAME TO PARTITION spec2;
+ * }}}
+ */
case class AlterTableRenamePartition(
tableName: TableIdentifier,
oldPartition: TablePartitionSpec,
- newPartition: TablePartitionSpec)(sql: String)
- extends NativeDDLCommand(sql) with Logging
+ newPartition: TablePartitionSpec)
+ extends RunnableCommand {
-case class AlterTableExchangePartition(
- fromTableName: TableIdentifier,
- toTableName: TableIdentifier,
- spec: TablePartitionSpec)(sql: String)
- extends NativeDDLCommand(sql) with Logging
+ override def run(sqlContext: SQLContext): Seq[Row] = {
+ sqlContext.sessionState.catalog.renamePartitions(
+ tableName, Seq(oldPartition), Seq(newPartition))
+ Seq.empty[Row]
+ }
+}
+
+/**
+ * Drop Partition in ALTER TABLE: to drop a particular partition for a table.
+ *
+ * This removes the data and metadata for this partition.
+ * The data is actually moved to the .Trash/Current directory if Trash is configured,
+ * unless 'purge' is true, but the metadata is completely lost.
+ * An error message will be issued if the partition does not exist, unless 'ifExists' is true.
+ * Note: purge is always false when the target is a view; this command takes no 'purge' argument.
+ *
+ * The syntax of this command is:
+ * {{{
+ * ALTER TABLE table DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE];
+ * }}}
+ */
case class AlterTableDropPartition(
tableName: TableIdentifier,
specs: Seq[TablePartitionSpec],
- ifExists: Boolean,
- purge: Boolean)(sql: String)
- extends NativeDDLCommand(sql) with Logging
+ ifExists: Boolean)
+ extends RunnableCommand {
-case class AlterTableArchivePartition(
- tableName: TableIdentifier,
- spec: TablePartitionSpec)(sql: String)
- extends NativeDDLCommand(sql) with Logging
+ override def run(sqlContext: SQLContext): Seq[Row] = {
+ val catalog = sqlContext.sessionState.catalog
+ val table = catalog.getTableMetadata(tableName)
+ if (DDLUtils.isDatasourceTable(table)) {
+ throw new AnalysisException(
+ "alter table drop partition is not allowed for tables defined using the datasource API")
+ }
+ catalog.dropPartitions(tableName, specs, ignoreIfNotExists = ifExists)
+ Seq.empty[Row]
+ }
-case class AlterTableUnarchivePartition(
- tableName: TableIdentifier,
- spec: TablePartitionSpec)(sql: String)
- extends NativeDDLCommand(sql) with Logging
+}
case class AlterTableSetFileFormat(
tableName: TableIdentifier,
@@ -177,27 +429,55 @@ case class AlterTableSetFileFormat(
genericFormat: Option[String])(sql: String)
extends NativeDDLCommand(sql) with Logging
+/**
+ * A command that sets the location of a table or a partition.
+ *
+ * For normal tables, this just sets the location URI in the table/partition's storage format.
+ * For datasource tables, this sets a "path" parameter in the table/partition's serde properties.
+ *
+ * The syntax of this command is:
+ * {{{
+ * ALTER TABLE table_name [PARTITION partition_spec] SET LOCATION "loc";
+ * }}}
+ */
case class AlterTableSetLocation(
tableName: TableIdentifier,
partitionSpec: Option[TablePartitionSpec],
- location: String)(sql: String)
- extends NativeDDLCommand(sql) with Logging
-
-case class AlterTableTouch(
- tableName: TableIdentifier,
- partitionSpec: Option[TablePartitionSpec])(sql: String)
- extends NativeDDLCommand(sql) with Logging
+ location: String)
+ extends RunnableCommand {
-case class AlterTableCompact(
- tableName: TableIdentifier,
- partitionSpec: Option[TablePartitionSpec],
- compactType: String)(sql: String)
- extends NativeDDLCommand(sql) with Logging
+ override def run(sqlContext: SQLContext): Seq[Row] = {
+ val catalog = sqlContext.sessionState.catalog
+ val table = catalog.getTableMetadata(tableName)
+ partitionSpec match {
+ case Some(spec) =>
+ // Partition spec is specified, so we set the location only for this partition
+ val part = catalog.getPartition(tableName, spec)
+ val newPart =
+ if (DDLUtils.isDatasourceTable(table)) {
+ throw new AnalysisException(
+ "alter table set location for partition is not allowed for tables defined " +
+ "using the datasource API")
+ } else {
+ part.copy(storage = part.storage.copy(locationUri = Some(location)))
+ }
+ catalog.alterPartitions(tableName, Seq(newPart))
+ case None =>
+ // No partition spec is specified, so we set the location for the table itself
+ val newTable =
+ if (DDLUtils.isDatasourceTable(table)) {
+ table.withNewStorage(
+ locationUri = Some(location),
+ serdeProperties = table.storage.serdeProperties ++ Map("path" -> location))
+ } else {
+ table.withNewStorage(locationUri = Some(location))
+ }
+ catalog.alterTable(newTable)
+ }
+ Seq.empty[Row]
+ }
-case class AlterTableMerge(
- tableName: TableIdentifier,
- partitionSpec: Option[TablePartitionSpec])(sql: String)
- extends NativeDDLCommand(sql) with Logging
+}
case class AlterTableChangeCol(
tableName: TableIdentifier,
@@ -226,3 +506,35 @@ case class AlterTableReplaceCol(
restrict: Boolean,
cascade: Boolean)(sql: String)
extends NativeDDLCommand(sql) with Logging
+
+
+private object DDLUtils {
+
+ def isDatasourceTable(props: Map[String, String]): Boolean = {
+ props.contains("spark.sql.sources.provider")
+ }
+
+ def isDatasourceTable(table: CatalogTable): Boolean = {
+ isDatasourceTable(table.properties)
+ }
+
+ /**
+ * If ALTER VIEW is used on a table, or ALTER TABLE is used on a view,
+ * throw an [[AnalysisException]].
+ */
+ def verifyAlterTableType(
+ catalog: SessionCatalog,
+ tableIdentifier: TableIdentifier,
+ isView: Boolean): Unit = {
+ catalog.getTableMetadataOption(tableIdentifier).map(_.tableType match {
+ case CatalogTableType.VIRTUAL_VIEW if !isView =>
+ throw new AnalysisException(
+ "Cannot alter a view with ALTER TABLE. Please use ALTER VIEW instead")
+ case o if o != CatalogTableType.VIRTUAL_VIEW && isView =>
+ throw new AnalysisException(
+ s"Cannot alter a table with ALTER VIEW. Please use ALTER TABLE instead")
+ case _ =>
+ })
+ }
+}
+