diff options
Diffstat (limited to 'sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala | 504 |
1 files changed, 408 insertions, 96 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 373b557683..fc37a142cd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -17,15 +17,19 @@ package org.apache.spark.sql.execution.command +import scala.util.control.NonFatal + import org.apache.spark.internal.Logging -import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.{AnalysisException, Row, SQLContext} import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable} +import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, CatalogTableType, SessionCatalog} import org.apache.spark.sql.catalyst.catalog.ExternalCatalog.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} -import org.apache.spark.sql.execution.datasources.BucketSpec import org.apache.spark.sql.types._ + // Note: The definition of these commands are based on the ones described in // https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL @@ -44,131 +48,379 @@ abstract class NativeDDLCommand(val sql: String) extends RunnableCommand { } +/** + * A command for users to create a new database. + * + * It will issue an error message when the database with the same name already exists, + * unless 'ifNotExists' is true. + * The syntax of using this command in SQL is: + * {{{ + * CREATE DATABASE|SCHEMA [IF NOT EXISTS] database_name + * }}} + */ case class CreateDatabase( databaseName: String, ifNotExists: Boolean, path: Option[String], comment: Option[String], - props: Map[String, String])(sql: String) - extends NativeDDLCommand(sql) with Logging + props: Map[String, String]) + extends RunnableCommand { + + override def run(sqlContext: SQLContext): Seq[Row] = { + val catalog = sqlContext.sessionState.catalog + catalog.createDatabase( + CatalogDatabase( + databaseName, + comment.getOrElse(""), + path.getOrElse(catalog.getDefaultDBPath(databaseName)), + props), + ifNotExists) + Seq.empty[Row] + } + + override val output: Seq[Attribute] = Seq.empty +} + /** - * Drop Database: Removes a database from the system. + * A command for users to remove a database from the system. * * 'ifExists': * - true, if database_name does't exist, no action * - false (default), if database_name does't exist, a warning message will be issued - * 'restric': - * - true (default), the database cannot be dropped if it is not empty. The inclusive - * tables must be dropped at first. - * - false, it is in the Cascade mode. The dependent objects are automatically dropped - * before dropping database. + * 'cascade': + * - true, the dependent objects are automatically dropped before dropping database. + * - false (default), it is in the Restrict mode. The database cannot be dropped if + * it is not empty. The inclusive tables must be dropped at first. + * + * The syntax of using this command in SQL is: + * {{{ + * DROP DATABASE [IF EXISTS] database_name [RESTRICT|CASCADE]; + * }}} */ case class DropDatabase( databaseName: String, ifExists: Boolean, - restrict: Boolean)(sql: String) - extends NativeDDLCommand(sql) with Logging + cascade: Boolean) + extends RunnableCommand { -case class CreateFunction( - functionName: String, - alias: String, - resources: Seq[(String, String)], - isTemp: Boolean)(sql: String) - extends NativeDDLCommand(sql) with Logging + override def run(sqlContext: SQLContext): Seq[Row] = { + sqlContext.sessionState.catalog.dropDatabase(databaseName, ifExists, cascade) + Seq.empty[Row] + } -case class AlterTableRename( - oldName: TableIdentifier, - newName: TableIdentifier)(sql: String) - extends NativeDDLCommand(sql) with Logging + override val output: Seq[Attribute] = Seq.empty +} -case class AlterTableSetProperties( +/** + * A command for users to add new (key, value) pairs into DBPROPERTIES + * If the database does not exist, an error message will be issued to indicate the database + * does not exist. + * The syntax of using this command in SQL is: + * {{{ + * ALTER (DATABASE|SCHEMA) database_name SET DBPROPERTIES (property_name=property_value, ...) + * }}} + */ +case class AlterDatabaseProperties( + databaseName: String, + props: Map[String, String]) + extends RunnableCommand { + + override def run(sqlContext: SQLContext): Seq[Row] = { + val catalog = sqlContext.sessionState.catalog + val db: CatalogDatabase = catalog.getDatabaseMetadata(databaseName) + catalog.alterDatabase(db.copy(properties = db.properties ++ props)) + + Seq.empty[Row] + } + + override val output: Seq[Attribute] = Seq.empty +} + +/** + * A command for users to show the name of the database, its comment (if one has been set), and its + * root location on the filesystem. When extended is true, it also shows the database's properties + * If the database does not exist, an error message will be issued to indicate the database + * does not exist. + * The syntax of using this command in SQL is + * {{{ + * DESCRIBE DATABASE [EXTENDED] db_name + * }}} + */ +case class DescribeDatabase( + databaseName: String, + extended: Boolean) + extends RunnableCommand { + + override def run(sqlContext: SQLContext): Seq[Row] = { + val dbMetadata: CatalogDatabase = + sqlContext.sessionState.catalog.getDatabaseMetadata(databaseName) + val result = + Row("Database Name", dbMetadata.name) :: + Row("Description", dbMetadata.description) :: + Row("Location", dbMetadata.locationUri) :: Nil + + if (extended) { + val properties = + if (dbMetadata.properties.isEmpty) { + "" + } else { + dbMetadata.properties.toSeq.mkString("(", ", ", ")") + } + result :+ Row("Properties", properties) + } else { + result + } + } + + override val output: Seq[Attribute] = { + AttributeReference("database_description_item", StringType, nullable = false)() :: + AttributeReference("database_description_value", StringType, nullable = false)() :: Nil + } +} + +/** + * Drops a table/view from the metastore and removes it if it is cached. + * + * The syntax of this command is: + * {{{ + * DROP TABLE [IF EXISTS] table_name; + * DROP VIEW [IF EXISTS] [db_name.]view_name; + * }}} + */ +case class DropTable( tableName: TableIdentifier, - properties: Map[String, String])(sql: String) - extends NativeDDLCommand(sql) with Logging + ifExists: Boolean, + isView: Boolean) extends RunnableCommand { -case class AlterTableUnsetProperties( + override def run(sqlContext: SQLContext): Seq[Row] = { + val catalog = sqlContext.sessionState.catalog + if (!catalog.tableExists(tableName)) { + if (!ifExists) { + val objectName = if (isView) "View" else "Table" + logError(s"$objectName '${tableName.quotedString}' does not exist") + } + } else { + // If the command DROP VIEW is to drop a table or DROP TABLE is to drop a view + // issue an exception. + catalog.getTableMetadataOption(tableName).map(_.tableType match { + case CatalogTableType.VIRTUAL_VIEW if !isView => + throw new AnalysisException( + "Cannot drop a view with DROP TABLE. Please use DROP VIEW instead") + case o if o != CatalogTableType.VIRTUAL_VIEW && isView => + throw new AnalysisException( + s"Cannot drop a table with DROP VIEW. Please use DROP TABLE instead") + case _ => + }) + try { + sqlContext.cacheManager.tryUncacheQuery(sqlContext.table(tableName.quotedString)) + } catch { + case NonFatal(e) => log.warn(s"${e.getMessage}", e) + } + catalog.invalidateTable(tableName) + catalog.dropTable(tableName, ifExists) + } + Seq.empty[Row] + } +} + +/** + * A command that sets table/view properties. + * + * The syntax of this command is: + * {{{ + * ALTER TABLE table1 SET TBLPROPERTIES ('key1' = 'val1', 'key2' = 'val2', ...); + * ALTER VIEW view1 SET TBLPROPERTIES ('key1' = 'val1', 'key2' = 'val2', ...); + * }}} + */ +case class AlterTableSetProperties( tableName: TableIdentifier, properties: Map[String, String], - ifExists: Boolean)(sql: String) - extends NativeDDLCommand(sql) with Logging + isView: Boolean) + extends RunnableCommand { -case class AlterTableSerDeProperties( - tableName: TableIdentifier, - serdeClassName: Option[String], - serdeProperties: Option[Map[String, String]], - partition: Option[Map[String, String]])(sql: String) - extends NativeDDLCommand(sql) with Logging + override def run(sqlContext: SQLContext): Seq[Row] = { + val catalog = sqlContext.sessionState.catalog + DDLUtils.verifyAlterTableType(catalog, tableName, isView) + val table = catalog.getTableMetadata(tableName) + val newProperties = table.properties ++ properties + if (DDLUtils.isDatasourceTable(newProperties)) { + throw new AnalysisException( + "alter table properties is not supported for tables defined using the datasource API") + } + val newTable = table.copy(properties = newProperties) + catalog.alterTable(newTable) + Seq.empty[Row] + } -case class AlterTableStorageProperties( +} + +/** + * A command that unsets table/view properties. + * + * The syntax of this command is: + * {{{ + * ALTER TABLE table1 UNSET TBLPROPERTIES [IF EXISTS] ('key1', 'key2', ...); + * ALTER VIEW view1 UNSET TBLPROPERTIES [IF EXISTS] ('key1', 'key2', ...); + * }}} + */ +case class AlterTableUnsetProperties( tableName: TableIdentifier, - buckets: BucketSpec)(sql: String) - extends NativeDDLCommand(sql) with Logging + propKeys: Seq[String], + ifExists: Boolean, + isView: Boolean) + extends RunnableCommand { -case class AlterTableNotClustered( - tableName: TableIdentifier)(sql: String) extends NativeDDLCommand(sql) with Logging + override def run(sqlContext: SQLContext): Seq[Row] = { + val catalog = sqlContext.sessionState.catalog + DDLUtils.verifyAlterTableType(catalog, tableName, isView) + val table = catalog.getTableMetadata(tableName) + if (DDLUtils.isDatasourceTable(table)) { + throw new AnalysisException( + "alter table properties is not supported for datasource tables") + } + if (!ifExists) { + propKeys.foreach { k => + if (!table.properties.contains(k)) { + throw new AnalysisException( + s"attempted to unset non-existent property '$k' in table '$tableName'") + } + } + } + val newProperties = table.properties.filter { case (k, _) => !propKeys.contains(k) } + val newTable = table.copy(properties = newProperties) + catalog.alterTable(newTable) + Seq.empty[Row] + } -case class AlterTableNotSorted( - tableName: TableIdentifier)(sql: String) extends NativeDDLCommand(sql) with Logging +} -case class AlterTableSkewed( +/** + * A command that sets the serde class and/or serde properties of a table/view. + * + * The syntax of this command is: + * {{{ + * ALTER TABLE table [PARTITION spec] SET SERDE serde_name [WITH SERDEPROPERTIES props]; + * ALTER TABLE table [PARTITION spec] SET SERDEPROPERTIES serde_properties; + * }}} + */ +case class AlterTableSerDeProperties( tableName: TableIdentifier, - // e.g. (dt, country) - skewedCols: Seq[String], - // e.g. ('2008-08-08', 'us), ('2009-09-09', 'uk') - skewedValues: Seq[Seq[String]], - storedAsDirs: Boolean)(sql: String) - extends NativeDDLCommand(sql) with Logging { - - require(skewedValues.forall(_.size == skewedCols.size), - "number of columns in skewed values do not match number of skewed columns provided") -} + serdeClassName: Option[String], + serdeProperties: Option[Map[String, String]], + partition: Option[Map[String, String]]) + extends RunnableCommand { -case class AlterTableNotSkewed( - tableName: TableIdentifier)(sql: String) extends NativeDDLCommand(sql) with Logging + // should never happen if we parsed things correctly + require(serdeClassName.isDefined || serdeProperties.isDefined, + "alter table attempted to set neither serde class name nor serde properties") -case class AlterTableNotStoredAsDirs( - tableName: TableIdentifier)(sql: String) extends NativeDDLCommand(sql) with Logging + override def run(sqlContext: SQLContext): Seq[Row] = { + val catalog = sqlContext.sessionState.catalog + val table = catalog.getTableMetadata(tableName) + // Do not support setting serde for datasource tables + if (serdeClassName.isDefined && DDLUtils.isDatasourceTable(table)) { + throw new AnalysisException( + "alter table serde is not supported for datasource tables") + } + val newTable = table.withNewStorage( + serde = serdeClassName.orElse(table.storage.serde), + serdeProperties = table.storage.serdeProperties ++ serdeProperties.getOrElse(Map())) + catalog.alterTable(newTable) + Seq.empty[Row] + } -case class AlterTableSkewedLocation( - tableName: TableIdentifier, - skewedMap: Map[String, String])(sql: String) - extends NativeDDLCommand(sql) with Logging +} +/** + * Add Partition in ALTER TABLE: add the table partitions. + * + * 'partitionSpecsAndLocs': the syntax of ALTER VIEW is identical to ALTER TABLE, + * EXCEPT that it is ILLEGAL to specify a LOCATION clause. + * An error message will be issued if the partition exists, unless 'ifNotExists' is true. + * + * The syntax of this command is: + * {{{ + * ALTER TABLE table ADD [IF NOT EXISTS] PARTITION spec [LOCATION 'loc1'] + * }}} + */ case class AlterTableAddPartition( tableName: TableIdentifier, partitionSpecsAndLocs: Seq[(TablePartitionSpec, Option[String])], - ifNotExists: Boolean)(sql: String) - extends NativeDDLCommand(sql) with Logging + ifNotExists: Boolean) + extends RunnableCommand { + override def run(sqlContext: SQLContext): Seq[Row] = { + val catalog = sqlContext.sessionState.catalog + val table = catalog.getTableMetadata(tableName) + if (DDLUtils.isDatasourceTable(table)) { + throw new AnalysisException( + "alter table add partition is not allowed for tables defined using the datasource API") + } + val parts = partitionSpecsAndLocs.map { case (spec, location) => + // inherit table storage format (possibly except for location) + CatalogTablePartition(spec, table.storage.copy(locationUri = location)) + } + catalog.createPartitions(tableName, parts, ignoreIfExists = ifNotExists) + Seq.empty[Row] + } + +} + +/** + * Alter a table partition's spec. + * + * The syntax of this command is: + * {{{ + * ALTER TABLE table PARTITION spec1 RENAME TO PARTITION spec2; + * }}} + */ case class AlterTableRenamePartition( tableName: TableIdentifier, oldPartition: TablePartitionSpec, - newPartition: TablePartitionSpec)(sql: String) - extends NativeDDLCommand(sql) with Logging + newPartition: TablePartitionSpec) + extends RunnableCommand { -case class AlterTableExchangePartition( - fromTableName: TableIdentifier, - toTableName: TableIdentifier, - spec: TablePartitionSpec)(sql: String) - extends NativeDDLCommand(sql) with Logging + override def run(sqlContext: SQLContext): Seq[Row] = { + sqlContext.sessionState.catalog.renamePartitions( + tableName, Seq(oldPartition), Seq(newPartition)) + Seq.empty[Row] + } +} + +/** + * Drop Partition in ALTER TABLE: to drop a particular partition for a table. + * + * This removes the data and metadata for this partition. + * The data is actually moved to the .Trash/Current directory if Trash is configured, + * unless 'purge' is true, but the metadata is completely lost. + * An error message will be issued if the partition does not exist, unless 'ifExists' is true. + * Note: purge is always false when the target is a view. + * + * The syntax of this command is: + * {{{ + * ALTER TABLE table DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE]; + * }}} + */ case class AlterTableDropPartition( tableName: TableIdentifier, specs: Seq[TablePartitionSpec], - ifExists: Boolean, - purge: Boolean)(sql: String) - extends NativeDDLCommand(sql) with Logging + ifExists: Boolean) + extends RunnableCommand { -case class AlterTableArchivePartition( - tableName: TableIdentifier, - spec: TablePartitionSpec)(sql: String) - extends NativeDDLCommand(sql) with Logging + override def run(sqlContext: SQLContext): Seq[Row] = { + val catalog = sqlContext.sessionState.catalog + val table = catalog.getTableMetadata(tableName) + if (DDLUtils.isDatasourceTable(table)) { + throw new AnalysisException( + "alter table drop partition is not allowed for tables defined using the datasource API") + } + catalog.dropPartitions(tableName, specs, ignoreIfNotExists = ifExists) + Seq.empty[Row] + } -case class AlterTableUnarchivePartition( - tableName: TableIdentifier, - spec: TablePartitionSpec)(sql: String) - extends NativeDDLCommand(sql) with Logging +} case class AlterTableSetFileFormat( tableName: TableIdentifier, @@ -177,27 +429,55 @@ case class AlterTableSetFileFormat( genericFormat: Option[String])(sql: String) extends NativeDDLCommand(sql) with Logging +/** + * A command that sets the location of a table or a partition. + * + * For normal tables, this just sets the location URI in the table/partition's storage format. + * For datasource tables, this sets a "path" parameter in the table/partition's serde properties. + * + * The syntax of this command is: + * {{{ + * ALTER TABLE table_name [PARTITION partition_spec] SET LOCATION "loc"; + * }}} + */ case class AlterTableSetLocation( tableName: TableIdentifier, partitionSpec: Option[TablePartitionSpec], - location: String)(sql: String) - extends NativeDDLCommand(sql) with Logging - -case class AlterTableTouch( - tableName: TableIdentifier, - partitionSpec: Option[TablePartitionSpec])(sql: String) - extends NativeDDLCommand(sql) with Logging + location: String) + extends RunnableCommand { -case class AlterTableCompact( - tableName: TableIdentifier, - partitionSpec: Option[TablePartitionSpec], - compactType: String)(sql: String) - extends NativeDDLCommand(sql) with Logging + override def run(sqlContext: SQLContext): Seq[Row] = { + val catalog = sqlContext.sessionState.catalog + val table = catalog.getTableMetadata(tableName) + partitionSpec match { + case Some(spec) => + // Partition spec is specified, so we set the location only for this partition + val part = catalog.getPartition(tableName, spec) + val newPart = + if (DDLUtils.isDatasourceTable(table)) { + throw new AnalysisException( + "alter table set location for partition is not allowed for tables defined " + + "using the datasource API") + } else { + part.copy(storage = part.storage.copy(locationUri = Some(location))) + } + catalog.alterPartitions(tableName, Seq(newPart)) + case None => + // No partition spec is specified, so we set the location for the table itself + val newTable = + if (DDLUtils.isDatasourceTable(table)) { + table.withNewStorage( + locationUri = Some(location), + serdeProperties = table.storage.serdeProperties ++ Map("path" -> location)) + } else { + table.withNewStorage(locationUri = Some(location)) + } + catalog.alterTable(newTable) + } + Seq.empty[Row] + } -case class AlterTableMerge( - tableName: TableIdentifier, - partitionSpec: Option[TablePartitionSpec])(sql: String) - extends NativeDDLCommand(sql) with Logging +} case class AlterTableChangeCol( tableName: TableIdentifier, @@ -226,3 +506,35 @@ case class AlterTableReplaceCol( restrict: Boolean, cascade: Boolean)(sql: String) extends NativeDDLCommand(sql) with Logging + + +private object DDLUtils { + + def isDatasourceTable(props: Map[String, String]): Boolean = { + props.contains("spark.sql.sources.provider") + } + + def isDatasourceTable(table: CatalogTable): Boolean = { + isDatasourceTable(table.properties) + } + + /** + * If the command ALTER VIEW is to alter a table or ALTER TABLE is to alter a view, + * issue an exception [[AnalysisException]]. + */ + def verifyAlterTableType( + catalog: SessionCatalog, + tableIdentifier: TableIdentifier, + isView: Boolean): Unit = { + catalog.getTableMetadataOption(tableIdentifier).map(_.tableType match { + case CatalogTableType.VIRTUAL_VIEW if !isView => + throw new AnalysisException( + "Cannot alter a view with ALTER TABLE. Please use ALTER VIEW instead") + case o if o != CatalogTableType.VIRTUAL_VIEW && isView => + throw new AnalysisException( + s"Cannot alter a table with ALTER VIEW. Please use ALTER TABLE instead") + case _ => + }) + } +} + |