aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorgatorsmile <gatorsmile@gmail.com>2016-09-20 20:11:48 +0800
committerWenchen Fan <wenchen@databricks.com>2016-09-20 20:11:48 +0800
commitd5ec5dbb0dc0358b0394626c80781e422f9af581 (patch)
tree73d9bb2105d441cffae360fd8450a5ae180d15ec
parent4a426ff8aea4faa31a3016a453dec5b7954578dd (diff)
downloadspark-d5ec5dbb0dc0358b0394626c80781e422f9af581.tar.gz
spark-d5ec5dbb0dc0358b0394626c80781e422f9af581.tar.bz2
spark-d5ec5dbb0dc0358b0394626c80781e422f9af581.zip
[SPARK-17502][SQL] Fix Multiple Bugs in DDL Statements on Temporary Views
### What changes were proposed in this pull request? - When the permanent tables/views do not exist but the temporary view exists, the expected error should be `NoSuchTableException` for partition-related ALTER TABLE commands. However, it always reports a confusing error message. For example, ``` Partition spec is invalid. The spec (a, b) must match the partition spec () defined in table '`testview`'; ``` - When the permanent tables/views do not exist but the temporary view exists, the expected error should be `NoSuchTableException` for `ALTER TABLE ... UNSET TBLPROPERTIES`. However, it reports a missing table property. For example, ``` Attempted to unset non-existent property 'p' in table '`testView`'; ``` - When `ANALYZE TABLE` is called on a view or a temporary view, we should issue an error message. However, it reports a strange error: ``` ANALYZE TABLE is not supported for Project ``` - When inserting into a temporary view that is generated from `Range`, we will get the following error message: ``` assertion failed: No plan for 'InsertIntoTable Range (0, 10, step=1, splits=Some(1)), false, false +- Project [1 AS 1#20] +- OneRowRelation$ ``` This PR is to fix the above four issues. ### How was this patch tested? Added multiple test cases Author: gatorsmile <gatorsmile@gmail.com> Closes #15054 from gatorsmile/tempViewDDL.
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala1
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala53
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala21
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala5
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala30
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala113
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala7
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala4
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala17
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala6
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala63
11 files changed, 164 insertions, 156 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index e07e9194be..9c06069f24 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -360,6 +360,7 @@ trait CheckAnalysis extends PredicateHelper {
case InsertIntoTable(t, _, _, _, _)
if !t.isInstanceOf[LeafNode] ||
+ t.isInstanceOf[Range] ||
t == OneRowRelation ||
t.isInstanceOf[LocalRelation] =>
failAnalysis(s"Inserting into an RDD-based table is not allowed.")
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 574c3d7eee..ef29c75c01 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -246,27 +246,16 @@ class SessionCatalog(
}
/**
- * Retrieve the metadata of an existing metastore table.
- * If no database is specified, assume the table is in the current database.
- * If the specified table is not found in the database then a [[NoSuchTableException]] is thrown.
+ * Retrieve the metadata of an existing permanent table/view. If no database is specified,
+ * assume the table/view is in the current database. If the specified table/view is not found
+ * in the database then a [[NoSuchTableException]] is thrown.
*/
def getTableMetadata(name: TableIdentifier): CatalogTable = {
val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
val table = formatTableName(name.table)
- val tid = TableIdentifier(table)
- if (isTemporaryTable(name)) {
- CatalogTable(
- identifier = tid,
- tableType = CatalogTableType.VIEW,
- storage = CatalogStorageFormat.empty,
- schema = tempTables(table).output.toStructType,
- properties = Map(),
- viewText = None)
- } else {
- requireDbExists(db)
- requireTableExists(TableIdentifier(table, Some(db)))
- externalCatalog.getTable(db, table)
- }
+ requireDbExists(db)
+ requireTableExists(TableIdentifier(table, Some(db)))
+ externalCatalog.getTable(db, table)
}
/**
@@ -282,6 +271,24 @@ class SessionCatalog(
}
/**
+ * Retrieve the metadata of an existing temporary view or permanent table/view.
+ * If the temporary view does not exist, tries to get the metadata an existing permanent
+ * table/view. If no database is specified, assume the table/view is in the current database.
+ * If the specified table/view is not found in the database then a [[NoSuchTableException]] is
+ * thrown.
+ */
+ def getTempViewOrPermanentTableMetadata(name: String): CatalogTable = synchronized {
+ val table = formatTableName(name)
+ getTempView(table).map { plan =>
+ CatalogTable(
+ identifier = TableIdentifier(table),
+ tableType = CatalogTableType.VIEW,
+ storage = CatalogStorageFormat.empty,
+ schema = plan.output.toStructType)
+ }.getOrElse(getTableMetadata(TableIdentifier(name)))
+ }
+
+ /**
* Load files stored in given path into an existing metastore table.
* If no database is specified, assume the table is in the current database.
* If the specified table is not found in the database then a [[NoSuchTableException]] is thrown.
@@ -530,11 +537,11 @@ class SessionCatalog(
tableName: TableIdentifier,
parts: Seq[CatalogTablePartition],
ignoreIfExists: Boolean): Unit = {
- requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(tableName))
val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
val table = formatTableName(tableName.table)
requireDbExists(db)
requireTableExists(TableIdentifier(table, Option(db)))
+ requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(tableName))
externalCatalog.createPartitions(db, table, parts, ignoreIfExists)
}
@@ -547,11 +554,11 @@ class SessionCatalog(
specs: Seq[TablePartitionSpec],
ignoreIfNotExists: Boolean,
purge: Boolean): Unit = {
- requirePartialMatchedPartitionSpec(specs, getTableMetadata(tableName))
val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
val table = formatTableName(tableName.table)
requireDbExists(db)
requireTableExists(TableIdentifier(table, Option(db)))
+ requirePartialMatchedPartitionSpec(specs, getTableMetadata(tableName))
externalCatalog.dropPartitions(db, table, specs, ignoreIfNotExists, purge)
}
@@ -566,12 +573,12 @@ class SessionCatalog(
specs: Seq[TablePartitionSpec],
newSpecs: Seq[TablePartitionSpec]): Unit = {
val tableMetadata = getTableMetadata(tableName)
- requireExactMatchedPartitionSpec(specs, tableMetadata)
- requireExactMatchedPartitionSpec(newSpecs, tableMetadata)
val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
val table = formatTableName(tableName.table)
requireDbExists(db)
requireTableExists(TableIdentifier(table, Option(db)))
+ requireExactMatchedPartitionSpec(specs, tableMetadata)
+ requireExactMatchedPartitionSpec(newSpecs, tableMetadata)
externalCatalog.renamePartitions(db, table, specs, newSpecs)
}
@@ -585,11 +592,11 @@ class SessionCatalog(
* this becomes a no-op.
*/
def alterPartitions(tableName: TableIdentifier, parts: Seq[CatalogTablePartition]): Unit = {
- requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(tableName))
val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
val table = formatTableName(tableName.table)
requireDbExists(db)
requireTableExists(TableIdentifier(table, Option(db)))
+ requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(tableName))
externalCatalog.alterPartitions(db, table, parts)
}
@@ -598,11 +605,11 @@ class SessionCatalog(
* If no database is specified, assume the table is in the current database.
*/
def getPartition(tableName: TableIdentifier, spec: TablePartitionSpec): CatalogTablePartition = {
- requireExactMatchedPartitionSpec(Seq(spec), getTableMetadata(tableName))
val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
val table = formatTableName(tableName.table)
requireDbExists(db)
requireTableExists(TableIdentifier(table, Option(db)))
+ requireExactMatchedPartitionSpec(Seq(spec), getTableMetadata(tableName))
externalCatalog.getPartition(db, table, spec)
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 84b77ad250..384a730861 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -444,27 +444,16 @@ class SessionCatalogSuite extends SparkFunSuite {
assert(!catalog.tableExists(TableIdentifier("view1", Some("default"))))
}
- test("getTableMetadata on temporary views") {
+ test("getTempViewOrPermanentTableMetadata on temporary views") {
val catalog = new SessionCatalog(newBasicCatalog())
val tempTable = Range(1, 10, 2, 10)
- val m = intercept[AnalysisException] {
- catalog.getTableMetadata(TableIdentifier("view1"))
- }.getMessage
- assert(m.contains("Table or view 'view1' not found in database 'default'"))
-
- val m2 = intercept[AnalysisException] {
- catalog.getTableMetadata(TableIdentifier("view1", Some("default")))
+ intercept[NoSuchTableException] {
+ catalog.getTempViewOrPermanentTableMetadata("view1")
}.getMessage
- assert(m2.contains("Table or view 'view1' not found in database 'default'"))
catalog.createTempView("view1", tempTable, overrideIfExists = false)
- assert(catalog.getTableMetadata(TableIdentifier("view1")).identifier.table == "view1")
- assert(catalog.getTableMetadata(TableIdentifier("view1")).schema(0).name == "id")
-
- val m3 = intercept[AnalysisException] {
- catalog.getTableMetadata(TableIdentifier("view1", Some("default")))
- }.getMessage
- assert(m3.contains("Table or view 'view1' not found in database 'default'"))
+ assert(catalog.getTempViewOrPermanentTableMetadata("view1").identifier ==
+ TableIdentifier("view1"), "the temporary view `view1` should exist")
}
test("list tables without pattern") {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
index 15687ddd72..40aecafecf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
@@ -22,6 +22,7 @@ import scala.util.control.NonFatal
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable}
import org.apache.spark.sql.catalyst.plans.logical.Statistics
@@ -37,7 +38,9 @@ case class AnalyzeTableCommand(tableName: String, noscan: Boolean = true) extend
override def run(sparkSession: SparkSession): Seq[Row] = {
val sessionState = sparkSession.sessionState
val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName)
- val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdent))
+ val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+ val tableIdentwithDB = TableIdentifier(tableIdent.table, Some(db))
+ val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentwithDB))
relation match {
case relation: CatalogRelation =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index c0ccdca98e..b57b2d280d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -264,7 +264,7 @@ case class AlterTableUnsetPropertiesCommand(
propKeys.foreach { k =>
if (!table.properties.contains(k)) {
throw new AnalysisException(
- s"Attempted to unset non-existent property '$k' in table '$tableName'")
+ s"Attempted to unset non-existent property '$k' in table '${table.identifier}'")
}
}
}
@@ -317,11 +317,11 @@ case class AlterTableSerDePropertiesCommand(
catalog.alterTable(newTable)
} else {
val spec = partSpec.get
- val part = catalog.getPartition(tableName, spec)
+ val part = catalog.getPartition(table.identifier, spec)
val newPart = part.copy(storage = part.storage.copy(
serde = serdeClassName.orElse(part.storage.serde),
properties = part.storage.properties ++ serdeProperties.getOrElse(Map())))
- catalog.alterPartitions(tableName, Seq(newPart))
+ catalog.alterPartitions(table.identifier, Seq(newPart))
}
Seq.empty[Row]
}
@@ -358,7 +358,7 @@ case class AlterTableAddPartitionCommand(
// inherit table storage format (possibly except for location)
CatalogTablePartition(spec, table.storage.copy(locationUri = location))
}
- catalog.createPartitions(tableName, parts, ignoreIfExists = ifNotExists)
+ catalog.createPartitions(table.identifier, parts, ignoreIfExists = ifNotExists)
Seq.empty[Row]
}
@@ -422,7 +422,7 @@ case class AlterTableDropPartitionCommand(
throw new AnalysisException(
"ALTER TABLE DROP PARTITIONS is not allowed for tables defined using the datasource API")
}
- catalog.dropPartitions(tableName, specs, ignoreIfNotExists = ifExists, purge = purge)
+ catalog.dropPartitions(table.identifier, specs, ignoreIfNotExists = ifExists, purge = purge)
Seq.empty[Row]
}
@@ -471,26 +471,20 @@ case class AlterTableRecoverPartitionsCommand(
override def run(spark: SparkSession): Seq[Row] = {
val catalog = spark.sessionState.catalog
- if (!catalog.tableExists(tableName)) {
- throw new AnalysisException(s"Table $tableName in $cmd does not exist.")
- }
- if (catalog.isTemporaryTable(tableName)) {
- throw new AnalysisException(
- s"Operation not allowed: $cmd on temporary tables: $tableName")
- }
val table = catalog.getTableMetadata(tableName)
+ val tableIdentWithDB = table.identifier.quotedString
DDLUtils.verifyAlterTableType(catalog, table, isView = false)
if (DDLUtils.isDatasourceTable(table)) {
throw new AnalysisException(
- s"Operation not allowed: $cmd on datasource tables: $tableName")
+ s"Operation not allowed: $cmd on datasource tables: $tableIdentWithDB")
}
if (table.partitionColumnNames.isEmpty) {
throw new AnalysisException(
- s"Operation not allowed: $cmd only works on partitioned tables: $tableName")
+ s"Operation not allowed: $cmd only works on partitioned tables: $tableIdentWithDB")
}
if (table.storage.locationUri.isEmpty) {
- throw new AnalysisException(
- s"Operation not allowed: $cmd only works on table with location provided: $tableName")
+ throw new AnalysisException(s"Operation not allowed: $cmd only works on table with " +
+ s"location provided: $tableIdentWithDB")
}
val root = new Path(table.storage.locationUri.get)
@@ -659,7 +653,7 @@ case class AlterTableSetLocationCommand(
partitionSpec match {
case Some(spec) =>
// Partition spec is specified, so we set the location only for this partition
- val part = catalog.getPartition(tableName, spec)
+ val part = catalog.getPartition(table.identifier, spec)
val newPart =
if (DDLUtils.isDatasourceTable(table)) {
throw new AnalysisException(
@@ -668,7 +662,7 @@ case class AlterTableSetLocationCommand(
} else {
part.copy(storage = part.storage.copy(locationUri = Some(location)))
}
- catalog.alterPartitions(tableName, Seq(newPart))
+ catalog.alterPartitions(table.identifier, Seq(newPart))
case None =>
// No partition spec is specified, so we set the location for the table itself
val newTable =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 60e6b5db62..94b46c5d97 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -35,7 +35,6 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.util.quoteIdentifier
import org.apache.spark.sql.execution.datasources.PartitioningUtils
-import org.apache.spark.sql.internal.HiveSerDe
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -65,7 +64,11 @@ case class CreateTableLikeCommand(
s"Source table in CREATE TABLE LIKE does not exist: '$sourceTable'")
}
- val sourceTableDesc = catalog.getTableMetadata(sourceTable)
+ val sourceTableDesc = if (sourceTable.database.isDefined) {
+ catalog.getTableMetadata(sourceTable)
+ } else {
+ catalog.getTempViewOrPermanentTableMetadata(sourceTable.table)
+ }
// Storage format
val newStorage =
@@ -158,14 +161,13 @@ case class AlterTableRenameCommand(
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
- val table = catalog.getTableMetadata(oldName)
- DDLUtils.verifyAlterTableType(catalog, table, isView)
// If this is a temp view, just rename the view.
// Otherwise, if this is a real table, we also need to uncache and invalidate the table.
- val isTemporary = catalog.isTemporaryTable(oldName)
- if (isTemporary) {
+ if (catalog.isTemporaryTable(oldName)) {
catalog.renameTable(oldName, newName)
} else {
+ val table = catalog.getTableMetadata(oldName)
+ DDLUtils.verifyAlterTableType(catalog, table, isView)
val newTblName = TableIdentifier(newName, oldName.database)
// If an exception is thrown here we can just assume the table is uncached;
// this can happen with Hive tables when the underlying catalog is in-memory.
@@ -215,40 +217,38 @@ case class LoadDataCommand(
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
- if (!catalog.tableExists(table)) {
- throw new AnalysisException(s"Target table in LOAD DATA does not exist: $table")
- }
- val targetTable = catalog.getTableMetadataOption(table).getOrElse {
- throw new AnalysisException(s"Target table in LOAD DATA cannot be temporary: $table")
- }
+ val targetTable = catalog.getTableMetadata(table)
+ val tableIdentwithDB = targetTable.identifier.quotedString
+
if (targetTable.tableType == CatalogTableType.VIEW) {
- throw new AnalysisException(s"Target table in LOAD DATA cannot be a view: $table")
+ throw new AnalysisException(s"Target table in LOAD DATA cannot be a view: $tableIdentwithDB")
}
if (DDLUtils.isDatasourceTable(targetTable)) {
- throw new AnalysisException(s"LOAD DATA is not supported for datasource tables: $table")
+ throw new AnalysisException(
+ s"LOAD DATA is not supported for datasource tables: $tableIdentwithDB")
}
if (targetTable.partitionColumnNames.nonEmpty) {
if (partition.isEmpty) {
- throw new AnalysisException(s"LOAD DATA target table $table is partitioned, " +
+ throw new AnalysisException(s"LOAD DATA target table $tableIdentwithDB is partitioned, " +
s"but no partition spec is provided")
}
if (targetTable.partitionColumnNames.size != partition.get.size) {
- throw new AnalysisException(s"LOAD DATA target table $table is partitioned, " +
+ throw new AnalysisException(s"LOAD DATA target table $tableIdentwithDB is partitioned, " +
s"but number of columns in provided partition spec (${partition.get.size}) " +
s"do not match number of partitioned columns in table " +
s"(s${targetTable.partitionColumnNames.size})")
}
partition.get.keys.foreach { colName =>
if (!targetTable.partitionColumnNames.contains(colName)) {
- throw new AnalysisException(s"LOAD DATA target table $table is partitioned, " +
+ throw new AnalysisException(s"LOAD DATA target table $tableIdentwithDB is partitioned, " +
s"but the specified partition spec refers to a column that is not partitioned: " +
s"'$colName'")
}
}
} else {
if (partition.nonEmpty) {
- throw new AnalysisException(s"LOAD DATA target table $table is not partitioned, " +
- s"but a partition spec was provided.")
+ throw new AnalysisException(s"LOAD DATA target table $tableIdentwithDB is not " +
+ s"partitioned, but a partition spec was provided.")
}
}
@@ -336,32 +336,27 @@ case class TruncateTableCommand(
override def run(spark: SparkSession): Seq[Row] = {
val catalog = spark.sessionState.catalog
- if (!catalog.tableExists(tableName)) {
- throw new AnalysisException(s"Table $tableName in TRUNCATE TABLE does not exist.")
- }
- if (catalog.isTemporaryTable(tableName)) {
- throw new AnalysisException(
- s"Operation not allowed: TRUNCATE TABLE on temporary tables: $tableName")
- }
val table = catalog.getTableMetadata(tableName)
+ val tableIdentwithDB = table.identifier.quotedString
+
if (table.tableType == CatalogTableType.EXTERNAL) {
throw new AnalysisException(
- s"Operation not allowed: TRUNCATE TABLE on external tables: $tableName")
+ s"Operation not allowed: TRUNCATE TABLE on external tables: $tableIdentwithDB")
}
if (table.tableType == CatalogTableType.VIEW) {
throw new AnalysisException(
- s"Operation not allowed: TRUNCATE TABLE on views: $tableName")
+ s"Operation not allowed: TRUNCATE TABLE on views: $tableIdentwithDB")
}
val isDatasourceTable = DDLUtils.isDatasourceTable(table)
if (isDatasourceTable && partitionSpec.isDefined) {
throw new AnalysisException(
s"Operation not allowed: TRUNCATE TABLE ... PARTITION is not supported " +
- s"for tables created using the data sources API: $tableName")
+ s"for tables created using the data sources API: $tableIdentwithDB")
}
if (table.partitionColumnNames.isEmpty && partitionSpec.isDefined) {
throw new AnalysisException(
s"Operation not allowed: TRUNCATE TABLE ... PARTITION is not supported " +
- s"for tables that are not partitioned: $tableName")
+ s"for tables that are not partitioned: $tableIdentwithDB")
}
val locations =
if (isDatasourceTable) {
@@ -369,7 +364,7 @@ case class TruncateTableCommand(
} else if (table.partitionColumnNames.isEmpty) {
Seq(table.storage.locationUri)
} else {
- catalog.listPartitions(tableName, partitionSpec).map(_.storage.locationUri)
+ catalog.listPartitions(table.identifier, partitionSpec).map(_.storage.locationUri)
}
val hadoopConf = spark.sessionState.newHadoopConf()
locations.foreach { location =>
@@ -382,7 +377,7 @@ case class TruncateTableCommand(
} catch {
case NonFatal(e) =>
throw new AnalysisException(
- s"Failed to truncate table $tableName when removing data of the path: $path " +
+ s"Failed to truncate table $tableIdentwithDB when removing data of the path: $path " +
s"because of ${e.toString}")
}
}
@@ -392,10 +387,10 @@ case class TruncateTableCommand(
spark.sessionState.refreshTable(tableName.unquotedString)
// Also try to drop the contents of the table from the columnar cache
try {
- spark.sharedState.cacheManager.uncacheQuery(spark.table(tableName.quotedString))
+ spark.sharedState.cacheManager.uncacheQuery(spark.table(table.identifier))
} catch {
case NonFatal(e) =>
- log.warn(s"Exception when attempting to uncache table $tableName", e)
+ log.warn(s"Exception when attempting to uncache table $tableIdentwithDB", e)
}
Seq.empty[Row]
}
@@ -600,13 +595,19 @@ case class ShowTablePropertiesCommand(table: TableIdentifier, propertyKey: Optio
* SHOW COLUMNS (FROM | IN) table_identifier [(FROM | IN) database];
* }}}
*/
-case class ShowColumnsCommand(table: TableIdentifier) extends RunnableCommand {
+case class ShowColumnsCommand(tableName: TableIdentifier) extends RunnableCommand {
override val output: Seq[Attribute] = {
AttributeReference("col_name", StringType, nullable = false)() :: Nil
}
override def run(sparkSession: SparkSession): Seq[Row] = {
- sparkSession.sessionState.catalog.getTableMetadata(table).schema.map { c =>
+ val catalog = sparkSession.sessionState.catalog
+ val table = if (tableName.database.isDefined) {
+ catalog.getTableMetadata(tableName)
+ } else {
+ catalog.getTempViewOrPermanentTableMetadata(tableName.table)
+ }
+ table.schema.map { c =>
Row(c.name)
}
}
@@ -628,7 +629,7 @@ case class ShowColumnsCommand(table: TableIdentifier) extends RunnableCommand {
* }}}
*/
case class ShowPartitionsCommand(
- table: TableIdentifier,
+ tableName: TableIdentifier,
spec: Option[TablePartitionSpec]) extends RunnableCommand {
override val output: Seq[Attribute] = {
AttributeReference("partition", StringType, nullable = false)() :: Nil
@@ -642,13 +643,8 @@ case class ShowPartitionsCommand(
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
-
- if (catalog.isTemporaryTable(table)) {
- throw new AnalysisException(
- s"SHOW PARTITIONS is not allowed on a temporary table: ${table.unquotedString}")
- }
-
- val tab = catalog.getTableMetadata(table)
+ val table = catalog.getTableMetadata(tableName)
+ val tableIdentWithDB = table.identifier.quotedString
/**
* Validate and throws an [[AnalysisException]] exception under the following conditions:
@@ -656,19 +652,18 @@ case class ShowPartitionsCommand(
* 2. If it is a datasource table.
* 3. If it is a view.
*/
- if (tab.tableType == VIEW) {
- throw new AnalysisException(
- s"SHOW PARTITIONS is not allowed on a view: ${tab.qualifiedName}")
+ if (table.tableType == VIEW) {
+ throw new AnalysisException(s"SHOW PARTITIONS is not allowed on a view: $tableIdentWithDB")
}
- if (tab.partitionColumnNames.isEmpty) {
+ if (table.partitionColumnNames.isEmpty) {
throw new AnalysisException(
- s"SHOW PARTITIONS is not allowed on a table that is not partitioned: ${tab.qualifiedName}")
+ s"SHOW PARTITIONS is not allowed on a table that is not partitioned: $tableIdentWithDB")
}
- if (DDLUtils.isDatasourceTable(tab)) {
+ if (DDLUtils.isDatasourceTable(table)) {
throw new AnalysisException(
- s"SHOW PARTITIONS is not allowed on a datasource table: ${tab.qualifiedName}")
+ s"SHOW PARTITIONS is not allowed on a datasource table: $tableIdentWithDB")
}
/**
@@ -677,7 +672,7 @@ case class ShowPartitionsCommand(
* thrown if the partitioning spec is invalid.
*/
if (spec.isDefined) {
- val badColumns = spec.get.keySet.filterNot(tab.partitionColumnNames.contains)
+ val badColumns = spec.get.keySet.filterNot(table.partitionColumnNames.contains)
if (badColumns.nonEmpty) {
val badCols = badColumns.mkString("[", ", ", "]")
throw new AnalysisException(
@@ -685,8 +680,8 @@ case class ShowPartitionsCommand(
}
}
- val partNames = catalog.listPartitions(table, spec).map { p =>
- getPartName(p.spec, tab.partitionColumnNames)
+ val partNames = catalog.listPartitions(tableName, spec).map { p =>
+ getPartName(p.spec, table.partitionColumnNames)
}
partNames.map(Row(_))
@@ -700,16 +695,6 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
-
- if (catalog.isTemporaryTable(table)) {
- throw new AnalysisException(
- s"SHOW CREATE TABLE cannot be applied to temporary table")
- }
-
- if (!catalog.tableExists(table)) {
- throw new AnalysisException(s"Table $table doesn't exist")
- }
-
val tableMetadata = catalog.getTableMetadata(table)
// TODO: unify this after we unify the CREATE TABLE syntax for hive serde and data source table.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 3fa6298562..6fecda232a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -151,7 +151,12 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
}
private def listColumns(tableIdentifier: TableIdentifier): Dataset[Column] = {
- val tableMetadata = sessionCatalog.getTableMetadata(tableIdentifier)
+ val tableMetadata = if (tableIdentifier.database.isDefined) {
+ sessionCatalog.getTableMetadata(tableIdentifier)
+ } else {
+ sessionCatalog.getTempViewOrPermanentTableMetadata(tableIdentifier.table)
+ }
+
val partitionColumnNames = tableMetadata.partitionColumnNames.toSet
val bucketColumnNames = tableMetadata.bucketSpec.map(_.bucketColumnNames).getOrElse(Nil).toSet
val columns = tableMetadata.schema.map { c =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 4a171808c0..b5499f2884 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -1646,7 +1646,9 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
(1 to 10).map { i => (i, i) }.toDF("a", "b").createTempView("my_temp_tab")
sql(s"CREATE EXTERNAL TABLE my_ext_tab LOCATION '$path'")
sql(s"CREATE VIEW my_view AS SELECT 1")
- assertUnsupported("TRUNCATE TABLE my_temp_tab")
+ intercept[NoSuchTableException] {
+ sql("TRUNCATE TABLE my_temp_tab")
+ }
assertUnsupported("TRUNCATE TABLE my_ext_tab")
assertUnsupported("TRUNCATE TABLE my_view")
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
index df33731df2..b2103b3bfc 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
@@ -406,25 +406,24 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
|USING org.apache.spark.sql.parquet.DefaultSource
""".stripMargin)
// An empty sequence of row is returned for session temporary table.
- val message1 = intercept[AnalysisException] {
+ intercept[NoSuchTableException] {
sql("SHOW PARTITIONS parquet_temp")
- }.getMessage
- assert(message1.contains("is not allowed on a temporary table"))
+ }
- val message2 = intercept[AnalysisException] {
+ val message1 = intercept[AnalysisException] {
sql("SHOW PARTITIONS parquet_tab3")
}.getMessage
- assert(message2.contains("not allowed on a table that is not partitioned"))
+ assert(message1.contains("not allowed on a table that is not partitioned"))
- val message3 = intercept[AnalysisException] {
+ val message2 = intercept[AnalysisException] {
sql("SHOW PARTITIONS parquet_tab4 PARTITION(abcd=2015, xyz=1)")
}.getMessage
- assert(message3.contains("Non-partitioning column(s) [abcd, xyz] are specified"))
+ assert(message2.contains("Non-partitioning column(s) [abcd, xyz] are specified"))
- val message4 = intercept[AnalysisException] {
+ val message3 = intercept[AnalysisException] {
sql("SHOW PARTITIONS parquet_view1")
}.getMessage
- assert(message4.contains("is not allowed on a view"))
+ assert(message3.contains("is not allowed on a view"))
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index aa35a335fa..38482f66a3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -300,7 +300,7 @@ class HiveDDLSuite
sql(s"ALTER VIEW $viewName UNSET TBLPROPERTIES ('p')")
}.getMessage
assert(message.contains(
- "Attempted to unset non-existent property 'p' in table '`view1`'"))
+ "Attempted to unset non-existent property 'p' in table '`default`.`view1`'"))
}
}
}
@@ -678,8 +678,8 @@ class HiveDDLSuite
.createTempView(sourceViewName)
sql(s"CREATE TABLE $targetTabName LIKE $sourceViewName")
- val sourceTable = spark.sessionState.catalog.getTableMetadata(
- TableIdentifier(sourceViewName, None))
+ val sourceTable =
+ spark.sessionState.catalog.getTempViewOrPermanentTableMetadata(sourceViewName)
val targetTable = spark.sessionState.catalog.getTableMetadata(
TableIdentifier(targetTabName, Some("default")))
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
index bc999d4724..a215c70da0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
@@ -82,25 +82,53 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
}
- test("error handling: insert/load/truncate table commands against a temp view") {
+ test("Issue exceptions for ALTER VIEW on the temporary view") {
val viewName = "testView"
withTempView(viewName) {
- sql(s"CREATE TEMPORARY VIEW $viewName AS SELECT id FROM jt")
- var e = intercept[AnalysisException] {
+ spark.range(10).createTempView(viewName)
+ assertNoSuchTable(s"ALTER VIEW $viewName SET TBLPROPERTIES ('p' = 'an')")
+ assertNoSuchTable(s"ALTER VIEW $viewName UNSET TBLPROPERTIES ('p')")
+ }
+ }
+
+ test("Issue exceptions for ALTER TABLE on the temporary view") {
+ val viewName = "testView"
+ withTempView(viewName) {
+ spark.range(10).createTempView(viewName)
+ assertNoSuchTable(s"ALTER TABLE $viewName SET SERDE 'whatever'")
+ assertNoSuchTable(s"ALTER TABLE $viewName PARTITION (a=1, b=2) SET SERDE 'whatever'")
+ assertNoSuchTable(s"ALTER TABLE $viewName SET SERDEPROPERTIES ('p' = 'an')")
+ assertNoSuchTable(s"ALTER TABLE $viewName SET LOCATION '/path/to/your/lovely/heart'")
+ assertNoSuchTable(s"ALTER TABLE $viewName PARTITION (a='4') SET LOCATION '/path/to/home'")
+ assertNoSuchTable(s"ALTER TABLE $viewName ADD IF NOT EXISTS PARTITION (a='4', b='8')")
+ assertNoSuchTable(s"ALTER TABLE $viewName DROP PARTITION (a='4', b='8')")
+ assertNoSuchTable(s"ALTER TABLE $viewName PARTITION (a='4') RENAME TO PARTITION (a='5')")
+ assertNoSuchTable(s"ALTER TABLE $viewName RECOVER PARTITIONS")
+ }
+ }
+
+ test("Issue exceptions for other table DDL on the temporary view") {
+ val viewName = "testView"
+ withTempView(viewName) {
+ spark.range(10).createTempView(viewName)
+
+ val e = intercept[AnalysisException] {
sql(s"INSERT INTO TABLE $viewName SELECT 1")
}.getMessage
assert(e.contains("Inserting into an RDD-based table is not allowed"))
val testData = hiveContext.getHiveFile("data/files/employee.dat").getCanonicalPath
- e = intercept[AnalysisException] {
- sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE $viewName""")
- }.getMessage
- assert(e.contains(s"Target table in LOAD DATA cannot be temporary: `$viewName`"))
+ assertNoSuchTable(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE $viewName""")
+ assertNoSuchTable(s"TRUNCATE TABLE $viewName")
+ assertNoSuchTable(s"SHOW CREATE TABLE $viewName")
+ assertNoSuchTable(s"SHOW PARTITIONS $viewName")
+ assertNoSuchTable(s"ANALYZE TABLE $viewName COMPUTE STATISTICS")
+ }
+ }
- e = intercept[AnalysisException] {
- sql(s"TRUNCATE TABLE $viewName")
- }.getMessage
- assert(e.contains(s"Operation not allowed: TRUNCATE TABLE on temporary tables: `$viewName`"))
+ private def assertNoSuchTable(query: String): Unit = {
+ intercept[NoSuchTableException] {
+ sql(query)
}
}
@@ -117,12 +145,12 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
e = intercept[AnalysisException] {
sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE $viewName""")
}.getMessage
- assert(e.contains(s"Target table in LOAD DATA cannot be a view: `$viewName`"))
+ assert(e.contains(s"Target table in LOAD DATA cannot be a view: `default`.`testview`"))
e = intercept[AnalysisException] {
sql(s"TRUNCATE TABLE $viewName")
}.getMessage
- assert(e.contains(s"Operation not allowed: TRUNCATE TABLE on views: `$viewName`"))
+ assert(e.contains(s"Operation not allowed: TRUNCATE TABLE on views: `default`.`testview`"))
}
}
@@ -277,13 +305,8 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
test("should not allow ALTER VIEW AS when the view does not exist") {
- intercept[NoSuchTableException](
- sql("ALTER VIEW testView AS SELECT 1, 2")
- )
-
- intercept[NoSuchTableException](
- sql("ALTER VIEW default.testView AS SELECT 1, 2")
- )
+ assertNoSuchTable("ALTER VIEW testView AS SELECT 1, 2")
+ assertNoSuchTable("ALTER VIEW default.testView AS SELECT 1, 2")
}
test("ALTER VIEW AS should try to alter temp view first if view name has no database part") {