diff options
author | Wenchen Fan <wenchen@databricks.com> | 2016-09-18 21:15:35 +0800 |
---|---|---|
committer | Wenchen Fan <wenchen@databricks.com> | 2016-09-18 21:15:35 +0800 |
commit | 3fe630d314cf50d69868b7707ac8d8d2027080b8 (patch) | |
tree | 9108a526cf2d18ddec7c5e2278e38c3849a54773 /sql/core/src | |
parent | 3a3c9ffbd282244407e9437c2b02ae7e062dd183 (diff) | |
download | spark-3fe630d314cf50d69868b7707ac8d8d2027080b8.tar.gz spark-3fe630d314cf50d69868b7707ac8d8d2027080b8.tar.bz2 spark-3fe630d314cf50d69868b7707ac8d8d2027080b8.zip |
[SPARK-17541][SQL] fix some DDL bugs about table management when same-name temp view exists
## What changes were proposed in this pull request?
In `SessionCatalog`, we have several operations(`tableExists`, `dropTable`, `loopupRelation`, etc) that handle both temp views and metastore tables/views. This brings some bugs to DDL commands that want to handle temp view only or metastore table/view only. These bugs are:
1. `CREATE TABLE USING` will fail if a same-name temp view exists
2. `Catalog.dropTempView`will un-cache and drop metastore table if a same-name table exists
3. `saveAsTable` will fail or have unexpected behaviour if a same-name temp view exists.
These bug fixes are pulled out from https://github.com/apache/spark/pull/14962 and targets both master and 2.0 branch
## How was this patch tested?
new regression tests
Author: Wenchen Fan <wenchen@databricks.com>
Closes #15099 from cloud-fan/fix-view.
Diffstat (limited to 'sql/core/src')
6 files changed, 125 insertions, 12 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 64d3422cb4..9e343b5d24 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -361,7 +361,12 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { throw new AnalysisException("Cannot create hive serde table with saveAsTable API") } - val tableExists = df.sparkSession.sessionState.catalog.tableExists(tableIdent) + val sessionState = df.sparkSession.sessionState + val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase) + val tableIdentWithDB = tableIdent.copy(database = Some(db)) + // Pass a table identifier with database part, so that `tableExists` won't check temp views + // unexpectedly. + val tableExists = sessionState.catalog.tableExists(tableIdentWithDB) (tableExists, mode) match { case (true, SaveMode.Ignore) => @@ -387,7 +392,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { bucketSpec = getBucketSpec ) val cmd = CreateTable(tableDesc, mode, Some(df.logicalPlan)) - df.sparkSession.sessionState.executePlan(cmd).toRdd + sessionState.executePlan(cmd).toRdd } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index b1830e6cf3..d8e20b09c1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -47,11 +47,15 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo assert(table.provider.isDefined) val sessionState = sparkSession.sessionState - if (sessionState.catalog.tableExists(table.identifier)) { + val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase) + val tableIdentWithDB = table.identifier.copy(database = Some(db)) + // Pass a table identifier with database part, so that `tableExists` won't check temp views + // unexpectedly. + if (sessionState.catalog.tableExists(tableIdentWithDB)) { if (ignoreIfExists) { return Seq.empty[Row] } else { - throw new AnalysisException(s"Table ${table.identifier.unquotedString} already exists.") + throw new AnalysisException(s"Table ${tableIdentWithDB.unquotedString} already exists.") } } @@ -128,9 +132,11 @@ case class CreateDataSourceTableAsSelectCommand( assert(table.provider.isDefined) assert(table.schema.isEmpty) - val tableName = table.identifier.unquotedString val provider = table.provider.get val sessionState = sparkSession.sessionState + val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase) + val tableIdentWithDB = table.identifier.copy(database = Some(db)) + val tableName = tableIdentWithDB.unquotedString val optionsWithPath = if (table.tableType == CatalogTableType.MANAGED) { table.storage.properties + ("path" -> sessionState.catalog.defaultTablePath(table.identifier)) @@ -140,7 +146,9 @@ case class CreateDataSourceTableAsSelectCommand( var createMetastoreTable = false var existingSchema = Option.empty[StructType] - if (sparkSession.sessionState.catalog.tableExists(table.identifier)) { + // Pass a table identifier with database part, so that `tableExists` won't check temp views + // unexpectedly. + if (sparkSession.sessionState.catalog.tableExists(tableIdentWithDB)) { // Check if we need to throw an exception or just return. mode match { case SaveMode.ErrorIfExists => @@ -165,7 +173,7 @@ case class CreateDataSourceTableAsSelectCommand( // inserting into (i.e. using the same compression). EliminateSubqueryAliases( - sessionState.catalog.lookupRelation(table.identifier)) match { + sessionState.catalog.lookupRelation(tableIdentWithDB)) match { case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) => // check if the file formats match l.relation match { @@ -188,7 +196,7 @@ case class CreateDataSourceTableAsSelectCommand( throw new AnalysisException(s"Saving data in ${o.toString} is not supported.") } case SaveMode.Overwrite => - sparkSession.sql(s"DROP TABLE IF EXISTS $tableName") + sessionState.catalog.dropTable(tableIdentWithDB, ignoreIfNotExists = true, purge = false) // Need to create the table again. createMetastoreTable = true } @@ -230,7 +238,7 @@ case class CreateDataSourceTableAsSelectCommand( } // Refresh the cache of the table in the catalog. - sessionState.catalog.refreshTable(table.identifier) + sessionState.catalog.refreshTable(tableIdentWithDB) Seq.empty[Row] } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala index 78ad710a62..3fa6298562 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalog.{Catalog, Column, Database, Function, Table} import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, TableIdentifier} import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, SessionCatalog} import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder -import org.apache.spark.sql.catalyst.plans.logical.LocalRelation +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, SubqueryAlias} import org.apache.spark.sql.execution.datasources.CreateTable import org.apache.spark.sql.types.StructType @@ -284,8 +284,10 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { * @since 2.0.0 */ override def dropTempView(viewName: String): Unit = { - sparkSession.sharedState.cacheManager.uncacheQuery(sparkSession.table(viewName)) - sessionCatalog.dropTable(TableIdentifier(viewName), ignoreIfNotExists = true, purge = false) + sparkSession.sessionState.catalog.getTempView(viewName).foreach { tempView => + sparkSession.sharedState.cacheManager.uncacheQuery(Dataset.ofRows(sparkSession, tempView)) + sessionCatalog.dropTempView(viewName) + } } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 3cc3b319f5..0ee8c959ee 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -2667,4 +2667,15 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { }.limit(1).queryExecution.toRdd.count() assert(numRecordsRead.value === 10) } + + test("CREATE TABLE USING should not fail if a same-name temp view exists") { + withTable("same_name") { + withTempView("same_name") { + spark.range(10).createTempView("same_name") + sql("CREATE TABLE same_name(i int) USING json") + checkAnswer(spark.table("same_name"), spark.range(10).toDF()) + assert(spark.table("default.same_name").collect().isEmpty) + } + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala index 549fd63f74..3dc67ffafb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala @@ -329,6 +329,17 @@ class CatalogSuite assert(e.message.contains("Cannot create hive serde table with createExternalTable API")) } + test("dropTempView should not un-cache and drop metastore table if a same-name table exists") { + withTable("same_name") { + spark.range(10).write.saveAsTable("same_name") + sql("CACHE TABLE same_name") + assert(spark.catalog.isCached("default.same_name")) + spark.catalog.dropTempView("same_name") + assert(spark.sessionState.catalog.tableExists(TableIdentifier("same_name", Some("default")))) + assert(spark.catalog.isCached("default.same_name")) + } + } + // TODO: add tests for the rest of them } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala index 63b0e4588e..7368dad628 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala @@ -22,6 +22,7 @@ import java.io.File import org.scalatest.BeforeAndAfter import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} import org.apache.spark.util.Utils @@ -464,4 +465,79 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be checkAnswer(df, spark.createDataset(expectedResult).toDF()) assert(df.schema === expectedSchema) } + + test("saveAsTable with mode Append should not fail if the table not exists " + + "but a same-name temp view exist") { + withTable("same_name") { + withTempView("same_name") { + spark.range(10).createTempView("same_name") + spark.range(20).write.mode(SaveMode.Append).saveAsTable("same_name") + assert( + spark.sessionState.catalog.tableExists(TableIdentifier("same_name", Some("default")))) + } + } + } + + test("saveAsTable with mode Append should not fail if the table already exists " + + "and a same-name temp view exist") { + withTable("same_name") { + withTempView("same_name") { + sql("CREATE TABLE same_name(id LONG) USING parquet") + spark.range(10).createTempView("same_name") + spark.range(20).write.mode(SaveMode.Append).saveAsTable("same_name") + checkAnswer(spark.table("same_name"), spark.range(10).toDF()) + checkAnswer(spark.table("default.same_name"), spark.range(20).toDF()) + } + } + } + + test("saveAsTable with mode ErrorIfExists should not fail if the table not exists " + + "but a same-name temp view exist") { + withTable("same_name") { + withTempView("same_name") { + spark.range(10).createTempView("same_name") + spark.range(20).write.mode(SaveMode.ErrorIfExists).saveAsTable("same_name") + assert( + spark.sessionState.catalog.tableExists(TableIdentifier("same_name", Some("default")))) + } + } + } + + test("saveAsTable with mode Overwrite should not drop the temp view if the table not exists " + + "but a same-name temp view exist") { + withTable("same_name") { + withTempView("same_name") { + spark.range(10).createTempView("same_name") + spark.range(20).write.mode(SaveMode.Overwrite).saveAsTable("same_name") + assert(spark.sessionState.catalog.getTempView("same_name").isDefined) + assert( + spark.sessionState.catalog.tableExists(TableIdentifier("same_name", Some("default")))) + } + } + } + + test("saveAsTable with mode Overwrite should not fail if the table already exists " + + "and a same-name temp view exist") { + withTable("same_name") { + withTempView("same_name") { + sql("CREATE TABLE same_name(id LONG) USING parquet") + spark.range(10).createTempView("same_name") + spark.range(20).write.mode(SaveMode.Overwrite).saveAsTable("same_name") + checkAnswer(spark.table("same_name"), spark.range(10).toDF()) + checkAnswer(spark.table("default.same_name"), spark.range(20).toDF()) + } + } + } + + test("saveAsTable with mode Ignore should create the table if the table not exists " + + "but a same-name temp view exist") { + withTable("same_name") { + withTempView("same_name") { + spark.range(10).createTempView("same_name") + spark.range(20).write.mode(SaveMode.Ignore).saveAsTable("same_name") + assert( + spark.sessionState.catalog.tableExists(TableIdentifier("same_name", Some("default")))) + } + } + } } |