author     Wenchen Fan <wenchen@databricks.com>   2016-09-22 12:52:09 +0800
committer  Wenchen Fan <wenchen@databricks.com>   2016-09-22 12:52:09 +0800
commit     b50b34f5611a1f182ba9b6eaf86c666bbd9f9eb0
tree       1097e32b1e9aac12f7166dae9d5cb88f363abad2 /sql/core
parent     8bde03bf9a0896ea59ceaa699df7700351a130fb
[SPARK-17609][SQL] SessionCatalog.tableExists should not check temp view
## What changes were proposed in this pull request?
After #15054, no code path in Spark SQL needs `SessionCatalog.tableExists` to check temp views, so this PR makes `SessionCatalog.tableExists` check only permanent tables/views and removes the related hacks.
This PR also improves `getTempViewOrPermanentTableMetadata`, introduced in #15054, to take a full `TableIdentifier` instead of a bare table name, which simplifies its call sites.
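To make the intended lookup rules concrete, here is a minimal, self-contained sketch (illustrative stand-in types, not Spark's real `SessionCatalog`) of the two behaviors this PR settles on: `tableExists` resolves a missing database part against the current database and never consults temp views, while `getTempViewOrPermanentTableMetadata` lets a temp view win only when the identifier carries no database part.

```scala
// Illustrative stand-ins only -- not Spark's real catalog classes.
case class TableIdentifier(table: String, database: Option[String] = None)
case class CatalogTable(identifier: TableIdentifier)

class SketchCatalog(currentDb: String) {
  private val tempViews = scala.collection.mutable.Map.empty[String, CatalogTable]
  private val permanent = scala.collection.mutable.Map.empty[(String, String), CatalogTable]

  def createTempView(name: String): Unit =
    tempViews(name) = CatalogTable(TableIdentifier(name))

  def createTable(ident: TableIdentifier): Unit = {
    val db = ident.database.getOrElse(currentDb)
    permanent((db, ident.table)) = CatalogTable(TableIdentifier(ident.table, Some(db)))
  }

  // After this PR: only permanent tables/views are checked; a missing
  // database part defaults to the current database, never to temp views.
  def tableExists(ident: TableIdentifier): Boolean = {
    val db = ident.database.getOrElse(currentDb)
    permanent.contains((db, ident.table))
  }

  // A temp view shadows a permanent table only for database-less identifiers.
  def getTempViewOrPermanentTableMetadata(ident: TableIdentifier): CatalogTable =
    ident.database match {
      case None    => tempViews.getOrElse(ident.table, permanentMetadata(ident))
      case Some(_) => permanentMetadata(ident)
    }

  private def permanentMetadata(ident: TableIdentifier): CatalogTable = {
    val db = ident.database.getOrElse(currentDb)
    permanent.getOrElse(
      (db, ident.table),
      sys.error(s"Table or view '${ident.table}' not found in database '$db'"))
  }
}

// Temp views are invisible to tableExists but visible to the metadata lookup.
val catalog = new SketchCatalog("default")
catalog.createTempView("t")
assert(!catalog.tableExists(TableIdentifier("t")))
assert(catalog.getTempViewOrPermanentTableMetadata(TableIdentifier("t")).identifier.database.isEmpty)
```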
## How was this patch tested?
Existing tests.
Author: Wenchen Fan <wenchen@databricks.com>
Closes #15160 from cloud-fan/exists.
Diffstat (limited to 'sql/core')

 sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala                          |  9 +--
 sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala | 15 ++---
 sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala                    | 43 +++++------
 sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala                 | 17 ++---
 sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala                     |  6 +--
 5 files changed, 28 insertions(+), 62 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 9e343b5d24..64d3422cb4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -361,12 +361,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       throw new AnalysisException("Cannot create hive serde table with saveAsTable API")
     }
 
-    val sessionState = df.sparkSession.sessionState
-    val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
-    val tableIdentWithDB = tableIdent.copy(database = Some(db))
-    // Pass a table identifier with database part, so that `tableExists` won't check temp views
-    // unexpectedly.
-    val tableExists = sessionState.catalog.tableExists(tableIdentWithDB)
+    val tableExists = df.sparkSession.sessionState.catalog.tableExists(tableIdent)
 
     (tableExists, mode) match {
       case (true, SaveMode.Ignore) =>
@@ -392,7 +387,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         bucketSpec = getBucketSpec
       )
       val cmd = CreateTable(tableDesc, mode, Some(df.logicalPlan))
-      sessionState.executePlan(cmd).toRdd
+      df.sparkSession.sessionState.executePlan(cmd).toRdd
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index d8e20b09c1..a04a13e698 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -47,15 +47,11 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
     assert(table.provider.isDefined)
 
     val sessionState = sparkSession.sessionState
-    val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase)
-    val tableIdentWithDB = table.identifier.copy(database = Some(db))
-    // Pass a table identifier with database part, so that `tableExists` won't check temp views
-    // unexpectedly.
-    if (sessionState.catalog.tableExists(tableIdentWithDB)) {
+    if (sessionState.catalog.tableExists(table.identifier)) {
       if (ignoreIfExists) {
         return Seq.empty[Row]
       } else {
-        throw new AnalysisException(s"Table ${tableIdentWithDB.unquotedString} already exists.")
+        throw new AnalysisException(s"Table ${table.identifier.unquotedString} already exists.")
       }
     }
 
@@ -146,8 +142,6 @@ case class CreateDataSourceTableAsSelectCommand(
 
     var createMetastoreTable = false
     var existingSchema = Option.empty[StructType]
-    // Pass a table identifier with database part, so that `tableExists` won't check temp views
-    // unexpectedly.
     if (sparkSession.sessionState.catalog.tableExists(tableIdentWithDB)) {
       // Check if we need to throw an exception or just return.
       mode match {
@@ -172,8 +166,9 @@ case class CreateDataSourceTableAsSelectCommand(
 
         // TODO: Check that options from the resolved relation match the relation that we are
         // inserting into (i.e. using the same compression).
-        EliminateSubqueryAliases(
-          sessionState.catalog.lookupRelation(tableIdentWithDB)) match {
+        // Pass a table identifier with database part, so that `lookupRelation` won't get temp
+        // views unexpectedly.
+        EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB)) match {
           case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) =>
             // check if the file formats match
             l.relation match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index b57b2d280d..01ac89868d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -183,32 +183,25 @@ case class DropTableCommand(
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
-    if (!catalog.tableExists(tableName)) {
-      if (!ifExists) {
-        val objectName = if (isView) "View" else "Table"
-        throw new AnalysisException(s"$objectName to drop '$tableName' does not exist")
-      }
-    } else {
-      // If the command DROP VIEW is to drop a table or DROP TABLE is to drop a view
-      // issue an exception.
-      catalog.getTableMetadataOption(tableName).map(_.tableType match {
-        case CatalogTableType.VIEW if !isView =>
-          throw new AnalysisException(
-            "Cannot drop a view with DROP TABLE. Please use DROP VIEW instead")
-        case o if o != CatalogTableType.VIEW && isView =>
-          throw new AnalysisException(
-            s"Cannot drop a table with DROP VIEW. Please use DROP TABLE instead")
-        case _ =>
-      })
-      try {
-        sparkSession.sharedState.cacheManager.uncacheQuery(
-          sparkSession.table(tableName.quotedString))
-      } catch {
-        case NonFatal(e) => log.warn(e.toString, e)
-      }
-      catalog.refreshTable(tableName)
-      catalog.dropTable(tableName, ifExists, purge)
+    // If the command DROP VIEW is to drop a table or DROP TABLE is to drop a view
+    // issue an exception.
+    catalog.getTableMetadataOption(tableName).map(_.tableType match {
+      case CatalogTableType.VIEW if !isView =>
+        throw new AnalysisException(
+          "Cannot drop a view with DROP TABLE. Please use DROP VIEW instead")
+      case o if o != CatalogTableType.VIEW && isView =>
+        throw new AnalysisException(
+          s"Cannot drop a table with DROP VIEW. Please use DROP TABLE instead")
+      case _ =>
+    })
+    try {
+      sparkSession.sharedState.cacheManager.uncacheQuery(
+        sparkSession.table(tableName.quotedString))
+    } catch {
+      case NonFatal(e) => log.warn(e.toString, e)
     }
+    catalog.refreshTable(tableName)
+    catalog.dropTable(tableName, ifExists, purge)
     Seq.empty[Row]
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 94b46c5d97..0f61629317 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -59,16 +59,7 @@ case class CreateTableLikeCommand(
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
-    if (!catalog.tableExists(sourceTable)) {
-      throw new AnalysisException(
-        s"Source table in CREATE TABLE LIKE does not exist: '$sourceTable'")
-    }
-
-    val sourceTableDesc = if (sourceTable.database.isDefined) {
-      catalog.getTableMetadata(sourceTable)
-    } else {
-      catalog.getTempViewOrPermanentTableMetadata(sourceTable.table)
-    }
+    val sourceTableDesc = catalog.getTempViewOrPermanentTableMetadata(sourceTable)
 
     // Storage format
     val newStorage =
@@ -602,11 +593,7 @@ case class ShowColumnsCommand(tableName: TableIdentifier) extends RunnableComman
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
-    val table = if (tableName.database.isDefined) {
-      catalog.getTableMetadata(tableName)
-    } else {
-      catalog.getTempViewOrPermanentTableMetadata(tableName.table)
-    }
+    val table = catalog.getTempViewOrPermanentTableMetadata(tableName)
     table.schema.map { c =>
       Row(c.name)
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 6fecda232a..f252535765 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -151,11 +151,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
   }
 
   private def listColumns(tableIdentifier: TableIdentifier): Dataset[Column] = {
-    val tableMetadata = if (tableIdentifier.database.isDefined) {
-      sessionCatalog.getTableMetadata(tableIdentifier)
-    } else {
-      sessionCatalog.getTempViewOrPermanentTableMetadata(tableIdentifier.table)
-    }
+    val tableMetadata = sessionCatalog.getTempViewOrPermanentTableMetadata(tableIdentifier)
 
     val partitionColumnNames = tableMetadata.partitionColumnNames.toSet
     val bucketColumnNames = tableMetadata.bucketSpec.map(_.bucketColumnNames).getOrElse(Nil).toSet
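For context (not part of the commit), a hypothetical spark-shell session showing the user-visible effect of these rules; `spark` is the `SparkSession` that spark-shell provides:

```scala
// A temp view named `t` in the current session.
spark.sql("CREATE TEMPORARY VIEW t AS SELECT 1 AS id")

// CREATE TABLE goes through tableExists, which now ignores temp views, so a
// permanent table named `t` can be created even while the temp view exists:
spark.sql("CREATE TABLE t USING parquet AS SELECT 2 AS id")

// Commands that intentionally see temp views, such as SHOW COLUMNS, still
// resolve a database-less `t` to the temp view first:
spark.sql("SHOW COLUMNS IN t").show()
```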