From c1e9a6d274c281ec30e6d022eedfbe3a2988f721 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Mon, 5 Sep 2016 11:28:19 +0800
Subject: [SPARK-17393][SQL] Error Handling when CTAS Against the Same Data
 Source Table Using Overwrite Mode

### What changes were proposed in this pull request?
When we read a table and then write to the same table using the `Overwrite` save mode, we get a very confusing error message. For example:
```Scala
Seq((1, 2)).toDF("i", "j").write.saveAsTable("tab1")
table("tab1").write.mode(SaveMode.Overwrite).saveAsTable("tab1")
```
```
Job aborted.
org.apache.spark.SparkException: Job aborted.
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply$mcV$sp
...
Caused by: org.apache.spark.SparkException: Task failed while writing rows
  at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:266)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
  at org.apache.spark.sql.execution.datasources
```

After this PR, we issue an `AnalysisException` instead:
```
Cannot overwrite table `tab1` that is also being read from
```

### How was this patch tested?
Added test cases.

Author: gatorsmile

Closes #14954 from gatorsmile/ctasQueryAnalyze.
---
 .../spark/sql/execution/datasources/rules.scala    | 45 ++++++++-----------
 .../spark/sql/hive/MetastoreDataSourcesSuite.scala | 52 ++++++++++++++++++++++
 2 files changed, 71 insertions(+), 26 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index f14c63c19f..ae77e4cb96 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -304,6 +304,25 @@ case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
           failAnalysis(s"Database name ${tblIdent.database.get} is not a valid name for " +
             s"metastore. Metastore only accepts table name containing characters, numbers and _.")
         }
+        if (query.isDefined &&
+          mode == SaveMode.Overwrite &&
+          catalog.tableExists(tableDesc.identifier)) {
+          // Need to remove SubQuery operator.
+          EliminateSubqueryAliases(catalog.lookupRelation(tableDesc.identifier)) match {
+            // Only do the check if the table is a data source table
+            // (the relation is a BaseRelation).
+            case l @ LogicalRelation(dest: BaseRelation, _, _) =>
+              // Get all input data source relations of the query.
+              val srcRelations = query.get.collect {
+                case LogicalRelation(src: BaseRelation, _, _) => src
+              }
+              if (srcRelations.contains(dest)) {
+                failAnalysis(
+                  s"Cannot overwrite table ${tableDesc.identifier} that is also being read from")
+              }
+            case _ => // OK
+          }
+        }

       case i @ logical.InsertIntoTable(
           l @ LogicalRelation(t: InsertableRelation, _, _),
@@ -357,32 +376,6 @@ case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
             // The relation in l is not an InsertableRelation.
             failAnalysis(s"$l does not allow insertion.")

-      case CreateTable(tableDesc, mode, Some(query)) =>
-        // When the SaveMode is Overwrite, we need to check if the table is an input table of
-        // the query. If so, we will throw an AnalysisException to let users know it is not allowed.
-        if (mode == SaveMode.Overwrite && catalog.tableExists(tableDesc.identifier)) {
-          // Need to remove SubQuery operator.
-          EliminateSubqueryAliases(catalog.lookupRelation(tableDesc.identifier)) match {
-            // Only do the check if the table is a data source table
-            // (the relation is a BaseRelation).
-            case l @ LogicalRelation(dest: BaseRelation, _, _) =>
-              // Get all input data source relations of the query.
-              val srcRelations = query.collect {
-                case LogicalRelation(src: BaseRelation, _, _) => src
-              }
-              if (srcRelations.contains(dest)) {
-                failAnalysis(
-                  s"Cannot overwrite table ${tableDesc.identifier} that is also being read from.")
-              } else {
-                // OK
-              }
-
-            case _ => // OK
-          }
-        } else {
-          // OK
-        }
-
       case _ => // OK
     }
   }

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 7a71475a2f..3466733d7f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -1151,6 +1151,58 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
     }
   }

+  test("saveAsTable - source and target are the same table") {
+    val tableName = "tab1"
+    withTable(tableName) {
+      Seq((1, 2)).toDF("i", "j").write.saveAsTable(tableName)
+
+      table(tableName).write.mode(SaveMode.Append).saveAsTable(tableName)
+      checkAnswer(table(tableName),
+        Seq(Row(1, 2), Row(1, 2)))
+
+      table(tableName).write.mode(SaveMode.Ignore).saveAsTable(tableName)
+      checkAnswer(table(tableName),
+        Seq(Row(1, 2), Row(1, 2)))
+
+      var e = intercept[AnalysisException] {
+        table(tableName).write.mode(SaveMode.Overwrite).saveAsTable(tableName)
+      }.getMessage
+      assert(e.contains(s"Cannot overwrite table `$tableName` that is also being read from"))
+
+      e = intercept[AnalysisException] {
+        table(tableName).write.mode(SaveMode.ErrorIfExists).saveAsTable(tableName)
+      }.getMessage
+      assert(e.contains(s"Table `$tableName` already exists"))
+    }
+  }
+
+  test("insertInto - source and target are the same table") {
+    val tableName = "tab1"
+    withTable(tableName) {
+      Seq((1, 2)).toDF("i", "j").write.saveAsTable(tableName)
+
+      table(tableName).write.mode(SaveMode.Append).insertInto(tableName)
+      checkAnswer(
+        table(tableName),
+        Seq(Row(1, 2), Row(1, 2)))
+
+      table(tableName).write.mode(SaveMode.Ignore).insertInto(tableName)
+      checkAnswer(
+        table(tableName),
+        Seq(Row(1, 2), Row(1, 2), Row(1, 2), Row(1, 2)))
+
+      table(tableName).write.mode(SaveMode.ErrorIfExists).insertInto(tableName)
+      checkAnswer(
+        table(tableName),
+        Seq(Row(1, 2), Row(1, 2), Row(1, 2), Row(1, 2), Row(1, 2), Row(1, 2), Row(1, 2), Row(1, 2)))
+
+      val e = intercept[AnalysisException] {
+        table(tableName).write.mode(SaveMode.Overwrite).insertInto(tableName)
+      }.getMessage
+      assert(e.contains(s"Cannot overwrite a path that is also being read from"))
+    }
+  }
+
   test("saveAsTable[append]: less columns") {
     withTable("saveAsTable_less_columns") {
       Seq((1, 2)).toDF("i", "j").write.saveAsTable("saveAsTable_less_columns")
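For readers wondering how to restructure a job once this check is in place, below is a minimal sketch of the staging-table workaround: write the query result to an intermediate table, then overwrite the original from that copy, so neither plan reads its own destination. The `SparkSession` setup, the object name, the `selectExpr` transformation, and the staging table name `tab1_staged` are all illustrative assumptions, not part of this patch.

```Scala
import org.apache.spark.sql.{SaveMode, SparkSession}

object OverwriteSameTableSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("overwrite-same-table-sketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Create a data source table playing the role of `tab1` from the PR description.
    Seq((1, 2)).toDF("i", "j").write.saveAsTable("tab1")

    // With this patch, reading `tab1` and overwriting it in the same plan fails
    // fast with: Cannot overwrite table `tab1` that is also being read from.
    // So stage the result in a separate table first, then overwrite the original
    // from the staged copy (two plans, neither reading its own destination).
    spark.table("tab1").selectExpr("i + 1 AS i", "j")
      .write.mode(SaveMode.Overwrite).saveAsTable("tab1_staged")
    spark.table("tab1_staged").write.mode(SaveMode.Overwrite).saveAsTable("tab1")
    spark.sql("DROP TABLE tab1_staged")

    spark.table("tab1").show()  // expected single row: (2, 2)
    spark.stop()
  }
}
```

Staging first also avoids the failure mode the old stack trace hinted at: with `Overwrite`, the destination's data is deleted while the job is still reading it, which is why tasks failed mid-write with "Task failed while writing rows" rather than a clear analysis-time error.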