author     Wenchen Fan <wenchen@databricks.com>    2016-09-22 12:52:09 +0800
committer  Wenchen Fan <wenchen@databricks.com>    2016-09-22 12:52:09 +0800
commit     b50b34f5611a1f182ba9b6eaf86c666bbd9f9eb0 (patch)
tree       1097e32b1e9aac12f7166dae9d5cb88f363abad2
parent     8bde03bf9a0896ea59ceaa699df7700351a130fb (diff)
[SPARK-17609][SQL] SessionCatalog.tableExists should not check temp view
## What changes were proposed in this pull request?

After #15054, there is no place in Spark SQL that needs `SessionCatalog.tableExists` to check temp views, so this PR makes `SessionCatalog.tableExists` check only permanent tables/views and removes some hacks. This PR also improves the `getTempViewOrPermanentTableMetadata` method introduced in #15054 to make the code simpler.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #15160 from cloud-fan/exists.
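To illustrate the new contract, here is a minimal sketch patterned on the `SessionCatalogSuite` changes below. `newBasicCatalog()` and `tempTable` are the suite's fixtures (a catalog pre-populated with `db2.tbl1` and a small `Range` plan); they are borrowed here as assumptions, not new API:

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.SessionCatalog

val catalog = new SessionCatalog(newBasicCatalog())
catalog.createTempView("v", tempTable, overrideIfExists = false)

// After this patch, tableExists consults only the external catalog:
catalog.tableExists(TableIdentifier("v"))                  // false: temp views are skipped
catalog.tableExists(TableIdentifier("tbl1", Some("db2")))  // true: permanent tables still found

// Temp views are resolved through the dedicated method instead:
catalog.getTempViewOrPermanentTableMetadata(TableIdentifier("v"))
  .identifier.table                                        // "v"
```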
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala       70
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala  30
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala                            9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala  15
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala                     43
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala                  17
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala                       6
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala             2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala                4
9 files changed, 81 insertions, 115 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index ef29c75c01..8c01c7a3f2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -246,6 +246,16 @@ class SessionCatalog(
}
/**
+ * Return whether a table/view with the specified name exists. If no database is specified, check
+ * with current database.
+ */
+ def tableExists(name: TableIdentifier): Boolean = synchronized {
+ val db = formatDatabaseName(name.database.getOrElse(currentDb))
+ val table = formatTableName(name.table)
+ externalCatalog.tableExists(db, table)
+ }
+
+ /**
* Retrieve the metadata of an existing permanent table/view. If no database is specified,
* assume the table/view is in the current database. If the specified table/view is not found
* in the database then a [[NoSuchTableException]] is thrown.
@@ -271,24 +281,6 @@ class SessionCatalog(
}
/**
- * Retrieve the metadata of an existing temporary view or permanent table/view.
- * If the temporary view does not exist, tries to get the metadata an existing permanent
- * table/view. If no database is specified, assume the table/view is in the current database.
- * If the specified table/view is not found in the database then a [[NoSuchTableException]] is
- * thrown.
- */
- def getTempViewOrPermanentTableMetadata(name: String): CatalogTable = synchronized {
- val table = formatTableName(name)
- getTempView(table).map { plan =>
- CatalogTable(
- identifier = TableIdentifier(table),
- tableType = CatalogTableType.VIEW,
- storage = CatalogStorageFormat.empty,
- schema = plan.output.toStructType)
- }.getOrElse(getTableMetadata(TableIdentifier(name)))
- }
-
- /**
* Load files stored in given path into an existing metastore table.
* If no database is specified, assume the table is in the current database.
* If the specified table is not found in the database then a [[NoSuchTableException]] is thrown.
@@ -369,6 +361,30 @@ class SessionCatalog(
// -------------------------------------------------------------
/**
+ * Retrieve the metadata of an existing temporary view or permanent table/view.
+ *
+ * If a database is specified in `name`, this will return the metadata of table/view in that
+ * database.
+ * If no database is specified, this will first attempt to get the metadata of a temporary view
+ * with the same name, then, if that does not exist, return the metadata of table/view in the
+ * current database.
+ */
+ def getTempViewOrPermanentTableMetadata(name: TableIdentifier): CatalogTable = synchronized {
+ val table = formatTableName(name.table)
+ if (name.database.isDefined) {
+ getTableMetadata(name)
+ } else {
+ getTempView(table).map { plan =>
+ CatalogTable(
+ identifier = TableIdentifier(table),
+ tableType = CatalogTableType.VIEW,
+ storage = CatalogStorageFormat.empty,
+ schema = plan.output.toStructType)
+ }.getOrElse(getTableMetadata(name))
+ }
+ }
+
+ /**
* Rename a table.
*
* If a database is specified in `oldName`, this will rename the table in that database.
@@ -450,24 +466,6 @@ class SessionCatalog(
}
/**
- * Return whether a table/view with the specified name exists.
- *
- * Note: If a database is explicitly specified, then this will return whether the table/view
- * exists in that particular database instead. In that case, even if there is a temporary
- * table with the same name, we will return false if the specified database does not
- * contain the table/view.
- */
- def tableExists(name: TableIdentifier): Boolean = synchronized {
- val db = formatDatabaseName(name.database.getOrElse(currentDb))
- val table = formatTableName(name.table)
- if (isTemporaryTable(name)) {
- true
- } else {
- externalCatalog.tableExists(db, table)
- }
- }
-
- /**
* Return whether a table with the specified name is a temporary table.
*
* Note: The temporary table cache is checked only when database is not
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 384a730861..915ed8f8b1 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -425,35 +425,37 @@ class SessionCatalogSuite extends SparkFunSuite {
assert(!catalog.tableExists(TableIdentifier("tbl2", Some("db1"))))
// If database is explicitly specified, do not check temporary tables
val tempTable = Range(1, 10, 1, 10)
- catalog.createTempView("tbl3", tempTable, overrideIfExists = false)
assert(!catalog.tableExists(TableIdentifier("tbl3", Some("db2"))))
// If database is not explicitly specified, check the current database
catalog.setCurrentDatabase("db2")
assert(catalog.tableExists(TableIdentifier("tbl1")))
assert(catalog.tableExists(TableIdentifier("tbl2")))
- assert(catalog.tableExists(TableIdentifier("tbl3")))
- }
- test("tableExists on temporary views") {
- val catalog = new SessionCatalog(newBasicCatalog())
- val tempTable = Range(1, 10, 2, 10)
- assert(!catalog.tableExists(TableIdentifier("view1")))
- assert(!catalog.tableExists(TableIdentifier("view1", Some("default"))))
- catalog.createTempView("view1", tempTable, overrideIfExists = false)
- assert(catalog.tableExists(TableIdentifier("view1")))
- assert(!catalog.tableExists(TableIdentifier("view1", Some("default"))))
+ catalog.createTempView("tbl3", tempTable, overrideIfExists = false)
+ // tableExists should not check temp view.
+ assert(!catalog.tableExists(TableIdentifier("tbl3")))
}
test("getTempViewOrPermanentTableMetadata on temporary views") {
val catalog = new SessionCatalog(newBasicCatalog())
val tempTable = Range(1, 10, 2, 10)
intercept[NoSuchTableException] {
- catalog.getTempViewOrPermanentTableMetadata("view1")
+ catalog.getTempViewOrPermanentTableMetadata(TableIdentifier("view1"))
+ }.getMessage
+
+ intercept[NoSuchTableException] {
+ catalog.getTempViewOrPermanentTableMetadata(TableIdentifier("view1", Some("default")))
}.getMessage
catalog.createTempView("view1", tempTable, overrideIfExists = false)
- assert(catalog.getTempViewOrPermanentTableMetadata("view1").identifier ==
- TableIdentifier("view1"), "the temporary view `view1` should exist")
+ assert(catalog.getTempViewOrPermanentTableMetadata(
+ TableIdentifier("view1")).identifier.table == "view1")
+ assert(catalog.getTempViewOrPermanentTableMetadata(
+ TableIdentifier("view1")).schema(0).name == "id")
+
+ intercept[NoSuchTableException] {
+ catalog.getTempViewOrPermanentTableMetadata(TableIdentifier("view1", Some("default")))
+ }.getMessage
}
test("list tables without pattern") {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 9e343b5d24..64d3422cb4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -361,12 +361,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
throw new AnalysisException("Cannot create hive serde table with saveAsTable API")
}
- val sessionState = df.sparkSession.sessionState
- val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
- val tableIdentWithDB = tableIdent.copy(database = Some(db))
- // Pass a table identifier with database part, so that `tableExists` won't check temp views
- // unexpectedly.
- val tableExists = sessionState.catalog.tableExists(tableIdentWithDB)
+ val tableExists = df.sparkSession.sessionState.catalog.tableExists(tableIdent)
(tableExists, mode) match {
case (true, SaveMode.Ignore) =>
@@ -392,7 +387,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
bucketSpec = getBucketSpec
)
val cmd = CreateTable(tableDesc, mode, Some(df.logicalPlan))
- sessionState.executePlan(cmd).toRdd
+ df.sparkSession.sessionState.executePlan(cmd).toRdd
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index d8e20b09c1..a04a13e698 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -47,15 +47,11 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
assert(table.provider.isDefined)
val sessionState = sparkSession.sessionState
- val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase)
- val tableIdentWithDB = table.identifier.copy(database = Some(db))
- // Pass a table identifier with database part, so that `tableExists` won't check temp views
- // unexpectedly.
- if (sessionState.catalog.tableExists(tableIdentWithDB)) {
+ if (sessionState.catalog.tableExists(table.identifier)) {
if (ignoreIfExists) {
return Seq.empty[Row]
} else {
- throw new AnalysisException(s"Table ${tableIdentWithDB.unquotedString} already exists.")
+ throw new AnalysisException(s"Table ${table.identifier.unquotedString} already exists.")
}
}
@@ -146,8 +142,6 @@ case class CreateDataSourceTableAsSelectCommand(
var createMetastoreTable = false
var existingSchema = Option.empty[StructType]
- // Pass a table identifier with database part, so that `tableExists` won't check temp views
- // unexpectedly.
if (sparkSession.sessionState.catalog.tableExists(tableIdentWithDB)) {
// Check if we need to throw an exception or just return.
mode match {
@@ -172,8 +166,9 @@ case class CreateDataSourceTableAsSelectCommand(
// TODO: Check that options from the resolved relation match the relation that we are
// inserting into (i.e. using the same compression).
- EliminateSubqueryAliases(
- sessionState.catalog.lookupRelation(tableIdentWithDB)) match {
+ // Pass a table identifier with database part, so that `lookupRelation` won't get temp
+ // views unexpectedly.
+ EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB)) match {
case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) =>
// check if the file formats match
l.relation match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index b57b2d280d..01ac89868d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -183,32 +183,25 @@ case class DropTableCommand(
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
- if (!catalog.tableExists(tableName)) {
- if (!ifExists) {
- val objectName = if (isView) "View" else "Table"
- throw new AnalysisException(s"$objectName to drop '$tableName' does not exist")
- }
- } else {
- // If the command DROP VIEW is to drop a table or DROP TABLE is to drop a view
- // issue an exception.
- catalog.getTableMetadataOption(tableName).map(_.tableType match {
- case CatalogTableType.VIEW if !isView =>
- throw new AnalysisException(
- "Cannot drop a view with DROP TABLE. Please use DROP VIEW instead")
- case o if o != CatalogTableType.VIEW && isView =>
- throw new AnalysisException(
- s"Cannot drop a table with DROP VIEW. Please use DROP TABLE instead")
- case _ =>
- })
- try {
- sparkSession.sharedState.cacheManager.uncacheQuery(
- sparkSession.table(tableName.quotedString))
- } catch {
- case NonFatal(e) => log.warn(e.toString, e)
- }
- catalog.refreshTable(tableName)
- catalog.dropTable(tableName, ifExists, purge)
+ // If the command DROP VIEW is to drop a table or DROP TABLE is to drop a view
+ // issue an exception.
+ catalog.getTableMetadataOption(tableName).map(_.tableType match {
+ case CatalogTableType.VIEW if !isView =>
+ throw new AnalysisException(
+ "Cannot drop a view with DROP TABLE. Please use DROP VIEW instead")
+ case o if o != CatalogTableType.VIEW && isView =>
+ throw new AnalysisException(
+ s"Cannot drop a table with DROP VIEW. Please use DROP TABLE instead")
+ case _ =>
+ })
+ try {
+ sparkSession.sharedState.cacheManager.uncacheQuery(
+ sparkSession.table(tableName.quotedString))
+ } catch {
+ case NonFatal(e) => log.warn(e.toString, e)
}
+ catalog.refreshTable(tableName)
+ catalog.dropTable(tableName, ifExists, purge)
Seq.empty[Row]
}
}
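The `DropTableCommand` hunk above removes the explicit `tableExists` pre-check, so the "does not exist" error now surfaces from `SessionCatalog.dropTable` itself. A hedged sketch of the resulting behavior, assuming a `SparkSession` named `spark` and a table name that does not exist (both hypothetical, not part of this patch):

```scala
spark.sql("DROP TABLE IF EXISTS no_such_table")  // no-op: dropTable honors ifExists
spark.sql("DROP TABLE no_such_table")            // fails with NoSuchTableException
                                                 // (an AnalysisException) from dropTable
```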
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 94b46c5d97..0f61629317 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -59,16 +59,7 @@ case class CreateTableLikeCommand(
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
- if (!catalog.tableExists(sourceTable)) {
- throw new AnalysisException(
- s"Source table in CREATE TABLE LIKE does not exist: '$sourceTable'")
- }
-
- val sourceTableDesc = if (sourceTable.database.isDefined) {
- catalog.getTableMetadata(sourceTable)
- } else {
- catalog.getTempViewOrPermanentTableMetadata(sourceTable.table)
- }
+ val sourceTableDesc = catalog.getTempViewOrPermanentTableMetadata(sourceTable)
// Storage format
val newStorage =
@@ -602,11 +593,7 @@ case class ShowColumnsCommand(tableName: TableIdentifier) extends RunnableComman
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
- val table = if (tableName.database.isDefined) {
- catalog.getTableMetadata(tableName)
- } else {
- catalog.getTempViewOrPermanentTableMetadata(tableName.table)
- }
+ val table = catalog.getTempViewOrPermanentTableMetadata(tableName)
table.schema.map { c =>
Row(c.name)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 6fecda232a..f252535765 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -151,11 +151,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
}
private def listColumns(tableIdentifier: TableIdentifier): Dataset[Column] = {
- val tableMetadata = if (tableIdentifier.database.isDefined) {
- sessionCatalog.getTableMetadata(tableIdentifier)
- } else {
- sessionCatalog.getTempViewOrPermanentTableMetadata(tableIdentifier.table)
- }
+ val tableMetadata = sessionCatalog.getTempViewOrPermanentTableMetadata(tableIdentifier)
val partitionColumnNames = tableMetadata.partitionColumnNames.toSet
val bucketColumnNames = tableMetadata.bucketSpec.map(_.bucketColumnNames).getOrElse(Nil).toSet
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 7143adf02b..8ae6868c98 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -515,7 +515,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
assert(
intercept[AnalysisException] {
sparkSession.catalog.createExternalTable("createdJsonTable", jsonFilePath.toString)
- }.getMessage.contains("Table default.createdJsonTable already exists."),
+ }.getMessage.contains("Table createdJsonTable already exists."),
"We should complain that createdJsonTable already exists")
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 38482f66a3..c927e5d802 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -678,8 +678,8 @@ class HiveDDLSuite
.createTempView(sourceViewName)
sql(s"CREATE TABLE $targetTabName LIKE $sourceViewName")
- val sourceTable =
- spark.sessionState.catalog.getTempViewOrPermanentTableMetadata(sourceViewName)
+ val sourceTable = spark.sessionState.catalog.getTempViewOrPermanentTableMetadata(
+ TableIdentifier(sourceViewName))
val targetTable = spark.sessionState.catalog.getTableMetadata(
TableIdentifier(targetTabName, Some("default")))