From b50b34f5611a1f182ba9b6eaf86c666bbd9f9eb0 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Thu, 22 Sep 2016 12:52:09 +0800
Subject: [SPARK-17609][SQL] SessionCatalog.tableExists should not check temp
 view

## What changes were proposed in this pull request?

After #15054, there is no place in Spark SQL that needs `SessionCatalog.tableExists` to check temp views, so this PR makes `SessionCatalog.tableExists` check only permanent tables/views and removes some hacks. This PR also improves `getTempViewOrPermanentTableMetadata`, introduced in #15054, to simplify the code.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan

Closes #15160 from cloud-fan/exists.
---
 .../sql/catalyst/catalog/SessionCatalog.scala      | 70 +++++++++++-----------
 .../sql/catalyst/catalog/SessionCatalogSuite.scala | 30 +++++-----
 .../org/apache/spark/sql/DataFrameWriter.scala     |  9 +--
 .../execution/command/createDataSourceTables.scala | 15 ++---
 .../apache/spark/sql/execution/command/ddl.scala   | 43 ++++++-------
 .../spark/sql/execution/command/tables.scala       | 17 +-----
 .../apache/spark/sql/internal/CatalogImpl.scala    |  6 +-
 .../spark/sql/hive/MetastoreDataSourcesSuite.scala |  2 +-
 .../spark/sql/hive/execution/HiveDDLSuite.scala    |  4 +-
 9 files changed, 81 insertions(+), 115 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index ef29c75c01..8c01c7a3f2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -245,6 +245,16 @@ class SessionCatalog(
     externalCatalog.alterTable(newTableDefinition)
   }
 
