author     Wenchen Fan <wenchen@databricks.com>	2016-09-22 12:52:09 +0800
committer  Wenchen Fan <wenchen@databricks.com>	2016-09-22 12:52:09 +0800
commit     b50b34f5611a1f182ba9b6eaf86c666bbd9f9eb0 (patch)
tree       1097e32b1e9aac12f7166dae9d5cb88f363abad2 /sql/core
parent     8bde03bf9a0896ea59ceaa699df7700351a130fb (diff)
[SPARK-17609][SQL] SessionCatalog.tableExists should not check temp view
## What changes were proposed in this pull request?

After #15054, there is no place in Spark SQL that needs `SessionCatalog.tableExists` to check temp views, so this PR makes `SessionCatalog.tableExists` check only permanent tables/views and removes some hacks. This PR also improves `getTempViewOrPermanentTableMetadata`, introduced in #15054, to make the code simpler.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #15160 from cloud-fan/exists.
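The diff below touches only the call sites in `sql/core`; the catalog-side change itself lives in catalyst and is not shown here. For orientation, here is a self-contained toy sketch (not the actual `SessionCatalog` code; all names in it are illustrative) of the contract those call sites assume: `tableExists` consults only the permanent catalog, while `getTempViewOrPermanentTableMetadata` now takes a full `TableIdentifier` and falls back to temp views only for unqualified names.

```scala
// Toy model of the assumed catalog contract; not Spark source code.
object CatalogContractSketch {
  final case class TableIdentifier(table: String, database: Option[String] = None)
  final case class TableMeta(name: String, isTempView: Boolean)

  class TinyCatalog(currentDb: String = "default") {
    private val tempViews = scala.collection.mutable.Map.empty[String, TableMeta]
    private val permanentTables = scala.collection.mutable.Map.empty[(String, String), TableMeta]

    def createTempView(name: String): Unit =
      tempViews(name) = TableMeta(name, isTempView = true)

    def createTable(ident: TableIdentifier): Unit = {
      val db = ident.database.getOrElse(currentDb)
      permanentTables((db, ident.table)) = TableMeta(ident.table, isTempView = false)
    }

    // After SPARK-17609: only permanent tables/views count; temp views are ignored.
    def tableExists(ident: TableIdentifier): Boolean = {
      val db = ident.database.getOrElse(currentDb)
      permanentTables.contains((db, ident.table))
    }

    // Unqualified names may still resolve to a temp view; database-qualified names never do.
    def getTempViewOrPermanentTableMetadata(ident: TableIdentifier): TableMeta =
      ident.database match {
        case None    => tempViews.getOrElse(ident.table, lookupPermanent(ident))
        case Some(_) => lookupPermanent(ident)
      }

    private def lookupPermanent(ident: TableIdentifier): TableMeta = {
      val db = ident.database.getOrElse(currentDb)
      permanentTables.getOrElse(
        (db, ident.table),
        throw new NoSuchElementException(s"Table or view '${ident.table}' not found in database '$db'"))
    }
  }

  def main(args: Array[String]): Unit = {
    val catalog = new TinyCatalog()
    catalog.createTempView("t")
    println(catalog.tableExists(TableIdentifier("t")))                         // false: temp views ignored
    println(catalog.getTempViewOrPermanentTableMetadata(TableIdentifier("t"))) // the temp view's metadata
  }
}
```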
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala | 9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala | 15
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala | 43
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala | 17
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala | 6
5 files changed, 28 insertions(+), 62 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 9e343b5d24..64d3422cb4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -361,12 +361,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
throw new AnalysisException("Cannot create hive serde table with saveAsTable API")
}
- val sessionState = df.sparkSession.sessionState
- val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
- val tableIdentWithDB = tableIdent.copy(database = Some(db))
- // Pass a table identifier with database part, so that `tableExists` won't check temp views
- // unexpectedly.
- val tableExists = sessionState.catalog.tableExists(tableIdentWithDB)
+ val tableExists = df.sparkSession.sessionState.catalog.tableExists(tableIdent)
(tableExists, mode) match {
case (true, SaveMode.Ignore) =>
@@ -392,7 +387,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
bucketSpec = getBucketSpec
)
val cmd = CreateTable(tableDesc, mode, Some(df.logicalPlan))
- sessionState.executePlan(cmd).toRdd
+ df.sparkSession.sessionState.executePlan(cmd).toRdd
}
}
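To make the effect of this hunk concrete, here is a hypothetical saveAsTable session (not part of the patch; it assumes a local SparkSession and the default parquet data source). Because `tableExists` no longer consults temp views, a temp view with the same name does not trip the `ErrorIfExists` check:

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

// Hypothetical demo: a temp view named "people" no longer blocks saveAsTable("people").
object SaveAsTableDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("save-as-table-demo").getOrCreate()
    import spark.implicits._

    // A temp view named "people"; it lives only in the session's temp-view registry.
    Seq((1, "alice")).toDF("id", "name").createOrReplaceTempView("people")

    // The existence check resolves "people" against the current database only,
    // so this creates the permanent table `default.people` instead of failing.
    Seq((2, "bob")).toDF("id", "name")
      .write.mode(SaveMode.ErrorIfExists).saveAsTable("people")

    spark.stop()
  }
}
```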
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index d8e20b09c1..a04a13e698 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -47,15 +47,11 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
assert(table.provider.isDefined)
val sessionState = sparkSession.sessionState
- val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase)
- val tableIdentWithDB = table.identifier.copy(database = Some(db))
- // Pass a table identifier with database part, so that `tableExists` won't check temp views
- // unexpectedly.
- if (sessionState.catalog.tableExists(tableIdentWithDB)) {
+ if (sessionState.catalog.tableExists(table.identifier)) {
if (ignoreIfExists) {
return Seq.empty[Row]
} else {
- throw new AnalysisException(s"Table ${tableIdentWithDB.unquotedString} already exists.")
+ throw new AnalysisException(s"Table ${table.identifier.unquotedString} already exists.")
}
}
@@ -146,8 +142,6 @@ case class CreateDataSourceTableAsSelectCommand(
var createMetastoreTable = false
var existingSchema = Option.empty[StructType]
- // Pass a table identifier with database part, so that `tableExists` won't check temp views
- // unexpectedly.
if (sparkSession.sessionState.catalog.tableExists(tableIdentWithDB)) {
// Check if we need to throw an exception or just return.
mode match {
@@ -172,8 +166,9 @@ case class CreateDataSourceTableAsSelectCommand(
// TODO: Check that options from the resolved relation match the relation that we are
// inserting into (i.e. using the same compression).
- EliminateSubqueryAliases(
- sessionState.catalog.lookupRelation(tableIdentWithDB)) match {
+ // Pass a table identifier with database part, so that `lookupRelation` won't get temp
+ // views unexpectedly.
+ EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB)) match {
case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) =>
// check if the file formats match
l.relation match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index b57b2d280d..01ac89868d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -183,32 +183,25 @@ case class DropTableCommand(
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
- if (!catalog.tableExists(tableName)) {
- if (!ifExists) {
- val objectName = if (isView) "View" else "Table"
- throw new AnalysisException(s"$objectName to drop '$tableName' does not exist")
- }
- } else {
- // If the command DROP VIEW is to drop a table or DROP TABLE is to drop a view
- // issue an exception.
- catalog.getTableMetadataOption(tableName).map(_.tableType match {
- case CatalogTableType.VIEW if !isView =>
- throw new AnalysisException(
- "Cannot drop a view with DROP TABLE. Please use DROP VIEW instead")
- case o if o != CatalogTableType.VIEW && isView =>
- throw new AnalysisException(
- s"Cannot drop a table with DROP VIEW. Please use DROP TABLE instead")
- case _ =>
- })
- try {
- sparkSession.sharedState.cacheManager.uncacheQuery(
- sparkSession.table(tableName.quotedString))
- } catch {
- case NonFatal(e) => log.warn(e.toString, e)
- }
- catalog.refreshTable(tableName)
- catalog.dropTable(tableName, ifExists, purge)
+ // If the command DROP VIEW is to drop a table or DROP TABLE is to drop a view
+ // issue an exception.
+ catalog.getTableMetadataOption(tableName).map(_.tableType match {
+ case CatalogTableType.VIEW if !isView =>
+ throw new AnalysisException(
+ "Cannot drop a view with DROP TABLE. Please use DROP VIEW instead")
+ case o if o != CatalogTableType.VIEW && isView =>
+ throw new AnalysisException(
+ s"Cannot drop a table with DROP VIEW. Please use DROP TABLE instead")
+ case _ =>
+ })
+ try {
+ sparkSession.sharedState.cacheManager.uncacheQuery(
+ sparkSession.table(tableName.quotedString))
+ } catch {
+ case NonFatal(e) => log.warn(e.toString, e)
}
+ catalog.refreshTable(tableName)
+ catalog.dropTable(tableName, ifExists, purge)
Seq.empty[Row]
}
}
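For context, the removed pre-check means missing-table handling is now delegated to `catalog.dropTable` through the `ifExists` flag, while the table-vs-view type check is kept. A hypothetical session (not part of the patch; table name and setup are illustrative) showing that intent:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical sketch of the intended DROP TABLE behavior after this change.
object DropTableIntent {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("drop-table-intent").getOrCreate()

    spark.sql("DROP TABLE IF EXISTS no_such_table")        // no-op: dropTable honors ifExists
    spark.sql("CREATE TABLE t_demo(id INT) USING parquet") // a permanent data source table
    // spark.sql("DROP VIEW t_demo")                       // would fail: "use DROP TABLE instead"
    spark.sql("DROP TABLE t_demo")

    spark.stop()
  }
}
```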
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 94b46c5d97..0f61629317 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -59,16 +59,7 @@ case class CreateTableLikeCommand(
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
- if (!catalog.tableExists(sourceTable)) {
- throw new AnalysisException(
- s"Source table in CREATE TABLE LIKE does not exist: '$sourceTable'")
- }
-
- val sourceTableDesc = if (sourceTable.database.isDefined) {
- catalog.getTableMetadata(sourceTable)
- } else {
- catalog.getTempViewOrPermanentTableMetadata(sourceTable.table)
- }
+ val sourceTableDesc = catalog.getTempViewOrPermanentTableMetadata(sourceTable)
// Storage format
val newStorage =
@@ -602,11 +593,7 @@ case class ShowColumnsCommand(tableName: TableIdentifier) extends RunnableComman
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
- val table = if (tableName.database.isDefined) {
- catalog.getTableMetadata(tableName)
- } else {
- catalog.getTempViewOrPermanentTableMetadata(tableName.table)
- }
+ val table = catalog.getTempViewOrPermanentTableMetadata(tableName)
table.schema.map { c =>
Row(c.name)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 6fecda232a..f252535765 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -151,11 +151,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
}
private def listColumns(tableIdentifier: TableIdentifier): Dataset[Column] = {
- val tableMetadata = if (tableIdentifier.database.isDefined) {
- sessionCatalog.getTableMetadata(tableIdentifier)
- } else {
- sessionCatalog.getTempViewOrPermanentTableMetadata(tableIdentifier.table)
- }
+ val tableMetadata = sessionCatalog.getTempViewOrPermanentTableMetadata(tableIdentifier)
val partitionColumnNames = tableMetadata.partitionColumnNames.toSet
val bucketColumnNames = tableMetadata.bucketSpec.map(_.bucketColumnNames).getOrElse(Nil).toSet