Diffstat (limited to 'sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala | 113
1 file changed, 49 insertions, 64 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 60e6b5db62..94b46c5d97 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -35,7 +35,6 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
 import org.apache.spark.sql.execution.datasources.PartitioningUtils
-import org.apache.spark.sql.internal.HiveSerDe
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
@@ -65,7 +64,11 @@ case class CreateTableLikeCommand(
         s"Source table in CREATE TABLE LIKE does not exist: '$sourceTable'")
     }
 
-    val sourceTableDesc = catalog.getTableMetadata(sourceTable)
+    val sourceTableDesc = if (sourceTable.database.isDefined) {
+      catalog.getTableMetadata(sourceTable)
+    } else {
+      catalog.getTempViewOrPermanentTableMetadata(sourceTable.table)
+    }
 
     // Storage format
     val newStorage =
@@ -158,14 +161,13 @@ case class AlterTableRenameCommand(
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
-    val table = catalog.getTableMetadata(oldName)
-    DDLUtils.verifyAlterTableType(catalog, table, isView)
     // If this is a temp view, just rename the view.
     // Otherwise, if this is a real table, we also need to uncache and invalidate the table.
-    val isTemporary = catalog.isTemporaryTable(oldName)
-    if (isTemporary) {
+    if (catalog.isTemporaryTable(oldName)) {
       catalog.renameTable(oldName, newName)
     } else {
+      val table = catalog.getTableMetadata(oldName)
+      DDLUtils.verifyAlterTableType(catalog, table, isView)
       val newTblName = TableIdentifier(newName, oldName.database)
       // If an exception is thrown here we can just assume the table is uncached;
       // this can happen with Hive tables when the underlying catalog is in-memory.
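The two hunks above change name resolution: CREATE TABLE ... LIKE can now read its source schema from a temporary view when the name is unqualified, and ALTER TABLE ... RENAME handles temp views before touching catalog metadata (a getTableMetadata call would fail, since temp views have no entry in the external catalog). A minimal sketch of the resulting behavior, assuming a Hive-enabled local session; the names src_view and t1 are illustrative, not from the patch:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .master("local")
      .enableHiveSupport()
      .getOrCreate()

    // An unqualified source name now resolves via getTempViewOrPermanentTableMetadata,
    // so a temporary view is a legal source for CREATE TABLE LIKE.
    spark.range(10).createOrReplaceTempView("src_view")
    spark.sql("CREATE TABLE t1 LIKE src_view")

    // Renaming a temp view no longer goes through getTableMetadata/verifyAlterTableType.
    spark.sql("ALTER TABLE src_view RENAME TO src_view2")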
@@ -215,40 +217,38 @@ case class LoadDataCommand(
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
-    if (!catalog.tableExists(table)) {
-      throw new AnalysisException(s"Target table in LOAD DATA does not exist: $table")
-    }
-    val targetTable = catalog.getTableMetadataOption(table).getOrElse {
-      throw new AnalysisException(s"Target table in LOAD DATA cannot be temporary: $table")
-    }
+    val targetTable = catalog.getTableMetadata(table)
+    val tableIdentwithDB = targetTable.identifier.quotedString
+
     if (targetTable.tableType == CatalogTableType.VIEW) {
-      throw new AnalysisException(s"Target table in LOAD DATA cannot be a view: $table")
+      throw new AnalysisException(s"Target table in LOAD DATA cannot be a view: $tableIdentwithDB")
     }
     if (DDLUtils.isDatasourceTable(targetTable)) {
-      throw new AnalysisException(s"LOAD DATA is not supported for datasource tables: $table")
+      throw new AnalysisException(
+        s"LOAD DATA is not supported for datasource tables: $tableIdentwithDB")
     }
     if (targetTable.partitionColumnNames.nonEmpty) {
       if (partition.isEmpty) {
-        throw new AnalysisException(s"LOAD DATA target table $table is partitioned, " +
+        throw new AnalysisException(s"LOAD DATA target table $tableIdentwithDB is partitioned, " +
           s"but no partition spec is provided")
       }
       if (targetTable.partitionColumnNames.size != partition.get.size) {
-        throw new AnalysisException(s"LOAD DATA target table $table is partitioned, " +
+        throw new AnalysisException(s"LOAD DATA target table $tableIdentwithDB is partitioned, " +
           s"but number of columns in provided partition spec (${partition.get.size}) " +
           s"do not match number of partitioned columns in table " +
           s"(s${targetTable.partitionColumnNames.size})")
       }
       partition.get.keys.foreach { colName =>
         if (!targetTable.partitionColumnNames.contains(colName)) {
-          throw new AnalysisException(s"LOAD DATA target table $table is partitioned, " +
+          throw new AnalysisException(s"LOAD DATA target table $tableIdentwithDB is partitioned, " +
             s"but the specified partition spec refers to a column that is not partitioned: " +
             s"'$colName'")
         }
       }
     } else {
       if (partition.nonEmpty) {
-        throw new AnalysisException(s"LOAD DATA target table $table is not partitioned, " +
-          s"but a partition spec was provided.")
+        throw new AnalysisException(s"LOAD DATA target table $tableIdentwithDB is not " +
+          s"partitioned, but a partition spec was provided.")
       }
     }
 
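The LOAD DATA hunk collapses the existence and temporary-table pre-checks into a single getTableMetadata call (which throws for missing or temporary targets) and threads the db-qualified, quoted identifier through every error message. A rough illustration, reusing the session from the sketch above; the database, table, and path names are hypothetical:

    spark.sql("CREATE DATABASE IF NOT EXISTS testdb")
    spark.sql("CREATE TABLE testdb.events (id INT) PARTITIONED BY (day STRING)")

    // Omitting the partition spec now fails with the qualified name, e.g.:
    //   LOAD DATA target table `testdb`.`events` is partitioned, but no partition
    //   spec is provided
    spark.sql("LOAD DATA LOCAL INPATH '/tmp/events.txt' INTO TABLE testdb.events")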
@@ -336,32 +336,27 @@ case class TruncateTableCommand(
 
   override def run(spark: SparkSession): Seq[Row] = {
     val catalog = spark.sessionState.catalog
-    if (!catalog.tableExists(tableName)) {
-      throw new AnalysisException(s"Table $tableName in TRUNCATE TABLE does not exist.")
-    }
-    if (catalog.isTemporaryTable(tableName)) {
-      throw new AnalysisException(
-        s"Operation not allowed: TRUNCATE TABLE on temporary tables: $tableName")
-    }
     val table = catalog.getTableMetadata(tableName)
+    val tableIdentwithDB = table.identifier.quotedString
+
     if (table.tableType == CatalogTableType.EXTERNAL) {
       throw new AnalysisException(
-        s"Operation not allowed: TRUNCATE TABLE on external tables: $tableName")
+        s"Operation not allowed: TRUNCATE TABLE on external tables: $tableIdentwithDB")
     }
     if (table.tableType == CatalogTableType.VIEW) {
       throw new AnalysisException(
-        s"Operation not allowed: TRUNCATE TABLE on views: $tableName")
+        s"Operation not allowed: TRUNCATE TABLE on views: $tableIdentwithDB")
     }
     val isDatasourceTable = DDLUtils.isDatasourceTable(table)
     if (isDatasourceTable && partitionSpec.isDefined) {
       throw new AnalysisException(
         s"Operation not allowed: TRUNCATE TABLE ... PARTITION is not supported " +
-          s"for tables created using the data sources API: $tableName")
+          s"for tables created using the data sources API: $tableIdentwithDB")
     }
     if (table.partitionColumnNames.isEmpty && partitionSpec.isDefined) {
       throw new AnalysisException(
         s"Operation not allowed: TRUNCATE TABLE ... PARTITION is not supported " +
-          s"for tables that are not partitioned: $tableName")
+          s"for tables that are not partitioned: $tableIdentwithDB")
     }
 
     val locations =
       if (isDatasourceTable) {
@@ -369,7 +364,7 @@ case class TruncateTableCommand(
       } else if (table.partitionColumnNames.isEmpty) {
         Seq(table.storage.locationUri)
       } else {
-        catalog.listPartitions(tableName, partitionSpec).map(_.storage.locationUri)
+        catalog.listPartitions(table.identifier, partitionSpec).map(_.storage.locationUri)
       }
     val hadoopConf = spark.sessionState.newHadoopConf()
     locations.foreach { location =>
@@ -382,7 +377,7 @@ case class TruncateTableCommand(
       } catch {
         case NonFatal(e) =>
           throw new AnalysisException(
-            s"Failed to truncate table $tableName when removing data of the path: $path " +
+            s"Failed to truncate table $tableIdentwithDB when removing data of the path: $path " +
               s"because of ${e.toString}")
       }
     }
@@ -392,10 +387,10 @@ case class TruncateTableCommand(
     spark.sessionState.refreshTable(tableName.unquotedString)
     // Also try to drop the contents of the table from the columnar cache
    try {
-      spark.sharedState.cacheManager.uncacheQuery(spark.table(tableName.quotedString))
+      spark.sharedState.cacheManager.uncacheQuery(spark.table(table.identifier))
    } catch {
      case NonFatal(e) =>
-        log.warn(s"Exception when attempting to uncache table $tableName", e)
+        log.warn(s"Exception when attempting to uncache table $tableIdentwithDB", e)
    }
     Seq.empty[Row]
   }
@@ -600,13 +595,19 @@ case class ShowTablePropertiesCommand(table: TableIdentifier, propertyKey: Optio
  * SHOW COLUMNS (FROM | IN) table_identifier [(FROM | IN) database];
  * }}}
  */
-case class ShowColumnsCommand(table: TableIdentifier) extends RunnableCommand {
+case class ShowColumnsCommand(tableName: TableIdentifier) extends RunnableCommand {
   override val output: Seq[Attribute] = {
     AttributeReference("col_name", StringType, nullable = false)() :: Nil
   }
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
-    sparkSession.sessionState.catalog.getTableMetadata(table).schema.map { c =>
+    val catalog = sparkSession.sessionState.catalog
+    val table = if (tableName.database.isDefined) {
+      catalog.getTableMetadata(tableName)
+    } else {
+      catalog.getTempViewOrPermanentTableMetadata(tableName.table)
+    }
+    table.schema.map { c =>
       Row(c.name)
     }
   }
@@ -628,7 +629,7 @@ case class ShowColumnsCommand(table: TableIdentifier) extends RunnableCommand {
  * }}}
  */
 case class ShowPartitionsCommand(
-    table: TableIdentifier,
+    tableName: TableIdentifier,
     spec: Option[TablePartitionSpec]) extends RunnableCommand {
   override val output: Seq[Attribute] = {
     AttributeReference("partition", StringType, nullable = false)() :: Nil
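TRUNCATE TABLE gets the same treatment as LOAD DATA (one getTableMetadata lookup, quoted identifiers in the errors), and SHOW COLUMNS adopts the CREATE TABLE LIKE resolution rule: an unqualified name is tried as a temporary view before the current database. The SHOW PARTITIONS changes continue below. A short sketch, continuing the session above with an illustrative view name:

    spark.range(3).selectExpr("id", "id * 2 AS doubled").createOrReplaceTempView("v")
    spark.sql("SHOW COLUMNS IN v").show()  // one row per column: id, doubled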
@@ -642,13 +643,8 @@
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
-
-    if (catalog.isTemporaryTable(table)) {
-      throw new AnalysisException(
-        s"SHOW PARTITIONS is not allowed on a temporary table: ${table.unquotedString}")
-    }
-
-    val tab = catalog.getTableMetadata(table)
+    val table = catalog.getTableMetadata(tableName)
+    val tableIdentWithDB = table.identifier.quotedString
 
     /**
      * Validate and throws an [[AnalysisException]] exception under the following conditions:
@@ -656,19 +652,18 @@
      * 2. If it is a datasource table.
      * 3. If it is a view.
      */
-    if (tab.tableType == VIEW) {
-      throw new AnalysisException(
-        s"SHOW PARTITIONS is not allowed on a view: ${tab.qualifiedName}")
+    if (table.tableType == VIEW) {
+      throw new AnalysisException(s"SHOW PARTITIONS is not allowed on a view: $tableIdentWithDB")
     }
-    if (tab.partitionColumnNames.isEmpty) {
+    if (table.partitionColumnNames.isEmpty) {
       throw new AnalysisException(
-        s"SHOW PARTITIONS is not allowed on a table that is not partitioned: ${tab.qualifiedName}")
+        s"SHOW PARTITIONS is not allowed on a table that is not partitioned: $tableIdentWithDB")
     }
-    if (DDLUtils.isDatasourceTable(tab)) {
+    if (DDLUtils.isDatasourceTable(table)) {
       throw new AnalysisException(
-        s"SHOW PARTITIONS is not allowed on a datasource table: ${tab.qualifiedName}")
+        s"SHOW PARTITIONS is not allowed on a datasource table: $tableIdentWithDB")
     }
 
     /**
@@ -677,7 +672,7 @@ case class ShowPartitionsCommand(
      * thrown if the partitioning spec is invalid.
      */
     if (spec.isDefined) {
-      val badColumns = spec.get.keySet.filterNot(tab.partitionColumnNames.contains)
+      val badColumns = spec.get.keySet.filterNot(table.partitionColumnNames.contains)
       if (badColumns.nonEmpty) {
         val badCols = badColumns.mkString("[", ", ", "]")
         throw new AnalysisException(
@@ -685,8 +680,8 @@
       }
     }
 
-    val partNames = catalog.listPartitions(table, spec).map { p =>
-      getPartName(p.spec, tab.partitionColumnNames)
+    val partNames = catalog.listPartitions(tableName, spec).map { p =>
+      getPartName(p.spec, table.partitionColumnNames)
     }
 
     partNames.map(Row(_))
@@ -700,16 +695,6 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
-
-    if (catalog.isTemporaryTable(table)) {
-      throw new AnalysisException(
-        s"SHOW CREATE TABLE cannot be applied to temporary table")
-    }
-
-    if (!catalog.tableExists(table)) {
-      throw new AnalysisException(s"Table $table doesn't exist")
-    }
-
     val tableMetadata = catalog.getTableMetadata(table)
 
     // TODO: unify this after we unify the CREATE TABLE syntax for hive serde and data source table.
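SHOW PARTITIONS and SHOW CREATE TABLE likewise drop their tableExists/isTemporaryTable pre-checks and let getTableMetadata reject missing tables (and, as a consequence, unqualified temp-view names, which have no external-catalog entry; the exception raised there is presumably the session catalog's NoSuchTableException rather than the old bespoke message). A sketch under the same assumptions as above, with hypothetical names:

    spark.sql("CREATE TABLE testdb.logs (msg STRING) PARTITIONED BY (day STRING)")
    spark.sql("ALTER TABLE testdb.logs ADD PARTITION (day = '2016-10-01')")
    spark.sql("SHOW PARTITIONS testdb.logs").show()   // day=2016-10-01
    spark.sql("SHOW CREATE TABLE testdb.logs").show(truncate = false)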