author    Wenchen Fan <wenchen@databricks.com>  2016-09-18 21:15:35 +0800
committer Wenchen Fan <wenchen@databricks.com>  2016-09-18 21:15:35 +0800
commit    3fe630d314cf50d69868b7707ac8d8d2027080b8 (patch)
tree      9108a526cf2d18ddec7c5e2278e38c3849a54773
parent    3a3c9ffbd282244407e9437c2b02ae7e062dd183 (diff)
[SPARK-17541][SQL] fix some DDL bugs about table management when same-name temp view exists
## What changes were proposed in this pull request?

In `SessionCatalog`, we have several operations (`tableExists`, `dropTable`, `lookupRelation`, etc.) that handle both temp views and metastore tables/views. This brings some bugs to DDL commands that want to handle temp views only or metastore tables/views only. These bugs are:

1. `CREATE TABLE USING` will fail if a same-name temp view exists
2. `Catalog.dropTempView` will un-cache and drop a metastore table if a same-name table exists
3. `saveAsTable` will fail or have unexpected behaviour if a same-name temp view exists

These bug fixes are pulled out from https://github.com/apache/spark/pull/14962 and target both the master and 2.0 branches.

## How was this patch tested?

New regression tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #15099 from cloud-fan/fix-view.
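For readers wanting to reproduce bug 1, here is a minimal sketch (not part of the patch; it assumes a local `SparkSession` and mirrors the regression test added to `SQLQuerySuite` in this diff):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[2]")
  .appName("SPARK-17541-repro")
  .getOrCreate()

// Register a temp view named `same_name`.
spark.range(10).createTempView("same_name")

// Before this patch, this statement threw "Table same_name already
// exists" because `tableExists` also matched the temp view. After
// the patch it creates the metastore table `default.same_name`.
spark.sql("CREATE TABLE same_name(i INT) USING json")

// Unqualified lookup still resolves to the temp view...
assert(spark.table("same_name").count() == 10)
// ...while the qualified name reaches the new, empty table.
assert(spark.table("default.same_name").count() == 0)
```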
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala | 32
-rw-r--r-- sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala | 24
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala | 9
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala | 22
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala | 8
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 11
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala | 11
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala | 76
-rw-r--r-- sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala | 13
-rw-r--r-- sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala | 10
10 files changed, 170 insertions(+), 46 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 9fb5db573b..574c3d7eee 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -325,9 +325,9 @@ class SessionCatalog(
new Path(new Path(dbLocation), formatTableName(tableIdent.table)).toString
}
- // -------------------------------------------------------------
- // | Methods that interact with temporary and metastore tables |
- // -------------------------------------------------------------
+ // ----------------------------------------------
+ // | Methods that interact with temp views only |
+ // ----------------------------------------------
/**
* Create a temporary table.
@@ -344,6 +344,24 @@ class SessionCatalog(
}
/**
+ * Return a temporary view exactly as it was stored.
+ */
+ def getTempView(name: String): Option[LogicalPlan] = synchronized {
+ tempTables.get(formatTableName(name))
+ }
+
+ /**
+ * Drop a temporary view.
+ */
+ def dropTempView(name: String): Unit = synchronized {
+ tempTables.remove(formatTableName(name))
+ }
+
+ // -------------------------------------------------------------
+ // | Methods that interact with temporary and metastore tables |
+ // -------------------------------------------------------------
+
+ /**
* Rename a table.
*
* If a database is specified in `oldName`, this will rename the table in that database.
@@ -492,14 +510,6 @@ class SessionCatalog(
tempTables.clear()
}
- /**
- * Return a temporary table exactly as it was stored.
- * For testing only.
- */
- private[catalog] def getTempTable(name: String): Option[LogicalPlan] = synchronized {
- tempTables.get(formatTableName(name))
- }
-
// ----------------------------------------------------------------------------
// Partitions
// ----------------------------------------------------------------------------
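Before moving on to the test changes, a short sketch of how the new temp-view-only accessors are meant to be used (it assumes an already-constructed `SessionCatalog` named `catalog`, and reuses the same `Range` plan the test suite below constructs):

```scala
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Range}

def sketch(catalog: SessionCatalog): Unit = {
  catalog.createTempView("v", Range(1, 10, 2, 10), overrideIfExists = false)

  // getTempView reads only the temp-view registry; the metastore
  // is never consulted.
  val plan: Option[LogicalPlan] = catalog.getTempView("v")
  assert(plan.isDefined)

  // dropTempView removes only the temp view; a metastore table
  // with the same name is untouched.
  catalog.dropTempView("v")
  assert(catalog.getTempView("v").isEmpty)
}
```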
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 012df629bb..84b77ad250 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -201,16 +201,16 @@ class SessionCatalogSuite extends SparkFunSuite {
val tempTable2 = Range(1, 20, 2, 10)
catalog.createTempView("tbl1", tempTable1, overrideIfExists = false)
catalog.createTempView("tbl2", tempTable2, overrideIfExists = false)
- assert(catalog.getTempTable("tbl1") == Option(tempTable1))
- assert(catalog.getTempTable("tbl2") == Option(tempTable2))
- assert(catalog.getTempTable("tbl3").isEmpty)
+ assert(catalog.getTempView("tbl1") == Option(tempTable1))
+ assert(catalog.getTempView("tbl2") == Option(tempTable2))
+ assert(catalog.getTempView("tbl3").isEmpty)
// Temporary table already exists
intercept[TempTableAlreadyExistsException] {
catalog.createTempView("tbl1", tempTable1, overrideIfExists = false)
}
// Temporary table already exists but we override it
catalog.createTempView("tbl1", tempTable2, overrideIfExists = true)
- assert(catalog.getTempTable("tbl1") == Option(tempTable2))
+ assert(catalog.getTempView("tbl1") == Option(tempTable2))
}
test("drop table") {
@@ -251,11 +251,11 @@ class SessionCatalogSuite extends SparkFunSuite {
val tempTable = Range(1, 10, 2, 10)
sessionCatalog.createTempView("tbl1", tempTable, overrideIfExists = false)
sessionCatalog.setCurrentDatabase("db2")
- assert(sessionCatalog.getTempTable("tbl1") == Some(tempTable))
+ assert(sessionCatalog.getTempView("tbl1") == Some(tempTable))
assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
// If database is not specified, temp table should be dropped first
sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false, purge = false)
- assert(sessionCatalog.getTempTable("tbl1") == None)
+ assert(sessionCatalog.getTempView("tbl1") == None)
assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
// If temp table does not exist, the table in the current database should be dropped
sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false, purge = false)
@@ -265,7 +265,7 @@ class SessionCatalogSuite extends SparkFunSuite {
sessionCatalog.createTable(newTable("tbl1", "db2"), ignoreIfExists = false)
sessionCatalog.dropTable(TableIdentifier("tbl1", Some("db2")), ignoreIfNotExists = false,
purge = false)
- assert(sessionCatalog.getTempTable("tbl1") == Some(tempTable))
+ assert(sessionCatalog.getTempView("tbl1") == Some(tempTable))
assert(externalCatalog.listTables("db2").toSet == Set("tbl2"))
}
@@ -303,17 +303,17 @@ class SessionCatalogSuite extends SparkFunSuite {
val tempTable = Range(1, 10, 2, 10)
sessionCatalog.createTempView("tbl1", tempTable, overrideIfExists = false)
sessionCatalog.setCurrentDatabase("db2")
- assert(sessionCatalog.getTempTable("tbl1") == Option(tempTable))
+ assert(sessionCatalog.getTempView("tbl1") == Option(tempTable))
assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
// If database is not specified, temp table should be renamed first
sessionCatalog.renameTable(TableIdentifier("tbl1"), "tbl3")
- assert(sessionCatalog.getTempTable("tbl1").isEmpty)
- assert(sessionCatalog.getTempTable("tbl3") == Option(tempTable))
+ assert(sessionCatalog.getTempView("tbl1").isEmpty)
+ assert(sessionCatalog.getTempView("tbl3") == Option(tempTable))
assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
// If database is specified, temp tables are never renamed
sessionCatalog.renameTable(TableIdentifier("tbl2", Some("db2")), "tbl4")
- assert(sessionCatalog.getTempTable("tbl3") == Option(tempTable))
- assert(sessionCatalog.getTempTable("tbl4").isEmpty)
+ assert(sessionCatalog.getTempView("tbl3") == Option(tempTable))
+ assert(sessionCatalog.getTempView("tbl4").isEmpty)
assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl4"))
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 64d3422cb4..9e343b5d24 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -361,7 +361,12 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
throw new AnalysisException("Cannot create hive serde table with saveAsTable API")
}
- val tableExists = df.sparkSession.sessionState.catalog.tableExists(tableIdent)
+ val sessionState = df.sparkSession.sessionState
+ val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+ val tableIdentWithDB = tableIdent.copy(database = Some(db))
+ // Pass a table identifier with database part, so that `tableExists` won't check temp views
+ // unexpectedly.
+ val tableExists = sessionState.catalog.tableExists(tableIdentWithDB)
(tableExists, mode) match {
case (true, SaveMode.Ignore) =>
@@ -387,7 +392,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
bucketSpec = getBucketSpec
)
val cmd = CreateTable(tableDesc, mode, Some(df.logicalPlan))
- df.sparkSession.sessionState.executePlan(cmd).toRdd
+ sessionState.executePlan(cmd).toRdd
}
}
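The comment in this hunk is the heart of the fix: `SessionCatalog.tableExists` treats an unqualified identifier as a potential temp view, while an identifier that carries a database part bypasses temp views entirely. A hedged sketch of the distinction (it assumes a `spark` session with a temp view `t` and no metastore table of that name):

```scala
import org.apache.spark.sql.catalyst.TableIdentifier

spark.range(5).createTempView("t")
val catalog = spark.sessionState.catalog

// Unqualified: the temp view matches, so this returns true.
assert(catalog.tableExists(TableIdentifier("t")))

// Qualified with the current database: temp views are skipped, so
// this is false until a real table `default.t` is created.
val db = catalog.getCurrentDatabase
assert(!catalog.tableExists(TableIdentifier("t", Some(db))))
```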
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index b1830e6cf3..d8e20b09c1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -47,11 +47,15 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
assert(table.provider.isDefined)
val sessionState = sparkSession.sessionState
- if (sessionState.catalog.tableExists(table.identifier)) {
+ val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+ val tableIdentWithDB = table.identifier.copy(database = Some(db))
+ // Pass a table identifier with database part, so that `tableExists` won't check temp views
+ // unexpectedly.
+ if (sessionState.catalog.tableExists(tableIdentWithDB)) {
if (ignoreIfExists) {
return Seq.empty[Row]
} else {
- throw new AnalysisException(s"Table ${table.identifier.unquotedString} already exists.")
+ throw new AnalysisException(s"Table ${tableIdentWithDB.unquotedString} already exists.")
}
}
@@ -128,9 +132,11 @@ case class CreateDataSourceTableAsSelectCommand(
assert(table.provider.isDefined)
assert(table.schema.isEmpty)
- val tableName = table.identifier.unquotedString
val provider = table.provider.get
val sessionState = sparkSession.sessionState
+ val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+ val tableIdentWithDB = table.identifier.copy(database = Some(db))
+ val tableName = tableIdentWithDB.unquotedString
val optionsWithPath = if (table.tableType == CatalogTableType.MANAGED) {
table.storage.properties + ("path" -> sessionState.catalog.defaultTablePath(table.identifier))
@@ -140,7 +146,9 @@ case class CreateDataSourceTableAsSelectCommand(
var createMetastoreTable = false
var existingSchema = Option.empty[StructType]
- if (sparkSession.sessionState.catalog.tableExists(table.identifier)) {
+ // Pass a table identifier with database part, so that `tableExists` won't check temp views
+ // unexpectedly.
+ if (sparkSession.sessionState.catalog.tableExists(tableIdentWithDB)) {
// Check if we need to throw an exception or just return.
mode match {
case SaveMode.ErrorIfExists =>
@@ -165,7 +173,7 @@ case class CreateDataSourceTableAsSelectCommand(
// inserting into (i.e. using the same compression).
EliminateSubqueryAliases(
- sessionState.catalog.lookupRelation(table.identifier)) match {
+ sessionState.catalog.lookupRelation(tableIdentWithDB)) match {
case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) =>
// check if the file formats match
l.relation match {
@@ -188,7 +196,7 @@ case class CreateDataSourceTableAsSelectCommand(
throw new AnalysisException(s"Saving data in ${o.toString} is not supported.")
}
case SaveMode.Overwrite =>
- sparkSession.sql(s"DROP TABLE IF EXISTS $tableName")
+ sessionState.catalog.dropTable(tableIdentWithDB, ignoreIfNotExists = true, purge = false)
// Need to create the table again.
createMetastoreTable = true
}
@@ -230,7 +238,7 @@ case class CreateDataSourceTableAsSelectCommand(
}
// Refresh the cache of the table in the catalog.
- sessionState.catalog.refreshTable(table.identifier)
+ sessionState.catalog.refreshTable(tableIdentWithDB)
Seq.empty[Row]
}
}
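Note the `SaveMode.Overwrite` change above: the old code went through the SQL parser with an unqualified `DROP TABLE IF EXISTS $tableName`, which would have resolved to (and dropped) a same-name temp view; calling `dropTable` with the database-qualified identifier drops only the metastore table. A sketch of the resulting behaviour, mirroring the `DataFrameReaderWriterSuite` test added below:

```scala
spark.sql("CREATE TABLE same_name(id LONG) USING parquet")
spark.range(10).createTempView("same_name")

// Overwrite replaces the metastore table, never the temp view.
spark.range(20).write.mode("overwrite").saveAsTable("same_name")

assert(spark.table("same_name").count() == 10)          // temp view intact
assert(spark.table("default.same_name").count() == 20)  // table rewritten
```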
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 78ad710a62..3fa6298562 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalog.{Catalog, Column, Database, Function, Table}
import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, SessionCatalog}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
-import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, SubqueryAlias}
import org.apache.spark.sql.execution.datasources.CreateTable
import org.apache.spark.sql.types.StructType
@@ -284,8 +284,10 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
* @since 2.0.0
*/
override def dropTempView(viewName: String): Unit = {
- sparkSession.sharedState.cacheManager.uncacheQuery(sparkSession.table(viewName))
- sessionCatalog.dropTable(TableIdentifier(viewName), ignoreIfNotExists = true, purge = false)
+ sparkSession.sessionState.catalog.getTempView(viewName).foreach { tempView =>
+ sparkSession.sharedState.cacheManager.uncacheQuery(Dataset.ofRows(sparkSession, tempView))
+ sessionCatalog.dropTempView(viewName)
+ }
}
/**
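With this rewrite, `dropTempView` consults only the temp-view registry, so a cached metastore table of the same name is no longer un-cached or dropped. A sketch mirroring the regression test added to `CatalogSuite` below (it assumes a local `spark` session):

```scala
import org.apache.spark.sql.catalyst.TableIdentifier

spark.range(10).write.saveAsTable("same_name")
spark.sql("CACHE TABLE same_name")

// There is no temp view named `same_name`, so this is now a no-op.
spark.catalog.dropTempView("same_name")

assert(spark.sessionState.catalog.tableExists(
  TableIdentifier("same_name", Some("default"))))
assert(spark.catalog.isCached("default.same_name"))
```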
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 3cc3b319f5..0ee8c959ee 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2667,4 +2667,15 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}.limit(1).queryExecution.toRdd.count()
assert(numRecordsRead.value === 10)
}
+
+ test("CREATE TABLE USING should not fail if a same-name temp view exists") {
+ withTable("same_name") {
+ withTempView("same_name") {
+ spark.range(10).createTempView("same_name")
+ sql("CREATE TABLE same_name(i int) USING json")
+ checkAnswer(spark.table("same_name"), spark.range(10).toDF())
+ assert(spark.table("default.same_name").collect().isEmpty)
+ }
+ }
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
index 549fd63f74..3dc67ffafb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
@@ -329,6 +329,17 @@ class CatalogSuite
assert(e.message.contains("Cannot create hive serde table with createExternalTable API"))
}
+ test("dropTempView should not un-cache and drop metastore table if a same-name table exists") {
+ withTable("same_name") {
+ spark.range(10).write.saveAsTable("same_name")
+ sql("CACHE TABLE same_name")
+ assert(spark.catalog.isCached("default.same_name"))
+ spark.catalog.dropTempView("same_name")
+ assert(spark.sessionState.catalog.tableExists(TableIdentifier("same_name", Some("default"))))
+ assert(spark.catalog.isCached("default.same_name"))
+ }
+ }
+
// TODO: add tests for the rest of them
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index 63b0e4588e..7368dad628 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -22,6 +22,7 @@ import java.io.File
import org.scalatest.BeforeAndAfter
import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.util.Utils
@@ -464,4 +465,79 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
checkAnswer(df, spark.createDataset(expectedResult).toDF())
assert(df.schema === expectedSchema)
}
+
+ test("saveAsTable with mode Append should not fail if the table not exists " +
+ "but a same-name temp view exist") {
+ withTable("same_name") {
+ withTempView("same_name") {
+ spark.range(10).createTempView("same_name")
+ spark.range(20).write.mode(SaveMode.Append).saveAsTable("same_name")
+ assert(
+ spark.sessionState.catalog.tableExists(TableIdentifier("same_name", Some("default"))))
+ }
+ }
+ }
+
+ test("saveAsTable with mode Append should not fail if the table already exists " +
+ "and a same-name temp view exist") {
+ withTable("same_name") {
+ withTempView("same_name") {
+ sql("CREATE TABLE same_name(id LONG) USING parquet")
+ spark.range(10).createTempView("same_name")
+ spark.range(20).write.mode(SaveMode.Append).saveAsTable("same_name")
+ checkAnswer(spark.table("same_name"), spark.range(10).toDF())
+ checkAnswer(spark.table("default.same_name"), spark.range(20).toDF())
+ }
+ }
+ }
+
+ test("saveAsTable with mode ErrorIfExists should not fail if the table not exists " +
+ "but a same-name temp view exist") {
+ withTable("same_name") {
+ withTempView("same_name") {
+ spark.range(10).createTempView("same_name")
+ spark.range(20).write.mode(SaveMode.ErrorIfExists).saveAsTable("same_name")
+ assert(
+ spark.sessionState.catalog.tableExists(TableIdentifier("same_name", Some("default"))))
+ }
+ }
+ }
+
+ test("saveAsTable with mode Overwrite should not drop the temp view if the table not exists " +
+ "but a same-name temp view exist") {
+ withTable("same_name") {
+ withTempView("same_name") {
+ spark.range(10).createTempView("same_name")
+ spark.range(20).write.mode(SaveMode.Overwrite).saveAsTable("same_name")
+ assert(spark.sessionState.catalog.getTempView("same_name").isDefined)
+ assert(
+ spark.sessionState.catalog.tableExists(TableIdentifier("same_name", Some("default"))))
+ }
+ }
+ }
+
+ test("saveAsTable with mode Overwrite should not fail if the table already exists " +
+ "and a same-name temp view exist") {
+ withTable("same_name") {
+ withTempView("same_name") {
+ sql("CREATE TABLE same_name(id LONG) USING parquet")
+ spark.range(10).createTempView("same_name")
+ spark.range(20).write.mode(SaveMode.Overwrite).saveAsTable("same_name")
+ checkAnswer(spark.table("same_name"), spark.range(10).toDF())
+ checkAnswer(spark.table("default.same_name"), spark.range(20).toDF())
+ }
+ }
+ }
+
+ test("saveAsTable with mode Ignore should create the table if the table not exists " +
+ "but a same-name temp view exist") {
+ withTable("same_name") {
+ withTempView("same_name") {
+ spark.range(10).createTempView("same_name")
+ spark.range(20).write.mode(SaveMode.Ignore).saveAsTable("same_name")
+ assert(
+ spark.sessionState.catalog.tableExists(TableIdentifier("same_name", Some("default"))))
+ }
+ }
+ }
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 0f331bae93..7143adf02b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -339,7 +339,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
}.getMessage
assert(
- message.contains("Table ctasJsonTable already exists."),
+ message.contains("Table default.ctasJsonTable already exists."),
"We should complain that ctasJsonTable already exists")
// The following statement should be fine if it has IF NOT EXISTS.
@@ -515,7 +515,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
assert(
intercept[AnalysisException] {
sparkSession.catalog.createExternalTable("createdJsonTable", jsonFilePath.toString)
- }.getMessage.contains("Table createdJsonTable already exists."),
+ }.getMessage.contains("Table default.createdJsonTable already exists."),
"We should complain that createdJsonTable already exists")
}
@@ -907,7 +907,8 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
val e = intercept[AnalysisException] {
createDF(10, 19).write.mode(SaveMode.Append).format("orc").saveAsTable("appendOrcToParquet")
}
- assert(e.getMessage.contains("The file format of the existing table appendOrcToParquet " +
+ assert(e.getMessage.contains(
+ "The file format of the existing table default.appendOrcToParquet " +
"is `org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat`. " +
"It doesn't match the specified format `orc`"))
}
@@ -918,7 +919,8 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
createDF(10, 19).write.mode(SaveMode.Append).format("parquet")
.saveAsTable("appendParquetToJson")
}
- assert(e.getMessage.contains("The file format of the existing table appendParquetToJson " +
+ assert(e.getMessage.contains(
+ "The file format of the existing table default.appendParquetToJson " +
"is `org.apache.spark.sql.execution.datasources.json.JsonFileFormat`. " +
"It doesn't match the specified format `parquet`"))
}
@@ -929,7 +931,8 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
createDF(10, 19).write.mode(SaveMode.Append).format("text")
.saveAsTable("appendTextToJson")
}
- assert(e.getMessage.contains("The file format of the existing table appendTextToJson is " +
+ assert(e.getMessage.contains(
+ "The file format of the existing table default.appendTextToJson is " +
"`org.apache.spark.sql.execution.datasources.json.JsonFileFormat`. " +
"It doesn't match the specified format `text`"))
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
index 27bb9676e9..22f13a494c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
@@ -337,9 +337,8 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
}
test("saveAsTable()/load() - non-partitioned table - ErrorIfExists") {
- Seq.empty[(Int, String)].toDF().createOrReplaceTempView("t")
-
- withTempView("t") {
+ withTable("t") {
+ sql("CREATE TABLE t(i INT) USING parquet")
intercept[AnalysisException] {
testDF.write.format(dataSourceName).mode(SaveMode.ErrorIfExists).saveAsTable("t")
}
@@ -347,9 +346,8 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
}
test("saveAsTable()/load() - non-partitioned table - Ignore") {
- Seq.empty[(Int, String)].toDF().createOrReplaceTempView("t")
-
- withTempView("t") {
+ withTable("t") {
+ sql("CREATE TABLE t(i INT) USING parquet")
testDF.write.format(dataSourceName).mode(SaveMode.Ignore).saveAsTable("t")
assert(spark.table("t").collect().isEmpty)
}