about summary refs log tree commit diff
path: root/sql
diff options
context:
space:
mode:
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala | 45
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala | 52
2 files changed, 71 insertions(+), 26 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index f14c63c19f..ae77e4cb96 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -304,6 +304,25 @@ case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
failAnalysis(s"Database name ${tblIdent.database.get} is not a valid name for " +
s"metastore. Metastore only accepts table name containing characters, numbers and _.")
}
+ if (query.isDefined &&
+ mode == SaveMode.Overwrite &&
+ catalog.tableExists(tableDesc.identifier)) {
+ // Need to remove SubQuery operator.
+ EliminateSubqueryAliases(catalog.lookupRelation(tableDesc.identifier)) match {
+ // Only do the check if the table is a data source table
+ // (the relation is a BaseRelation).
+ case l @ LogicalRelation(dest: BaseRelation, _, _) =>
+ // Get all input data source relations of the query.
+ val srcRelations = query.get.collect {
+ case LogicalRelation(src: BaseRelation, _, _) => src
+ }
+ if (srcRelations.contains(dest)) {
+ failAnalysis(
+ s"Cannot overwrite table ${tableDesc.identifier} that is also being read from")
+ }
+ case _ => // OK
+ }
+ }
case i @ logical.InsertIntoTable(
l @ LogicalRelation(t: InsertableRelation, _, _),
@@ -357,32 +376,6 @@ case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
// The relation in l is not an InsertableRelation.
failAnalysis(s"$l does not allow insertion.")
- case CreateTable(tableDesc, mode, Some(query)) =>
- // When the SaveMode is Overwrite, we need to check if the table is an input table of
- // the query. If so, we will throw an AnalysisException to let users know it is not allowed.
- if (mode == SaveMode.Overwrite && catalog.tableExists(tableDesc.identifier)) {
- // Need to remove SubQuery operator.
- EliminateSubqueryAliases(catalog.lookupRelation(tableDesc.identifier)) match {
- // Only do the check if the table is a data source table
- // (the relation is a BaseRelation).
- case l @ LogicalRelation(dest: BaseRelation, _, _) =>
- // Get all input data source relations of the query.
- val srcRelations = query.collect {
- case LogicalRelation(src: BaseRelation, _, _) => src
- }
- if (srcRelations.contains(dest)) {
- failAnalysis(
- s"Cannot overwrite table ${tableDesc.identifier} that is also being read from.")
- } else {
- // OK
- }
-
- case _ => // OK
- }
- } else {
- // OK
- }
-
case _ => // OK
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 7a71475a2f..3466733d7f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -1151,6 +1151,58 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
}
}
+ test("saveAsTable - source and target are the same table") {
+ val tableName = "tab1"
+ withTable(tableName) {
+ Seq((1, 2)).toDF("i", "j").write.saveAsTable(tableName)
+
+ table(tableName).write.mode(SaveMode.Append).saveAsTable(tableName)
+ checkAnswer(table(tableName),
+ Seq(Row(1, 2), Row(1, 2)))
+
+ table(tableName).write.mode(SaveMode.Ignore).saveAsTable(tableName)
+ checkAnswer(table(tableName),
+ Seq(Row(1, 2), Row(1, 2)))
+
+ var e = intercept[AnalysisException] {
+ table(tableName).write.mode(SaveMode.Overwrite).saveAsTable(tableName)
+ }.getMessage
+ assert(e.contains(s"Cannot overwrite table `$tableName` that is also being read from"))
+
+ e = intercept[AnalysisException] {
+ table(tableName).write.mode(SaveMode.ErrorIfExists).saveAsTable(tableName)
+ }.getMessage
+ assert(e.contains(s"Table `$tableName` already exists"))
+ }
+ }
+
+ test("insertInto - source and target are the same table") {
+ val tableName = "tab1"
+ withTable(tableName) {
+ Seq((1, 2)).toDF("i", "j").write.saveAsTable(tableName)
+
+ table(tableName).write.mode(SaveMode.Append).insertInto(tableName)
+ checkAnswer(
+ table(tableName),
+ Seq(Row(1, 2), Row(1, 2)))
+
+ table(tableName).write.mode(SaveMode.Ignore).insertInto(tableName)
+ checkAnswer(
+ table(tableName),
+ Seq(Row(1, 2), Row(1, 2), Row(1, 2), Row(1, 2)))
+
+ table(tableName).write.mode(SaveMode.ErrorIfExists).insertInto(tableName)
+ checkAnswer(
+ table(tableName),
+ Seq(Row(1, 2), Row(1, 2), Row(1, 2), Row(1, 2), Row(1, 2), Row(1, 2), Row(1, 2), Row(1, 2)))
+
+ val e = intercept[AnalysisException] {
+ table(tableName).write.mode(SaveMode.Overwrite).insertInto(tableName)
+ }.getMessage
+ assert(e.contains(s"Cannot overwrite a path that is also being read from"))
+ }
+ }
+
test("saveAsTable[append]: less columns") {
withTable("saveAsTable_less_columns") {
Seq((1, 2)).toDF("i", "j").write.saveAsTable("saveAsTable_less_columns")