4 files changed, 213 insertions, 198 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 9c5660a378..405f38ad49 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -23,11 +23,12 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.annotation.InterfaceStability
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
 import org.apache.spark.sql.execution.command.DDLUtils
-import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource}
+import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation}
+import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types.StructType
 
 /**
@@ -364,7 +365,11 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       throw new AnalysisException("Cannot create hive serde table with saveAsTable API")
     }
 
-    val tableExists = df.sparkSession.sessionState.catalog.tableExists(tableIdent)
+    val catalog = df.sparkSession.sessionState.catalog
+    val tableExists = catalog.tableExists(tableIdent)
+    val db = tableIdent.database.getOrElse(catalog.getCurrentDatabase)
+    val tableIdentWithDB = tableIdent.copy(database = Some(db))
+    val tableName = tableIdentWithDB.unquotedString
 
     (tableExists, mode) match {
       case (true, SaveMode.Ignore) =>
@@ -373,39 +378,48 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       case (true, SaveMode.ErrorIfExists) =>
         throw new AnalysisException(s"Table $tableIdent already exists.")
 
-      case _ =>
-        val existingTable = if (tableExists) {
-          Some(df.sparkSession.sessionState.catalog.getTableMetadata(tableIdent))
-        } else {
-          None
+      case (true, SaveMode.Overwrite) =>
+        // Get all input data source relations of the query.
+        val srcRelations = df.logicalPlan.collect {
+          case LogicalRelation(src: BaseRelation, _, _) => src
         }
-        val storage = if (tableExists) {
-          existingTable.get.storage
-        } else {
-          DataSource.buildStorageFormatFromOptions(extraOptions.toMap)
-        }
-        val tableType = if (tableExists) {
-          existingTable.get.tableType
-        } else if (storage.locationUri.isDefined) {
-          CatalogTableType.EXTERNAL
-        } else {
-          CatalogTableType.MANAGED
+        EliminateSubqueryAliases(catalog.lookupRelation(tableIdentWithDB)) match {
+          // Only do the check if the table is a data source table
+          // (the relation is a BaseRelation).
+          case LogicalRelation(dest: BaseRelation, _, _) if srcRelations.contains(dest) =>
+            throw new AnalysisException(
+              s"Cannot overwrite table $tableName that is also being read from")
+          case _ => // OK
         }
-        val tableDesc = CatalogTable(
-          identifier = tableIdent,
-          tableType = tableType,
-          storage = storage,
-          schema = new StructType,
-          provider = Some(source),
-          partitionColumnNames = partitioningColumns.getOrElse(Nil),
-          bucketSpec = getBucketSpec
-        )
-        df.sparkSession.sessionState.executePlan(
-          CreateTable(tableDesc, mode, Some(df.logicalPlan))).toRdd
+
+        // Drop the existing table
+        catalog.dropTable(tableIdentWithDB, ignoreIfNotExists = true, purge = false)
+        createTable(tableIdent)
+
+      case _ => createTable(tableIdent)
     }
   }
 
+  private def createTable(tableIdent: TableIdentifier): Unit = {
+    val storage = DataSource.buildStorageFormatFromOptions(extraOptions.toMap)
+    val tableType = if (storage.locationUri.isDefined) {
+      CatalogTableType.EXTERNAL
+    } else {
+      CatalogTableType.MANAGED
+    }
+
+    val tableDesc = CatalogTable(
+      identifier = tableIdent,
+      tableType = tableType,
+      storage = storage,
+      schema = new StructType,
+      provider = Some(source),
+      partitionColumnNames = partitioningColumns.getOrElse(Nil),
+      bucketSpec = getBucketSpec
+    )
+    df.sparkSession.sessionState.executePlan(
+      CreateTable(tableDesc, mode, Some(df.logicalPlan))).toRdd
+  }
+
   /**
    * Saves the content of the `DataFrame` to an external database table via JDBC. In the case the
    * table already exists in the external database, behavior of this function depends on the
@@ -441,7 +455,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     assertNotPartitioned("jdbc")
     assertNotBucketed("jdbc")
     // connectionProperties should override settings in extraOptions.
-    this.extraOptions = this.extraOptions ++ connectionProperties.asScala
+    this.extraOptions ++= connectionProperties.asScala
     // explicit url and dbtable should override all
     this.extraOptions += ("url" -> url, "dbtable" -> table)
     format("jdbc").save()
@@ -588,7 +602,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
 
   private var mode: SaveMode = SaveMode.ErrorIfExists
 
-  private var extraOptions = new scala.collection.mutable.HashMap[String, String]
+  private val extraOptions = new scala.collection.mutable.HashMap[String, String]
 
   private var partitioningColumns: Option[Seq[String]] = None
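Note on the change above: the new `(true, SaveMode.Overwrite)` branch runs the self-read check eagerly in `saveAsTable`, before the existing table is dropped. A minimal sketch of the user-visible behavior (assumes a local SparkSession; the table name `t` is illustrative, not from the patch):

import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession}

object OverwriteSelfReadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
    import spark.implicits._

    // Create a data source table in the session catalog.
    Seq((1, "a"), (2, "b")).toDF("id", "name").write.saveAsTable("t")

    try {
      // `t` is both the source and the destination of the write, so the check
      // above fires before the table is dropped and no data is lost.
      spark.table("t").write.mode(SaveMode.Overwrite).saveAsTable("t")
    } catch {
      case e: AnalysisException =>
        // Expected: "Cannot overwrite table default.t that is also being read from"
        println(e.getMessage)
    }

    spark.stop()
  }
}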
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 81c20475a3..c64c7ad943 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.command
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.expressions.NamedExpression
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources.BaseRelation
@@ -134,139 +133,31 @@ case class CreateDataSourceTableAsSelectCommand(
     assert(table.provider.isDefined)
     assert(table.schema.isEmpty)
 
-    val provider = table.provider.get
     val sessionState = sparkSession.sessionState
     val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase)
     val tableIdentWithDB = table.identifier.copy(database = Some(db))
     val tableName = tableIdentWithDB.unquotedString
 
-    var createMetastoreTable = false
-    // We may need to reorder the columns of the query to match the existing table.
-    var reorderedColumns = Option.empty[Seq[NamedExpression]]
-    if (sessionState.catalog.tableExists(tableIdentWithDB)) {
-      // Check if we need to throw an exception or just return.
-      mode match {
-        case SaveMode.ErrorIfExists =>
-          throw new AnalysisException(s"Table $tableName already exists. " +
-            s"If you are using saveAsTable, you can set SaveMode to SaveMode.Append to " +
-            s"insert data into the table or set SaveMode to SaveMode.Overwrite to overwrite" +
-            s"the existing data. " +
-            s"Or, if you are using SQL CREATE TABLE, you need to drop $tableName first.")
-        case SaveMode.Ignore =>
-          // Since the table already exists and the save mode is Ignore, we will just return.
-          return Seq.empty[Row]
-        case SaveMode.Append =>
-          val existingTable = sessionState.catalog.getTableMetadata(tableIdentWithDB)
+    val result = if (sessionState.catalog.tableExists(tableIdentWithDB)) {
+      assert(mode != SaveMode.Overwrite,
+        s"Expect the table $tableName has been dropped when the save mode is Overwrite")
 
-          if (existingTable.provider.get == DDLUtils.HIVE_PROVIDER) {
-            throw new AnalysisException(s"Saving data in the Hive serde table $tableName is " +
-              "not supported yet. Please use the insertInto() API as an alternative.")
-          }
-
-          // Check if the specified data source match the data source of the existing table.
-          val existingProvider = DataSource.lookupDataSource(existingTable.provider.get)
-          val specifiedProvider = DataSource.lookupDataSource(table.provider.get)
-          // TODO: Check that options from the resolved relation match the relation that we are
-          // inserting into (i.e. using the same compression).
-          if (existingProvider != specifiedProvider) {
-            throw new AnalysisException(s"The format of the existing table $tableName is " +
-              s"`${existingProvider.getSimpleName}`. It doesn't match the specified format " +
-              s"`${specifiedProvider.getSimpleName}`.")
-          }
-
-          if (query.schema.length != existingTable.schema.length) {
-            throw new AnalysisException(
-              s"The column number of the existing table $tableName" +
-              s"(${existingTable.schema.catalogString}) doesn't match the data schema" +
-              s"(${query.schema.catalogString})")
-          }
-
-          val resolver = sessionState.conf.resolver
-          val tableCols = existingTable.schema.map(_.name)
-
-          reorderedColumns = Some(existingTable.schema.map { f =>
-            query.resolve(Seq(f.name), resolver).getOrElse {
-              val inputColumns = query.schema.map(_.name).mkString(", ")
-              throw new AnalysisException(
-                s"cannot resolve '${f.name}' given input columns: [$inputColumns]")
-            }
-          })
-
-          // In `AnalyzeCreateTable`, we verified the consistency between the user-specified table
-          // definition(partition columns, bucketing) and the SELECT query, here we also need to
-          // verify the the consistency between the user-specified table definition and the existing
-          // table definition.
-
-          // Check if the specified partition columns match the existing table.
-          val specifiedPartCols = CatalogUtils.normalizePartCols(
-            tableName, tableCols, table.partitionColumnNames, resolver)
-          if (specifiedPartCols != existingTable.partitionColumnNames) {
-            throw new AnalysisException(
-              s"""
-                |Specified partitioning does not match that of the existing table $tableName.
-                |Specified partition columns: [${specifiedPartCols.mkString(", ")}]
-                |Existing partition columns: [${existingTable.partitionColumnNames.mkString(", ")}]
-              """.stripMargin)
-          }
-
-          // Check if the specified bucketing match the existing table.
-          val specifiedBucketSpec = table.bucketSpec.map { bucketSpec =>
-            CatalogUtils.normalizeBucketSpec(tableName, tableCols, bucketSpec, resolver)
-          }
-          if (specifiedBucketSpec != existingTable.bucketSpec) {
-            val specifiedBucketString =
-              specifiedBucketSpec.map(_.toString).getOrElse("not bucketed")
-            val existingBucketString =
-              existingTable.bucketSpec.map(_.toString).getOrElse("not bucketed")
-            throw new AnalysisException(
-              s"""
-                |Specified bucketing does not match that of the existing table $tableName.
-                |Specified bucketing: $specifiedBucketString
-                |Existing bucketing: $existingBucketString
-              """.stripMargin)
-          }
-
-        case SaveMode.Overwrite =>
-          sessionState.catalog.dropTable(tableIdentWithDB, ignoreIfNotExists = true, purge = false)
-          // Need to create the table again.
-          createMetastoreTable = true
+      if (mode == SaveMode.ErrorIfExists) {
+        throw new AnalysisException(s"Table $tableName already exists. You need to drop it first.")
+      }
+      if (mode == SaveMode.Ignore) {
+        // Since the table already exists and the save mode is Ignore, we will just return.
+        return Seq.empty
       }
-    } else {
-      // The table does not exist. We need to create it in metastore.
-      createMetastoreTable = true
-    }
-
-    val data = Dataset.ofRows(sparkSession, query)
-    val df = reorderedColumns match {
-      // Reorder the columns of the query to match the existing table.
-      case Some(cols) => data.select(cols.map(Column(_)): _*)
-      case None => data
-    }
-
-    val tableLocation = if (table.tableType == CatalogTableType.MANAGED) {
-      Some(sessionState.catalog.defaultTablePath(table.identifier))
+      saveDataIntoTable(sparkSession, table, table.storage.locationUri, query, mode)
     } else {
-      table.storage.locationUri
-    }
-
-    // Create the relation based on the data of df.
-    val pathOption = tableLocation.map("path" -> _)
-    val dataSource = DataSource(
-      sparkSession,
-      className = provider,
-      partitionColumns = table.partitionColumnNames,
-      bucketSpec = table.bucketSpec,
-      options = table.storage.properties ++ pathOption,
-      catalogTable = Some(table))
-
-    val result = try {
-      dataSource.write(mode, df)
-    } catch {
-      case ex: AnalysisException =>
-        logError(s"Failed to write to table $tableName in $mode mode", ex)
-        throw ex
-    }
-    if (createMetastoreTable) {
+      val tableLocation = if (table.tableType == CatalogTableType.MANAGED) {
+        Some(sessionState.catalog.defaultTablePath(table.identifier))
+      } else {
+        table.storage.locationUri
+      }
+      val result = saveDataIntoTable(sparkSession, table, tableLocation, query, mode)
       val newTable = table.copy(
         storage = table.storage.copy(locationUri = tableLocation),
         // We will use the schema of resolved.relation as the schema of the table (instead of
@@ -274,6 +165,7 @@ case class CreateDataSourceTableAsSelectCommand(
         // provider (for example, see org.apache.spark.sql.parquet.DefaultSource).
         schema = result.schema)
       sessionState.catalog.createTable(newTable, ignoreIfExists = false)
+      result
     }
 
     result match {
@@ -289,4 +181,29 @@ case class CreateDataSourceTableAsSelectCommand(
     sessionState.catalog.refreshTable(tableIdentWithDB)
     Seq.empty[Row]
   }
+
+  private def saveDataIntoTable(
+      session: SparkSession,
+      table: CatalogTable,
+      tableLocation: Option[String],
+      data: LogicalPlan,
+      mode: SaveMode): BaseRelation = {
+    // Create the relation based on the input logical plan: `data`.
+    val pathOption = tableLocation.map("path" -> _)
+    val dataSource = DataSource(
+      session,
+      className = table.provider.get,
+      partitionColumns = table.partitionColumnNames,
+      bucketSpec = table.bucketSpec,
+      options = table.storage.properties ++ pathOption,
+      catalogTable = Some(table))
+
+    try {
+      dataSource.write(mode, Dataset.ofRows(session, query))
+    } catch {
+      case ex: AnalysisException =>
+        logError(s"Failed to write to table ${table.identifier.unquotedString}", ex)
+        throw ex
+    }
+  }
 }
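After this refactor, `CreateDataSourceTableAsSelectCommand` has only two paths: write into an existing table (Append, after the checks now done in the analyzer rule below), or create-and-write (table absent, or already dropped by `DataFrameWriter` for Overwrite). From the user's side the four save modes then behave as follows, a sketch suitable for pasting into spark-shell (`people` is an illustrative table name):

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq((1, "a")).toDF("id", "name")
df.write.saveAsTable("people")                          // table absent: created

df.write.mode(SaveMode.Ignore).saveAsTable("people")    // exists: silently returns
df.write.mode(SaveMode.Append).saveAsTable("people")    // exists: data appended
df.write.mode(SaveMode.Overwrite).saveAsTable("people") // exists: dropped, then recreated
// SaveMode.ErrorIfExists (the default) would instead throw an AnalysisException here.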
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 2b2fbddd12..07b16671f7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -21,7 +21,7 @@ import scala.util.control.NonFatal
 
 import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable, CatalogUtils, SessionCatalog}
+import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, RowOrdering}
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -86,6 +86,108 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPl
       }
 
       c
+
+    // When we append data to an existing table, check if the given provider, partition columns,
+    // bucket spec, etc. match the existing table, and adjust the columns order of the given query
+    // if necessary.
+    case c @ CreateTable(tableDesc, SaveMode.Append, Some(query))
+        if sparkSession.sessionState.catalog.tableExists(tableDesc.identifier) =>
+      // This is guaranteed by the parser and `DataFrameWriter`
+      assert(tableDesc.schema.isEmpty && tableDesc.provider.isDefined)
+
+      // Analyze the query in CTAS and then we can do the normalization and checking.
+      val qe = sparkSession.sessionState.executePlan(query)
+      qe.assertAnalyzed()
+      val analyzedQuery = qe.analyzed
+
+      val catalog = sparkSession.sessionState.catalog
+      val db = tableDesc.identifier.database.getOrElse(catalog.getCurrentDatabase)
+      val tableIdentWithDB = tableDesc.identifier.copy(database = Some(db))
+      val tableName = tableIdentWithDB.unquotedString
+      val existingTable = catalog.getTableMetadata(tableIdentWithDB)
+
+      if (existingTable.tableType == CatalogTableType.VIEW) {
+        throw new AnalysisException("Saving data into a view is not allowed.")
+      }
+
+      if (existingTable.provider.get == DDLUtils.HIVE_PROVIDER) {
+        throw new AnalysisException(s"Saving data in the Hive serde table $tableName is " +
+          "not supported yet. Please use the insertInto() API as an alternative.")
+      }
+
+      // Check if the specified data source match the data source of the existing table.
+      val existingProvider = DataSource.lookupDataSource(existingTable.provider.get)
+      val specifiedProvider = DataSource.lookupDataSource(tableDesc.provider.get)
+      // TODO: Check that options from the resolved relation match the relation that we are
+      // inserting into (i.e. using the same compression).
+      if (existingProvider != specifiedProvider) {
+        throw new AnalysisException(s"The format of the existing table $tableName is " +
+          s"`${existingProvider.getSimpleName}`. It doesn't match the specified format " +
+          s"`${specifiedProvider.getSimpleName}`.")
+      }
+
+      if (analyzedQuery.schema.length != existingTable.schema.length) {
+        throw new AnalysisException(
+          s"The column number of the existing table $tableName" +
+          s"(${existingTable.schema.catalogString}) doesn't match the data schema" +
+          s"(${query.schema.catalogString})")
+      }
+
+      val resolver = sparkSession.sessionState.conf.resolver
+      val tableCols = existingTable.schema.map(_.name)
+
+      // As we are inserting into an existing table, we should respect the existing schema and
+      // adjust the column order of the given dataframe according to it, or throw exception
+      // if the column names do not match.
+      val adjustedColumns = tableCols.map { col =>
+        analyzedQuery.resolve(Seq(col), resolver).getOrElse {
+          val inputColumns = analyzedQuery.schema.map(_.name).mkString(", ")
+          throw new AnalysisException(
+            s"cannot resolve '$col' given input columns: [$inputColumns]")
+        }
+      }
+
+      // Check if the specified partition columns match the existing table.
+      val specifiedPartCols = CatalogUtils.normalizePartCols(
+        tableName, tableCols, tableDesc.partitionColumnNames, resolver)
+      if (specifiedPartCols != existingTable.partitionColumnNames) {
+        val existingPartCols = existingTable.partitionColumnNames.mkString(", ")
+        throw new AnalysisException(
+          s"""
+            |Specified partitioning does not match that of the existing table $tableName.
+            |Specified partition columns: [${specifiedPartCols.mkString(", ")}]
+            |Existing partition columns: [$existingPartCols]
+          """.stripMargin)
+      }
+
+      // Check if the specified bucketing match the existing table.
+      val specifiedBucketSpec = tableDesc.bucketSpec.map { bucketSpec =>
+        CatalogUtils.normalizeBucketSpec(tableName, tableCols, bucketSpec, resolver)
+      }
+      if (specifiedBucketSpec != existingTable.bucketSpec) {
+        val specifiedBucketString =
+          specifiedBucketSpec.map(_.toString).getOrElse("not bucketed")
+        val existingBucketString =
+          existingTable.bucketSpec.map(_.toString).getOrElse("not bucketed")
+        throw new AnalysisException(
+          s"""
+            |Specified bucketing does not match that of the existing table $tableName.
+            |Specified bucketing: $specifiedBucketString
+            |Existing bucketing: $existingBucketString
+          """.stripMargin)
+      }
+
+      val newQuery = if (adjustedColumns != analyzedQuery.output) {
+        Project(adjustedColumns, analyzedQuery)
+      } else {
+        analyzedQuery
+      }
+
+      c.copy(
+        // trust everything from the existing table, except schema as we assume it's empty in a lot
+        // of places, when we do CTAS.
+        tableDesc = existingTable.copy(schema = new StructType()),
+        query = Some(newQuery))
+
     // Here we normalize partition, bucket and sort column names, w.r.t. the case sensitivity
     // config, and do various checks:
     // * column names in table definition can't be duplicated.
@@ -94,7 +196,7 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPl
     // * can't use all table columns as partition columns.
     // * partition columns' type must be AtomicType.
     // * sort columns' type must be orderable.
-    case c @ CreateTable(tableDesc, mode, query) =>
+    case c @ CreateTable(tableDesc, _, query) =>
       val analyzedQuery = query.map { q =>
         // Analyze the query in CTAS and then we can do the normalization and checking.
        val qe = sparkSession.sessionState.executePlan(q)
@@ -106,6 +208,7 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPl
       } else {
         tableDesc.schema
       }
+
       val columnNames = if (sparkSession.sessionState.conf.caseSensitiveAnalysis) {
         schema.map(_.name)
       } else {
@@ -113,22 +216,24 @@
       }
       checkDuplication(columnNames, "table definition of " + tableDesc.identifier)
 
-      val partitionColsChecked = checkPartitionColumns(schema, tableDesc)
-      val bucketColsChecked = checkBucketColumns(schema, partitionColsChecked)
-      c.copy(tableDesc = bucketColsChecked, query = analyzedQuery)
+      val normalizedTable = tableDesc.copy(
+        partitionColumnNames = normalizePartitionColumns(schema, tableDesc),
+        bucketSpec = normalizeBucketSpec(schema, tableDesc))
+
+      c.copy(tableDesc = normalizedTable, query = analyzedQuery)
   }
 
-  private def checkPartitionColumns(schema: StructType, tableDesc: CatalogTable): CatalogTable = {
+  private def normalizePartitionColumns(schema: StructType, table: CatalogTable): Seq[String] = {
     val normalizedPartitionCols = CatalogUtils.normalizePartCols(
-      tableName = tableDesc.identifier.unquotedString,
+      tableName = table.identifier.unquotedString,
       tableCols = schema.map(_.name),
-      partCols = tableDesc.partitionColumnNames,
+      partCols = table.partitionColumnNames,
       resolver = sparkSession.sessionState.conf.resolver)
     checkDuplication(normalizedPartitionCols, "partition")
 
     if (schema.nonEmpty && normalizedPartitionCols.length == schema.length) {
-      if (tableDesc.provider.get == DDLUtils.HIVE_PROVIDER) {
+      if (table.provider.get == DDLUtils.HIVE_PROVIDER) {
        // When we hit this branch, it means users didn't specify schema for the table to be
        // created, as we always include partition columns in table schema for hive serde tables.
        // The real schema will be inferred at hive metastore by hive serde, plus the given
@@ -144,28 +249,28 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPl
       case other => failAnalysis(s"Cannot use ${other.simpleString} for partition column")
     }
 
-    tableDesc.copy(partitionColumnNames = normalizedPartitionCols)
+    normalizedPartitionCols
   }
 
-  private def checkBucketColumns(schema: StructType, tableDesc: CatalogTable): CatalogTable = {
-    tableDesc.bucketSpec match {
+  private def normalizeBucketSpec(schema: StructType, table: CatalogTable): Option[BucketSpec] = {
+    table.bucketSpec match {
       case Some(bucketSpec) =>
-        val normalizedBucketing = CatalogUtils.normalizeBucketSpec(
-          tableName = tableDesc.identifier.unquotedString,
+        val normalizedBucketSpec = CatalogUtils.normalizeBucketSpec(
+          tableName = table.identifier.unquotedString,
           tableCols = schema.map(_.name),
           bucketSpec = bucketSpec,
           resolver = sparkSession.sessionState.conf.resolver)
-        checkDuplication(normalizedBucketing.bucketColumnNames, "bucket")
-        checkDuplication(normalizedBucketing.sortColumnNames, "sort")
+        checkDuplication(normalizedBucketSpec.bucketColumnNames, "bucket")
+        checkDuplication(normalizedBucketSpec.sortColumnNames, "sort")
 
-        normalizedBucketing.sortColumnNames.map(schema(_)).map(_.dataType).foreach {
+        normalizedBucketSpec.sortColumnNames.map(schema(_)).map(_.dataType).foreach {
           case dt if RowOrdering.isOrderable(dt) => // OK
           case other => failAnalysis(s"Cannot use ${other.simpleString} for sorting column")
         }
 
-        tableDesc.copy(bucketSpec = Some(normalizedBucketing))
+        Some(normalizedBucketSpec)
 
-      case None => tableDesc
+      case None => None
     }
   }
@@ -294,27 +399,6 @@ case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
 
   def apply(plan: LogicalPlan): Unit = {
     plan.foreach {
-      case c @ CreateTable(tableDesc, mode, query) if c.resolved =>
-        if (query.isDefined &&
-          mode == SaveMode.Overwrite &&
-          catalog.tableExists(tableDesc.identifier)) {
-          // Need to remove SubQuery operator.
-          EliminateSubqueryAliases(catalog.lookupRelation(tableDesc.identifier)) match {
-            // Only do the check if the table is a data source table
-            // (the relation is a BaseRelation).
-            case LogicalRelation(dest: BaseRelation, _, _) =>
-              // Get all input data source relations of the query.
-              val srcRelations = query.get.collect {
-                case LogicalRelation(src: BaseRelation, _, _) => src
-              }
-              if (srcRelations.contains(dest)) {
-                failAnalysis(
-                  s"Cannot overwrite table ${tableDesc.identifier} that is also being read from")
-              }
-            case _ => // OK
-          }
-        }
-
       case logical.InsertIntoTable(
         l @ LogicalRelation(t: InsertableRelation, _, _), partition, query, _, _) =>
         // Right now, we do not support insert into a data source table with partition specs.
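The append branch added above resolves the existing table's columns against the query by name and inserts a `Project` to reorder them when needed, so appends no longer depend on column position. A sketch of the effect, suitable for spark-shell (table `t` is illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

Seq((1, "a")).toDF("id", "name").write.saveAsTable("t")

// The input columns arrive in a different order; the rule resolves them by name
// against the existing schema and adds a Project, so this append succeeds.
Seq(("b", 2)).toDF("name", "id").write.mode("append").saveAsTable("t")

// A format mismatch, by contrast, is rejected with the AnalysisException above,
// e.g. appending JSON data to what was created as a parquet table:
// Seq((3, "c")).toDF("id", "name").write.mode("append").format("json").saveAsTable("t")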
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index deb40f0464..0f787be0bb 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -1239,7 +1239,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
       var e = intercept[AnalysisException] {
         table(tableName).write.mode(SaveMode.Overwrite).saveAsTable(tableName)
       }.getMessage
-      assert(e.contains(s"Cannot overwrite table `$tableName` that is also being read from"))
+      assert(e.contains(s"Cannot overwrite table default.$tableName that is also being read from"))
 
       e = intercept[AnalysisException] {
         table(tableName).write.mode(SaveMode.ErrorIfExists).saveAsTable(tableName)
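The expected message changes because the check moved from `PreWriteCheck`, which interpolated `tableDesc.identifier` (rendered in its backtick-quoted form), into `DataFrameWriter`, which first qualifies the identifier with the current database and prints it unquoted. A small sketch of the difference using the catalyst `TableIdentifier` API (the name `tab1` is illustrative):

import org.apache.spark.sql.catalyst.TableIdentifier

val tableIdent = TableIdentifier("tab1")          // no explicit database part
println(tableIdent.quotedString)                  // `tab1`  -- the form the old message used

val db = tableIdent.database.getOrElse("default") // DataFrameWriter falls back to the current db
val tableIdentWithDB = tableIdent.copy(database = Some(db))
println(tableIdentWithDB.unquotedString)          // default.tab1 -- the form the new message uses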