aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
diff options
context:
space:
mode:
Diffstat (limited to 'sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala113
1 files changed, 49 insertions, 64 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 60e6b5db62..94b46c5d97 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -35,7 +35,6 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.util.quoteIdentifier
import org.apache.spark.sql.execution.datasources.PartitioningUtils
-import org.apache.spark.sql.internal.HiveSerDe
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -65,7 +64,11 @@ case class CreateTableLikeCommand(
s"Source table in CREATE TABLE LIKE does not exist: '$sourceTable'")
}
- val sourceTableDesc = catalog.getTableMetadata(sourceTable)
+ val sourceTableDesc = if (sourceTable.database.isDefined) {
+ catalog.getTableMetadata(sourceTable)
+ } else {
+ catalog.getTempViewOrPermanentTableMetadata(sourceTable.table)
+ }
// Storage format
val newStorage =
@@ -158,14 +161,13 @@ case class AlterTableRenameCommand(
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
- val table = catalog.getTableMetadata(oldName)
- DDLUtils.verifyAlterTableType(catalog, table, isView)
// If this is a temp view, just rename the view.
// Otherwise, if this is a real table, we also need to uncache and invalidate the table.
- val isTemporary = catalog.isTemporaryTable(oldName)
- if (isTemporary) {
+ if (catalog.isTemporaryTable(oldName)) {
catalog.renameTable(oldName, newName)
} else {
+ val table = catalog.getTableMetadata(oldName)
+ DDLUtils.verifyAlterTableType(catalog, table, isView)
val newTblName = TableIdentifier(newName, oldName.database)
// If an exception is thrown here we can just assume the table is uncached;
// this can happen with Hive tables when the underlying catalog is in-memory.
@@ -215,40 +217,38 @@ case class LoadDataCommand(
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
- if (!catalog.tableExists(table)) {
- throw new AnalysisException(s"Target table in LOAD DATA does not exist: $table")
- }
- val targetTable = catalog.getTableMetadataOption(table).getOrElse {
- throw new AnalysisException(s"Target table in LOAD DATA cannot be temporary: $table")
- }
+ val targetTable = catalog.getTableMetadata(table)
+ val tableIdentwithDB = targetTable.identifier.quotedString
+
if (targetTable.tableType == CatalogTableType.VIEW) {
- throw new AnalysisException(s"Target table in LOAD DATA cannot be a view: $table")
+ throw new AnalysisException(s"Target table in LOAD DATA cannot be a view: $tableIdentwithDB")
}
if (DDLUtils.isDatasourceTable(targetTable)) {
- throw new AnalysisException(s"LOAD DATA is not supported for datasource tables: $table")
+ throw new AnalysisException(
+ s"LOAD DATA is not supported for datasource tables: $tableIdentwithDB")
}
if (targetTable.partitionColumnNames.nonEmpty) {
if (partition.isEmpty) {
- throw new AnalysisException(s"LOAD DATA target table $table is partitioned, " +
+ throw new AnalysisException(s"LOAD DATA target table $tableIdentwithDB is partitioned, " +
s"but no partition spec is provided")
}
if (targetTable.partitionColumnNames.size != partition.get.size) {
- throw new AnalysisException(s"LOAD DATA target table $table is partitioned, " +
+ throw new AnalysisException(s"LOAD DATA target table $tableIdentwithDB is partitioned, " +
s"but number of columns in provided partition spec (${partition.get.size}) " +
s"do not match number of partitioned columns in table " +
s"(s${targetTable.partitionColumnNames.size})")
}
partition.get.keys.foreach { colName =>
if (!targetTable.partitionColumnNames.contains(colName)) {
- throw new AnalysisException(s"LOAD DATA target table $table is partitioned, " +
+ throw new AnalysisException(s"LOAD DATA target table $tableIdentwithDB is partitioned, " +
s"but the specified partition spec refers to a column that is not partitioned: " +
s"'$colName'")
}
}
} else {
if (partition.nonEmpty) {
- throw new AnalysisException(s"LOAD DATA target table $table is not partitioned, " +
- s"but a partition spec was provided.")
+ throw new AnalysisException(s"LOAD DATA target table $tableIdentwithDB is not " +
+ s"partitioned, but a partition spec was provided.")
}
}
@@ -336,32 +336,27 @@ case class TruncateTableCommand(
override def run(spark: SparkSession): Seq[Row] = {
val catalog = spark.sessionState.catalog
- if (!catalog.tableExists(tableName)) {
- throw new AnalysisException(s"Table $tableName in TRUNCATE TABLE does not exist.")
- }
- if (catalog.isTemporaryTable(tableName)) {
- throw new AnalysisException(
- s"Operation not allowed: TRUNCATE TABLE on temporary tables: $tableName")
- }
val table = catalog.getTableMetadata(tableName)
+ val tableIdentwithDB = table.identifier.quotedString
+
if (table.tableType == CatalogTableType.EXTERNAL) {
throw new AnalysisException(
- s"Operation not allowed: TRUNCATE TABLE on external tables: $tableName")
+ s"Operation not allowed: TRUNCATE TABLE on external tables: $tableIdentwithDB")
}
if (table.tableType == CatalogTableType.VIEW) {
throw new AnalysisException(
- s"Operation not allowed: TRUNCATE TABLE on views: $tableName")
+ s"Operation not allowed: TRUNCATE TABLE on views: $tableIdentwithDB")
}
val isDatasourceTable = DDLUtils.isDatasourceTable(table)
if (isDatasourceTable && partitionSpec.isDefined) {
throw new AnalysisException(
s"Operation not allowed: TRUNCATE TABLE ... PARTITION is not supported " +
- s"for tables created using the data sources API: $tableName")
+ s"for tables created using the data sources API: $tableIdentwithDB")
}
if (table.partitionColumnNames.isEmpty && partitionSpec.isDefined) {
throw new AnalysisException(
s"Operation not allowed: TRUNCATE TABLE ... PARTITION is not supported " +
- s"for tables that are not partitioned: $tableName")
+ s"for tables that are not partitioned: $tableIdentwithDB")
}
val locations =
if (isDatasourceTable) {
@@ -369,7 +364,7 @@ case class TruncateTableCommand(
} else if (table.partitionColumnNames.isEmpty) {
Seq(table.storage.locationUri)
} else {
- catalog.listPartitions(tableName, partitionSpec).map(_.storage.locationUri)
+ catalog.listPartitions(table.identifier, partitionSpec).map(_.storage.locationUri)
}
val hadoopConf = spark.sessionState.newHadoopConf()
locations.foreach { location =>
@@ -382,7 +377,7 @@ case class TruncateTableCommand(
} catch {
case NonFatal(e) =>
throw new AnalysisException(
- s"Failed to truncate table $tableName when removing data of the path: $path " +
+ s"Failed to truncate table $tableIdentwithDB when removing data of the path: $path " +
s"because of ${e.toString}")
}
}
@@ -392,10 +387,10 @@ case class TruncateTableCommand(
spark.sessionState.refreshTable(tableName.unquotedString)
// Also try to drop the contents of the table from the columnar cache
try {
- spark.sharedState.cacheManager.uncacheQuery(spark.table(tableName.quotedString))
+ spark.sharedState.cacheManager.uncacheQuery(spark.table(table.identifier))
} catch {
case NonFatal(e) =>
- log.warn(s"Exception when attempting to uncache table $tableName", e)
+ log.warn(s"Exception when attempting to uncache table $tableIdentwithDB", e)
}
Seq.empty[Row]
}
@@ -600,13 +595,19 @@ case class ShowTablePropertiesCommand(table: TableIdentifier, propertyKey: Optio
* SHOW COLUMNS (FROM | IN) table_identifier [(FROM | IN) database];
* }}}
*/
-case class ShowColumnsCommand(table: TableIdentifier) extends RunnableCommand {
+case class ShowColumnsCommand(tableName: TableIdentifier) extends RunnableCommand {
override val output: Seq[Attribute] = {
AttributeReference("col_name", StringType, nullable = false)() :: Nil
}
override def run(sparkSession: SparkSession): Seq[Row] = {
- sparkSession.sessionState.catalog.getTableMetadata(table).schema.map { c =>
+ val catalog = sparkSession.sessionState.catalog
+ val table = if (tableName.database.isDefined) {
+ catalog.getTableMetadata(tableName)
+ } else {
+ catalog.getTempViewOrPermanentTableMetadata(tableName.table)
+ }
+ table.schema.map { c =>
Row(c.name)
}
}
@@ -628,7 +629,7 @@ case class ShowColumnsCommand(table: TableIdentifier) extends RunnableCommand {
* }}}
*/
case class ShowPartitionsCommand(
- table: TableIdentifier,
+ tableName: TableIdentifier,
spec: Option[TablePartitionSpec]) extends RunnableCommand {
override val output: Seq[Attribute] = {
AttributeReference("partition", StringType, nullable = false)() :: Nil
@@ -642,13 +643,8 @@ case class ShowPartitionsCommand(
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
-
- if (catalog.isTemporaryTable(table)) {
- throw new AnalysisException(
- s"SHOW PARTITIONS is not allowed on a temporary table: ${table.unquotedString}")
- }
-
- val tab = catalog.getTableMetadata(table)
+ val table = catalog.getTableMetadata(tableName)
+ val tableIdentWithDB = table.identifier.quotedString
/**
* Validate and throws an [[AnalysisException]] exception under the following conditions:
@@ -656,19 +652,18 @@ case class ShowPartitionsCommand(
* 2. If it is a datasource table.
* 3. If it is a view.
*/
- if (tab.tableType == VIEW) {
- throw new AnalysisException(
- s"SHOW PARTITIONS is not allowed on a view: ${tab.qualifiedName}")
+ if (table.tableType == VIEW) {
+ throw new AnalysisException(s"SHOW PARTITIONS is not allowed on a view: $tableIdentWithDB")
}
- if (tab.partitionColumnNames.isEmpty) {
+ if (table.partitionColumnNames.isEmpty) {
throw new AnalysisException(
- s"SHOW PARTITIONS is not allowed on a table that is not partitioned: ${tab.qualifiedName}")
+ s"SHOW PARTITIONS is not allowed on a table that is not partitioned: $tableIdentWithDB")
}
- if (DDLUtils.isDatasourceTable(tab)) {
+ if (DDLUtils.isDatasourceTable(table)) {
throw new AnalysisException(
- s"SHOW PARTITIONS is not allowed on a datasource table: ${tab.qualifiedName}")
+ s"SHOW PARTITIONS is not allowed on a datasource table: $tableIdentWithDB")
}
/**
@@ -677,7 +672,7 @@ case class ShowPartitionsCommand(
* thrown if the partitioning spec is invalid.
*/
if (spec.isDefined) {
- val badColumns = spec.get.keySet.filterNot(tab.partitionColumnNames.contains)
+ val badColumns = spec.get.keySet.filterNot(table.partitionColumnNames.contains)
if (badColumns.nonEmpty) {
val badCols = badColumns.mkString("[", ", ", "]")
throw new AnalysisException(
@@ -685,8 +680,8 @@ case class ShowPartitionsCommand(
}
}
- val partNames = catalog.listPartitions(table, spec).map { p =>
- getPartName(p.spec, tab.partitionColumnNames)
+ val partNames = catalog.listPartitions(tableName, spec).map { p =>
+ getPartName(p.spec, table.partitionColumnNames)
}
partNames.map(Row(_))
@@ -700,16 +695,6 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
-
- if (catalog.isTemporaryTable(table)) {
- throw new AnalysisException(
- s"SHOW CREATE TABLE cannot be applied to temporary table")
- }
-
- if (!catalog.tableExists(table)) {
- throw new AnalysisException(s"Table $table doesn't exist")
- }
-
val tableMetadata = catalog.getTableMetadata(table)
// TODO: unify this after we unify the CREATE TABLE syntax for hive serde and data source table.