author     gatorsmile <gatorsmile@gmail.com>       2016-09-20 20:11:48 +0800
committer  Wenchen Fan <wenchen@databricks.com>    2016-09-20 20:11:48 +0800
commit     d5ec5dbb0dc0358b0394626c80781e422f9af581 (patch)
tree       73d9bb2105d441cffae360fd8450a5ae180d15ec /sql/core/src
parent     4a426ff8aea4faa31a3016a453dec5b7954578dd (diff)
[SPARK-17502][SQL] Fix Multiple Bugs in DDL Statements on Temporary Views
### What changes were proposed in this pull request?

- When the permanent tables/views do not exist but the temporary view exists, the expected error should be `NoSuchTableException` for partition-related ALTER TABLE commands. However, it always reports a confusing error message. For example,
  ```
  Partition spec is invalid. The spec (a, b) must match the partition spec () defined in table '`testview`';
  ```
- When the permanent tables/views do not exist but the temporary view exists, the expected error should be `NoSuchTableException` for `ALTER TABLE ... UNSET TBLPROPERTIES`. However, it reports a missing table property. For example,
  ```
  Attempted to unset non-existent property 'p' in table '`testView`';
  ```
- When `ANALYZE TABLE` is called on a view or a temporary view, we should issue an error message. However, it reports a strange error:
  ```
  ANALYZE TABLE is not supported for Project
  ```
- When inserting into a temporary view that is generated from `Range`, we get the following error message:
  ```
  assertion failed: No plan for 'InsertIntoTable Range (0, 10, step=1, splits=Some(1)), false, false
  +- Project [1 AS 1#20]
  +- OneRowRelation$
  ```

This PR fixes the above four issues.

### How was this patch tested?

Added multiple test cases.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #15054 from gatorsmile/tempViewDDL.
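For illustration, a minimal spark-shell style sketch of the first scenario above (the view name and partition column are made up, and the exact exception message may differ):

```scala
// Sketch only: assumes a spark-shell session where `spark` is predefined.
// Only a TEMPORARY view named `testview` exists; no permanent table has that name.
spark.range(10).createOrReplaceTempView("testview")

// Before this patch the command below reported the confusing
// "Partition spec is invalid" message; after the patch it is expected to fail
// with NoSuchTableException, since the permanent table does not exist.
spark.sql("ALTER TABLE testview ADD PARTITION (a='1')")
```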
Diffstat (limited to 'sql/core/src')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala |   5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala                 |  30
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala              | 113
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala                  |   7
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala            |   4
5 files changed, 74 insertions, 85 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
index 15687ddd72..40aecafecf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
@@ -22,6 +22,7 @@ import scala.util.control.NonFatal
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable}
import org.apache.spark.sql.catalyst.plans.logical.Statistics
@@ -37,7 +38,9 @@ case class AnalyzeTableCommand(tableName: String, noscan: Boolean = true) extend
override def run(sparkSession: SparkSession): Seq[Row] = {
val sessionState = sparkSession.sessionState
val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName)
- val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdent))
+ val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+ val tableIdentwithDB = TableIdentifier(tableIdent.table, Some(db))
+ val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentwithDB))
relation match {
case relation: CatalogRelation =>
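The hunk above follows a pattern reused throughout this patch: qualify the parsed identifier with the current database before calling `lookupRelation`, so resolution targets the permanent catalog rather than a same-named temporary view. A standalone sketch of that pattern (the helper name is hypothetical, not part of Spark):

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.SessionCatalog

// Hypothetical helper illustrating the qualification pattern: default the
// database of an unqualified identifier to the catalog's current database,
// so a later lookup cannot silently match a temporary view of the same name.
def qualifyWithCurrentDb(catalog: SessionCatalog, ident: TableIdentifier): TableIdentifier = {
  val db = ident.database.getOrElse(catalog.getCurrentDatabase)
  TableIdentifier(ident.table, Some(db))
}
```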
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index c0ccdca98e..b57b2d280d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -264,7 +264,7 @@ case class AlterTableUnsetPropertiesCommand(
propKeys.foreach { k =>
if (!table.properties.contains(k)) {
throw new AnalysisException(
- s"Attempted to unset non-existent property '$k' in table '$tableName'")
+ s"Attempted to unset non-existent property '$k' in table '${table.identifier}'")
}
}
}
@@ -317,11 +317,11 @@ case class AlterTableSerDePropertiesCommand(
catalog.alterTable(newTable)
} else {
val spec = partSpec.get
- val part = catalog.getPartition(tableName, spec)
+ val part = catalog.getPartition(table.identifier, spec)
val newPart = part.copy(storage = part.storage.copy(
serde = serdeClassName.orElse(part.storage.serde),
properties = part.storage.properties ++ serdeProperties.getOrElse(Map())))
- catalog.alterPartitions(tableName, Seq(newPart))
+ catalog.alterPartitions(table.identifier, Seq(newPart))
}
Seq.empty[Row]
}
@@ -358,7 +358,7 @@ case class AlterTableAddPartitionCommand(
// inherit table storage format (possibly except for location)
CatalogTablePartition(spec, table.storage.copy(locationUri = location))
}
- catalog.createPartitions(tableName, parts, ignoreIfExists = ifNotExists)
+ catalog.createPartitions(table.identifier, parts, ignoreIfExists = ifNotExists)
Seq.empty[Row]
}
@@ -422,7 +422,7 @@ case class AlterTableDropPartitionCommand(
throw new AnalysisException(
"ALTER TABLE DROP PARTITIONS is not allowed for tables defined using the datasource API")
}
- catalog.dropPartitions(tableName, specs, ignoreIfNotExists = ifExists, purge = purge)
+ catalog.dropPartitions(table.identifier, specs, ignoreIfNotExists = ifExists, purge = purge)
Seq.empty[Row]
}
@@ -471,26 +471,20 @@ case class AlterTableRecoverPartitionsCommand(
override def run(spark: SparkSession): Seq[Row] = {
val catalog = spark.sessionState.catalog
- if (!catalog.tableExists(tableName)) {
- throw new AnalysisException(s"Table $tableName in $cmd does not exist.")
- }
- if (catalog.isTemporaryTable(tableName)) {
- throw new AnalysisException(
- s"Operation not allowed: $cmd on temporary tables: $tableName")
- }
val table = catalog.getTableMetadata(tableName)
+ val tableIdentWithDB = table.identifier.quotedString
DDLUtils.verifyAlterTableType(catalog, table, isView = false)
if (DDLUtils.isDatasourceTable(table)) {
throw new AnalysisException(
- s"Operation not allowed: $cmd on datasource tables: $tableName")
+ s"Operation not allowed: $cmd on datasource tables: $tableIdentWithDB")
}
if (table.partitionColumnNames.isEmpty) {
throw new AnalysisException(
- s"Operation not allowed: $cmd only works on partitioned tables: $tableName")
+ s"Operation not allowed: $cmd only works on partitioned tables: $tableIdentWithDB")
}
if (table.storage.locationUri.isEmpty) {
- throw new AnalysisException(
- s"Operation not allowed: $cmd only works on table with location provided: $tableName")
+ throw new AnalysisException(s"Operation not allowed: $cmd only works on table with " +
+ s"location provided: $tableIdentWithDB")
}
val root = new Path(table.storage.locationUri.get)
@@ -659,7 +653,7 @@ case class AlterTableSetLocationCommand(
partitionSpec match {
case Some(spec) =>
// Partition spec is specified, so we set the location only for this partition
- val part = catalog.getPartition(tableName, spec)
+ val part = catalog.getPartition(table.identifier, spec)
val newPart =
if (DDLUtils.isDatasourceTable(table)) {
throw new AnalysisException(
@@ -668,7 +662,7 @@ case class AlterTableSetLocationCommand(
} else {
part.copy(storage = part.storage.copy(locationUri = Some(location)))
}
- catalog.alterPartitions(tableName, Seq(newPart))
+ catalog.alterPartitions(table.identifier, Seq(newPart))
case None =>
// No partition spec is specified, so we set the location for the table itself
val newTable =
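The net effect in ddl.scala is that the partition-related commands fetch the table metadata up front and pass the fully qualified `table.identifier` to the catalog, so a name that only matches a temporary view now surfaces `NoSuchTableException`. A hedged sketch of the expected behaviour (assumes a SparkSession named `spark`; the view name is illustrative):

```scala
import scala.util.Try
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException

// Only a temp view exists under this name, so a partition-related ALTER TABLE
// should now fail with NoSuchTableException instead of the confusing
// partition-spec mismatch reported before this patch.
spark.range(10).createOrReplaceTempView("part_view")
val failure = Try(spark.sql("ALTER TABLE part_view DROP PARTITION (a='1')")).failed.get
assert(failure.isInstanceOf[NoSuchTableException], s"unexpected error: $failure")
```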
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 60e6b5db62..94b46c5d97 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -35,7 +35,6 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.util.quoteIdentifier
import org.apache.spark.sql.execution.datasources.PartitioningUtils
-import org.apache.spark.sql.internal.HiveSerDe
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -65,7 +64,11 @@ case class CreateTableLikeCommand(
s"Source table in CREATE TABLE LIKE does not exist: '$sourceTable'")
}
- val sourceTableDesc = catalog.getTableMetadata(sourceTable)
+ val sourceTableDesc = if (sourceTable.database.isDefined) {
+ catalog.getTableMetadata(sourceTable)
+ } else {
+ catalog.getTempViewOrPermanentTableMetadata(sourceTable.table)
+ }
// Storage format
val newStorage =
@@ -158,14 +161,13 @@ case class AlterTableRenameCommand(
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
- val table = catalog.getTableMetadata(oldName)
- DDLUtils.verifyAlterTableType(catalog, table, isView)
// If this is a temp view, just rename the view.
// Otherwise, if this is a real table, we also need to uncache and invalidate the table.
- val isTemporary = catalog.isTemporaryTable(oldName)
- if (isTemporary) {
+ if (catalog.isTemporaryTable(oldName)) {
catalog.renameTable(oldName, newName)
} else {
+ val table = catalog.getTableMetadata(oldName)
+ DDLUtils.verifyAlterTableType(catalog, table, isView)
val newTblName = TableIdentifier(newName, oldName.database)
// If an exception is thrown here we can just assume the table is uncached;
// this can happen with Hive tables when the underlying catalog is in-memory.
@@ -215,40 +217,38 @@ case class LoadDataCommand(
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
- if (!catalog.tableExists(table)) {
- throw new AnalysisException(s"Target table in LOAD DATA does not exist: $table")
- }
- val targetTable = catalog.getTableMetadataOption(table).getOrElse {
- throw new AnalysisException(s"Target table in LOAD DATA cannot be temporary: $table")
- }
+ val targetTable = catalog.getTableMetadata(table)
+ val tableIdentwithDB = targetTable.identifier.quotedString
+
if (targetTable.tableType == CatalogTableType.VIEW) {
- throw new AnalysisException(s"Target table in LOAD DATA cannot be a view: $table")
+ throw new AnalysisException(s"Target table in LOAD DATA cannot be a view: $tableIdentwithDB")
}
if (DDLUtils.isDatasourceTable(targetTable)) {
- throw new AnalysisException(s"LOAD DATA is not supported for datasource tables: $table")
+ throw new AnalysisException(
+ s"LOAD DATA is not supported for datasource tables: $tableIdentwithDB")
}
if (targetTable.partitionColumnNames.nonEmpty) {
if (partition.isEmpty) {
- throw new AnalysisException(s"LOAD DATA target table $table is partitioned, " +
+ throw new AnalysisException(s"LOAD DATA target table $tableIdentwithDB is partitioned, " +
s"but no partition spec is provided")
}
if (targetTable.partitionColumnNames.size != partition.get.size) {
- throw new AnalysisException(s"LOAD DATA target table $table is partitioned, " +
+ throw new AnalysisException(s"LOAD DATA target table $tableIdentwithDB is partitioned, " +
s"but number of columns in provided partition spec (${partition.get.size}) " +
s"do not match number of partitioned columns in table " +
s"(s${targetTable.partitionColumnNames.size})")
}
partition.get.keys.foreach { colName =>
if (!targetTable.partitionColumnNames.contains(colName)) {
- throw new AnalysisException(s"LOAD DATA target table $table is partitioned, " +
+ throw new AnalysisException(s"LOAD DATA target table $tableIdentwithDB is partitioned, " +
s"but the specified partition spec refers to a column that is not partitioned: " +
s"'$colName'")
}
}
} else {
if (partition.nonEmpty) {
- throw new AnalysisException(s"LOAD DATA target table $table is not partitioned, " +
- s"but a partition spec was provided.")
+ throw new AnalysisException(s"LOAD DATA target table $tableIdentwithDB is not " +
+ s"partitioned, but a partition spec was provided.")
}
}
@@ -336,32 +336,27 @@ case class TruncateTableCommand(
override def run(spark: SparkSession): Seq[Row] = {
val catalog = spark.sessionState.catalog
- if (!catalog.tableExists(tableName)) {
- throw new AnalysisException(s"Table $tableName in TRUNCATE TABLE does not exist.")
- }
- if (catalog.isTemporaryTable(tableName)) {
- throw new AnalysisException(
- s"Operation not allowed: TRUNCATE TABLE on temporary tables: $tableName")
- }
val table = catalog.getTableMetadata(tableName)
+ val tableIdentwithDB = table.identifier.quotedString
+
if (table.tableType == CatalogTableType.EXTERNAL) {
throw new AnalysisException(
- s"Operation not allowed: TRUNCATE TABLE on external tables: $tableName")
+ s"Operation not allowed: TRUNCATE TABLE on external tables: $tableIdentwithDB")
}
if (table.tableType == CatalogTableType.VIEW) {
throw new AnalysisException(
- s"Operation not allowed: TRUNCATE TABLE on views: $tableName")
+ s"Operation not allowed: TRUNCATE TABLE on views: $tableIdentwithDB")
}
val isDatasourceTable = DDLUtils.isDatasourceTable(table)
if (isDatasourceTable && partitionSpec.isDefined) {
throw new AnalysisException(
s"Operation not allowed: TRUNCATE TABLE ... PARTITION is not supported " +
- s"for tables created using the data sources API: $tableName")
+ s"for tables created using the data sources API: $tableIdentwithDB")
}
if (table.partitionColumnNames.isEmpty && partitionSpec.isDefined) {
throw new AnalysisException(
s"Operation not allowed: TRUNCATE TABLE ... PARTITION is not supported " +
- s"for tables that are not partitioned: $tableName")
+ s"for tables that are not partitioned: $tableIdentwithDB")
}
val locations =
if (isDatasourceTable) {
@@ -369,7 +364,7 @@ case class TruncateTableCommand(
} else if (table.partitionColumnNames.isEmpty) {
Seq(table.storage.locationUri)
} else {
- catalog.listPartitions(tableName, partitionSpec).map(_.storage.locationUri)
+ catalog.listPartitions(table.identifier, partitionSpec).map(_.storage.locationUri)
}
val hadoopConf = spark.sessionState.newHadoopConf()
locations.foreach { location =>
@@ -382,7 +377,7 @@ case class TruncateTableCommand(
} catch {
case NonFatal(e) =>
throw new AnalysisException(
- s"Failed to truncate table $tableName when removing data of the path: $path " +
+ s"Failed to truncate table $tableIdentwithDB when removing data of the path: $path " +
s"because of ${e.toString}")
}
}
@@ -392,10 +387,10 @@ case class TruncateTableCommand(
spark.sessionState.refreshTable(tableName.unquotedString)
// Also try to drop the contents of the table from the columnar cache
try {
- spark.sharedState.cacheManager.uncacheQuery(spark.table(tableName.quotedString))
+ spark.sharedState.cacheManager.uncacheQuery(spark.table(table.identifier))
} catch {
case NonFatal(e) =>
- log.warn(s"Exception when attempting to uncache table $tableName", e)
+ log.warn(s"Exception when attempting to uncache table $tableIdentwithDB", e)
}
Seq.empty[Row]
}
@@ -600,13 +595,19 @@ case class ShowTablePropertiesCommand(table: TableIdentifier, propertyKey: Optio
* SHOW COLUMNS (FROM | IN) table_identifier [(FROM | IN) database];
* }}}
*/
-case class ShowColumnsCommand(table: TableIdentifier) extends RunnableCommand {
+case class ShowColumnsCommand(tableName: TableIdentifier) extends RunnableCommand {
override val output: Seq[Attribute] = {
AttributeReference("col_name", StringType, nullable = false)() :: Nil
}
override def run(sparkSession: SparkSession): Seq[Row] = {
- sparkSession.sessionState.catalog.getTableMetadata(table).schema.map { c =>
+ val catalog = sparkSession.sessionState.catalog
+ val table = if (tableName.database.isDefined) {
+ catalog.getTableMetadata(tableName)
+ } else {
+ catalog.getTempViewOrPermanentTableMetadata(tableName.table)
+ }
+ table.schema.map { c =>
Row(c.name)
}
}
@@ -628,7 +629,7 @@ case class ShowColumnsCommand(table: TableIdentifier) extends RunnableCommand {
* }}}
*/
case class ShowPartitionsCommand(
- table: TableIdentifier,
+ tableName: TableIdentifier,
spec: Option[TablePartitionSpec]) extends RunnableCommand {
override val output: Seq[Attribute] = {
AttributeReference("partition", StringType, nullable = false)() :: Nil
@@ -642,13 +643,8 @@ case class ShowPartitionsCommand(
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
-
- if (catalog.isTemporaryTable(table)) {
- throw new AnalysisException(
- s"SHOW PARTITIONS is not allowed on a temporary table: ${table.unquotedString}")
- }
-
- val tab = catalog.getTableMetadata(table)
+ val table = catalog.getTableMetadata(tableName)
+ val tableIdentWithDB = table.identifier.quotedString
/**
* Validate and throws an [[AnalysisException]] exception under the following conditions:
@@ -656,19 +652,18 @@ case class ShowPartitionsCommand(
* 2. If it is a datasource table.
* 3. If it is a view.
*/
- if (tab.tableType == VIEW) {
- throw new AnalysisException(
- s"SHOW PARTITIONS is not allowed on a view: ${tab.qualifiedName}")
+ if (table.tableType == VIEW) {
+ throw new AnalysisException(s"SHOW PARTITIONS is not allowed on a view: $tableIdentWithDB")
}
- if (tab.partitionColumnNames.isEmpty) {
+ if (table.partitionColumnNames.isEmpty) {
throw new AnalysisException(
- s"SHOW PARTITIONS is not allowed on a table that is not partitioned: ${tab.qualifiedName}")
+ s"SHOW PARTITIONS is not allowed on a table that is not partitioned: $tableIdentWithDB")
}
- if (DDLUtils.isDatasourceTable(tab)) {
+ if (DDLUtils.isDatasourceTable(table)) {
throw new AnalysisException(
- s"SHOW PARTITIONS is not allowed on a datasource table: ${tab.qualifiedName}")
+ s"SHOW PARTITIONS is not allowed on a datasource table: $tableIdentWithDB")
}
/**
@@ -677,7 +672,7 @@ case class ShowPartitionsCommand(
* thrown if the partitioning spec is invalid.
*/
if (spec.isDefined) {
- val badColumns = spec.get.keySet.filterNot(tab.partitionColumnNames.contains)
+ val badColumns = spec.get.keySet.filterNot(table.partitionColumnNames.contains)
if (badColumns.nonEmpty) {
val badCols = badColumns.mkString("[", ", ", "]")
throw new AnalysisException(
@@ -685,8 +680,8 @@ case class ShowPartitionsCommand(
}
}
- val partNames = catalog.listPartitions(table, spec).map { p =>
- getPartName(p.spec, tab.partitionColumnNames)
+ val partNames = catalog.listPartitions(tableName, spec).map { p =>
+ getPartName(p.spec, table.partitionColumnNames)
}
partNames.map(Row(_))
@@ -700,16 +695,6 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
-
- if (catalog.isTemporaryTable(table)) {
- throw new AnalysisException(
- s"SHOW CREATE TABLE cannot be applied to temporary table")
- }
-
- if (!catalog.tableExists(table)) {
- throw new AnalysisException(s"Table $table doesn't exist")
- }
-
val tableMetadata = catalog.getTableMetadata(table)
// TODO: unify this after we unify the CREATE TABLE syntax for hive serde and data source table.
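In contrast, commands that are meant to work on temporary views, such as CREATE TABLE LIKE and SHOW COLUMNS, now fall back to `getTempViewOrPermanentTableMetadata` when no database is given. A small sketch of that behaviour (assumes a SparkSession named `spark`; the view and column names are made up):

```scala
// An unqualified name can now resolve to a temporary view for SHOW COLUMNS.
spark.range(5).selectExpr("id", "id * 2 AS doubled").createOrReplaceTempView("temp_v")
spark.sql("SHOW COLUMNS IN temp_v").show()   // expected to list: id, doubled
```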
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 3fa6298562..6fecda232a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -151,7 +151,12 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
}
private def listColumns(tableIdentifier: TableIdentifier): Dataset[Column] = {
- val tableMetadata = sessionCatalog.getTableMetadata(tableIdentifier)
+ val tableMetadata = if (tableIdentifier.database.isDefined) {
+ sessionCatalog.getTableMetadata(tableIdentifier)
+ } else {
+ sessionCatalog.getTempViewOrPermanentTableMetadata(tableIdentifier.table)
+ }
+
val partitionColumnNames = tableMetadata.partitionColumnNames.toSet
val bucketColumnNames = tableMetadata.bucketSpec.map(_.bucketColumnNames).getOrElse(Nil).toSet
val columns = tableMetadata.schema.map { c =>
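The same resolution rule is mirrored in the public Catalog API, so `listColumns` on an unqualified name can also describe a temporary view. A sketch under the same assumptions (a SparkSession named `spark`; the view name is illustrative):

```scala
// Catalog.listColumns should resolve the temp view when no database is given.
spark.range(3).toDF("id").createOrReplaceTempView("cols_v")
spark.catalog.listColumns("cols_v").show()   // expected to show the `id` column
```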
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 4a171808c0..b5499f2884 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -1646,7 +1646,9 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
(1 to 10).map { i => (i, i) }.toDF("a", "b").createTempView("my_temp_tab")
sql(s"CREATE EXTERNAL TABLE my_ext_tab LOCATION '$path'")
sql(s"CREATE VIEW my_view AS SELECT 1")
- assertUnsupported("TRUNCATE TABLE my_temp_tab")
+ intercept[NoSuchTableException] {
+ sql("TRUNCATE TABLE my_temp_tab")
+ }
assertUnsupported("TRUNCATE TABLE my_ext_tab")
assertUnsupported("TRUNCATE TABLE my_view")
}