+  /**
+   * Return whether a table/view with the specified name exists. If no database is specified, check
+   * with current database.
+   */
+  def tableExists(name: TableIdentifier): Boolean = synchronized {
+    val db = formatDatabaseName(name.database.getOrElse(currentDb))
+    val table = formatTableName(name.table)
+    externalCatalog.tableExists(db, table)
+  }
+
   /**
    * Retrieve the metadata of an existing permanent table/view. If no database is specified,
    * assume the table/view is in the current database. If the specified table/view is not found
@@ -270,24 +280,6 @@ class SessionCatalog(
     externalCatalog.getTableOption(db, table)
   }
 
-  /**
-   * Retrieve the metadata of an existing temporary view or permanent table/view.
-   * If the temporary view does not exist, tries to get the metadata an existing permanent
-   * table/view. If no database is specified, assume the table/view is in the current database.
-   * If the specified table/view is not found in the database then a [[NoSuchTableException]] is
-   * thrown.
-   */
-  def getTempViewOrPermanentTableMetadata(name: String): CatalogTable = synchronized {
-    val table = formatTableName(name)
-    getTempView(table).map { plan =>
-      CatalogTable(
-        identifier = TableIdentifier(table),
-        tableType = CatalogTableType.VIEW,
-        storage = CatalogStorageFormat.empty,
-        schema = plan.output.toStructType)
-    }.getOrElse(getTableMetadata(TableIdentifier(name)))
-  }
-
   /**
    * Load files stored in given path into an existing metastore table.
    * If no database is specified, assume the table is in the current database.
@@ -368,6 +360,30 @@ class SessionCatalog(
   // | Methods that interact with temporary and metastore tables |
   // -------------------------------------------------------------
 
+  /**
+   * Retrieve the metadata of an existing temporary view or permanent table/view.
+   *
+   * If a database is specified in `name`, this will return the metadata of table/view in that
+   * database.
+   * If no database is specified, this will first attempt to get the metadata of a temporary view
+   * with the same name, then, if that does not exist, return the metadata of table/view in the
+   * current database.
+   */
+  def getTempViewOrPermanentTableMetadata(name: TableIdentifier): CatalogTable = synchronized {
+    val table = formatTableName(name.table)
+    if (name.database.isDefined) {
+      getTableMetadata(name)
+    } else {
+      getTempView(table).map { plan =>
+        CatalogTable(
+          identifier = TableIdentifier(table),
+          tableType = CatalogTableType.VIEW,
+          storage = CatalogStorageFormat.empty,
+          schema = plan.output.toStructType)
+      }.getOrElse(getTableMetadata(name))
+    }
+  }
+
   /**
    * Rename a table.
    *
@@ -449,24 +465,6 @@ class SessionCatalog(
     }
   }
 
-  /**
-   * Return whether a table/view with the specified name exists.
-   *
-   * Note: If a database is explicitly specified, then this will return whether the table/view
-   * exists in that particular database instead. In that case, even if there is a temporary
-   * table with the same name, we will return false if the specified database does not
-   * contain the table/view.
-   */
-  def tableExists(name: TableIdentifier): Boolean = synchronized {
-    val db = formatDatabaseName(name.database.getOrElse(currentDb))
-    val table = formatTableName(name.table)
-    if (isTemporaryTable(name)) {
-      true
-    } else {
-      externalCatalog.tableExists(db, table)
-    }
-  }
-
   /**
    * Return whether a table with the specified name is a temporary table.
    *
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 384a730861..915ed8f8b1 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -425,35 +425,37 @@ class SessionCatalogSuite extends SparkFunSuite {
     assert(!catalog.tableExists(TableIdentifier("tbl2", Some("db1"))))
     // If database is explicitly specified, do not check temporary tables
     val tempTable = Range(1, 10, 1, 10)
-    catalog.createTempView("tbl3", tempTable, overrideIfExists = false)
     assert(!catalog.tableExists(TableIdentifier("tbl3", Some("db2"))))
     // If database is not explicitly specified, check the current database
     catalog.setCurrentDatabase("db2")
     assert(catalog.tableExists(TableIdentifier("tbl1")))
     assert(catalog.tableExists(TableIdentifier("tbl2")))
-    assert(catalog.tableExists(TableIdentifier("tbl3")))
-  }
 
-  test("tableExists on temporary views") {
-    val catalog = new SessionCatalog(newBasicCatalog())
-    val tempTable = Range(1, 10, 2, 10)
-    assert(!catalog.tableExists(TableIdentifier("view1")))
-    assert(!catalog.tableExists(TableIdentifier("view1", Some("default"))))
-    catalog.createTempView("view1", tempTable, overrideIfExists = false)
-    assert(catalog.tableExists(TableIdentifier("view1")))
-    assert(!catalog.tableExists(TableIdentifier("view1", Some("default"))))
+    catalog.createTempView("tbl3", tempTable, overrideIfExists = false)
+    // tableExists should not check temp view.
+    assert(!catalog.tableExists(TableIdentifier("tbl3")))
   }
 
   test("getTempViewOrPermanentTableMetadata on temporary views") {
     val catalog = new SessionCatalog(newBasicCatalog())
     val tempTable = Range(1, 10, 2, 10)
     intercept[NoSuchTableException] {
-      catalog.getTempViewOrPermanentTableMetadata("view1")
+      catalog.getTempViewOrPermanentTableMetadata(TableIdentifier("view1"))
+    }.getMessage
+
+    intercept[NoSuchTableException] {
+      catalog.getTempViewOrPermanentTableMetadata(TableIdentifier("view1", Some("default")))
     }.getMessage
 
     catalog.createTempView("view1", tempTable, overrideIfExists = false)
-    assert(catalog.getTempViewOrPermanentTableMetadata("view1").identifier ==
-      TableIdentifier("view1"), "the temporary view `view1` should exist")
+    assert(catalog.getTempViewOrPermanentTableMetadata(
+      TableIdentifier("view1")).identifier.table == "view1")
+    assert(catalog.getTempViewOrPermanentTableMetadata(
+      TableIdentifier("view1")).schema(0).name == "id")
+
+    intercept[NoSuchTableException] {
+      catalog.getTempViewOrPermanentTableMetadata(TableIdentifier("view1", Some("default")))
+    }.getMessage
   }
 
   test("list tables without pattern") {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 9e343b5d24..64d3422cb4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -361,12 +361,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       throw new AnalysisException("Cannot create hive serde table with saveAsTable API")
     }
 
-    val sessionState = df.sparkSession.sessionState
-    val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
-    val tableIdentWithDB = tableIdent.copy(database = Some(db))
-    // Pass a table identifier with database part, so that `tableExists` won't check temp views
-    // unexpectedly.
-    val tableExists = sessionState.catalog.tableExists(tableIdentWithDB)
+    val tableExists = df.sparkSession.sessionState.catalog.tableExists(tableIdent)
 
     (tableExists, mode) match {
       case (true, SaveMode.Ignore) =>
@@ -392,7 +387,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         bucketSpec = getBucketSpec
       )
       val cmd = CreateTable(tableDesc, mode, Some(df.logicalPlan))
-      sessionState.executePlan(cmd).toRdd
+      df.sparkSession.sessionState.executePlan(cmd).toRdd
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index d8e20b09c1..a04a13e698 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -47,15 +47,11 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
     assert(table.provider.isDefined)
 
     val sessionState = sparkSession.sessionState
-    val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase)
-    val tableIdentWithDB = table.identifier.copy(database = Some(db))
-    // Pass a table identifier with database part, so that `tableExists` won't check temp views
-    // unexpectedly.
-    if (sessionState.catalog.tableExists(tableIdentWithDB)) {
+    if (sessionState.catalog.tableExists(table.identifier)) {
       if (ignoreIfExists) {
         return Seq.empty[Row]
       } else {
-        throw new AnalysisException(s"Table ${tableIdentWithDB.unquotedString} already exists.")
+        throw new AnalysisException(s"Table ${table.identifier.unquotedString} already exists.")
       }
     }
 
@@ -146,8 +142,6 @@ case class CreateDataSourceTableAsSelectCommand(
     var createMetastoreTable = false
     var existingSchema = Option.empty[StructType]
 
-    // Pass a table identifier with database part, so that `tableExists` won't check temp views
-    // unexpectedly.
     if (sparkSession.sessionState.catalog.tableExists(tableIdentWithDB)) {
       // Check if we need to throw an exception or just return.
       mode match {
@@ -172,8 +166,9 @@ case class CreateDataSourceTableAsSelectCommand(
           // TODO: Check that options from the resolved relation match the relation that we are
           // inserting into (i.e. using the same compression).
 
-          EliminateSubqueryAliases(
-            sessionState.catalog.lookupRelation(tableIdentWithDB)) match {
+          // Pass a table identifier with database part, so that `lookupRelation` won't get temp
+          // views unexpectedly.
+          EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB)) match {
             case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) =>
               // check if the file formats match
               l.relation match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index b57b2d280d..01ac89868d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -183,32 +183,25 @@ case class DropTableCommand(
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
-    if (!catalog.tableExists(tableName)) {
-      if (!ifExists) {
-        val objectName = if (isView) "View" else "Table"
-        throw new AnalysisException(s"$objectName to drop '$tableName' does not exist")
-      }
-    } else {
-      // If the command DROP VIEW is to drop a table or DROP TABLE is to drop a view
-      // issue an exception.
-      catalog.getTableMetadataOption(tableName).map(_.tableType match {
-        case CatalogTableType.VIEW if !isView =>
-          throw new AnalysisException(
-            "Cannot drop a view with DROP TABLE. Please use DROP VIEW instead")
-        case o if o != CatalogTableType.VIEW && isView =>
-          throw new AnalysisException(
-            s"Cannot drop a table with DROP VIEW. Please use DROP TABLE instead")
-        case _ =>
-      })
-      try {
-        sparkSession.sharedState.cacheManager.uncacheQuery(
-          sparkSession.table(tableName.quotedString))
-      } catch {
-        case NonFatal(e) => log.warn(e.toString, e)
-      }
-      catalog.refreshTable(tableName)
-      catalog.dropTable(tableName, ifExists, purge)
+    // If the command DROP VIEW is to drop a table or DROP TABLE is to drop a view
+    // issue an exception.
+    catalog.getTableMetadataOption(tableName).map(_.tableType match {
+      case CatalogTableType.VIEW if !isView =>
+        throw new AnalysisException(
+          "Cannot drop a view with DROP TABLE. Please use DROP VIEW instead")
+      case o if o != CatalogTableType.VIEW && isView =>
+        throw new AnalysisException(
+          s"Cannot drop a table with DROP VIEW. Please use DROP TABLE instead")
+      case _ =>
+    })
+    try {
+      sparkSession.sharedState.cacheManager.uncacheQuery(
+        sparkSession.table(tableName.quotedString))
+    } catch {
+      case NonFatal(e) => log.warn(e.toString, e)
     }
+    catalog.refreshTable(tableName)
+    catalog.dropTable(tableName, ifExists, purge)
     Seq.empty[Row]
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 94b46c5d97..0f61629317 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -59,16 +59,7 @@ case class CreateTableLikeCommand(
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
-    if (!catalog.tableExists(sourceTable)) {
-      throw new AnalysisException(
-        s"Source table in CREATE TABLE LIKE does not exist: '$sourceTable'")
-    }
-
-    val sourceTableDesc = if (sourceTable.database.isDefined) {
-      catalog.getTableMetadata(sourceTable)
-    } else {
-      catalog.getTempViewOrPermanentTableMetadata(sourceTable.table)
-    }
+    val sourceTableDesc = catalog.getTempViewOrPermanentTableMetadata(sourceTable)
 
     // Storage format
     val newStorage =
@@ -602,11 +593,7 @@ case class ShowColumnsCommand(tableName: TableIdentifier) extends RunnableComman
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
-    val table = if (tableName.database.isDefined) {
-      catalog.getTableMetadata(tableName)
-    } else {
-      catalog.getTempViewOrPermanentTableMetadata(tableName.table)
-    }
+    val table = catalog.getTempViewOrPermanentTableMetadata(tableName)
     table.schema.map { c =>
       Row(c.name)
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 6fecda232a..f252535765 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -151,11 +151,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
   }
 
   private def listColumns(tableIdentifier: TableIdentifier): Dataset[Column] = {
-    val tableMetadata = if (tableIdentifier.database.isDefined) {
-      sessionCatalog.getTableMetadata(tableIdentifier)
-    } else {
-      sessionCatalog.getTempViewOrPermanentTableMetadata(tableIdentifier.table)
-    }
+    val tableMetadata = sessionCatalog.getTempViewOrPermanentTableMetadata(tableIdentifier)
 
     val partitionColumnNames = tableMetadata.partitionColumnNames.toSet
     val bucketColumnNames = tableMetadata.bucketSpec.map(_.bucketColumnNames).getOrElse(Nil).toSet
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 7143adf02b..8ae6868c98 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -515,7 +515,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
         assert(
           intercept[AnalysisException] {
             sparkSession.catalog.createExternalTable("createdJsonTable", jsonFilePath.toString)
-          }.getMessage.contains("Table default.createdJsonTable already exists."),
+          }.getMessage.contains("Table createdJsonTable already exists."),
          "We should complain that createdJsonTable already exists")
       }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 38482f66a3..c927e5d802 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -678,8 +678,8 @@ class HiveDDLSuite
       .createTempView(sourceViewName)
 
     sql(s"CREATE TABLE $targetTabName LIKE $sourceViewName")
-    val sourceTable =
-      spark.sessionState.catalog.getTempViewOrPermanentTableMetadata(sourceViewName)
+    val sourceTable = spark.sessionState.catalog.getTempViewOrPermanentTableMetadata(
+      TableIdentifier(sourceViewName))
     val targetTable = spark.sessionState.catalog.getTableMetadata(
       TableIdentifier(targetTabName, Some("default")))
 
-- 
cgit v1.2.3