aboutsummaryrefslogtreecommitdiff
path: root/sql/hive/src
diff options
context:
space:
mode:
authorgatorsmile <gatorsmile@gmail.com>2016-09-05 11:28:19 +0800
committerWenchen Fan <wenchen@databricks.com>2016-09-05 11:28:19 +0800
commitc1e9a6d274c281ec30e6d022eedfbe3a2988f721 (patch)
tree557707ecdeeb8ed29e7ab7d3ef7afd104f34a533 /sql/hive/src
parent1b001b5203444cc8d5c4887a30e03e8fb298d17d (diff)
downloadspark-c1e9a6d274c281ec30e6d022eedfbe3a2988f721.tar.gz
spark-c1e9a6d274c281ec30e6d022eedfbe3a2988f721.tar.bz2
spark-c1e9a6d274c281ec30e6d022eedfbe3a2988f721.zip
[SPARK-17393][SQL] Error Handling when CTAS Against the Same Data Source Table Using Overwrite Mode
### What changes were proposed in this pull request? When we trying to read a table and then write to the same table using the `Overwrite` save mode, we got a very confusing error message: For example, ```Scala Seq((1, 2)).toDF("i", "j").write.saveAsTable("tab1") table("tab1").write.mode(SaveMode.Overwrite).saveAsTable("tab1") ``` ``` Job aborted. org.apache.spark.SparkException: Job aborted. at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply$mcV$sp ... Caused by: org.apache.spark.SparkException: Task failed while writing rows at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:266) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143) at org.apache.spark.sql.execution.datasources ``` After the PR, we will issue an `AnalysisException`: ``` Cannot overwrite table `tab1` that is also being read from ``` ### How was this patch tested? Added test cases. Author: gatorsmile <gatorsmile@gmail.com> Closes #14954 from gatorsmile/ctasQueryAnalyze.
Diffstat (limited to 'sql/hive/src')
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala52
1 files changed, 52 insertions, 0 deletions
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 7a71475a2f..3466733d7f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -1151,6 +1151,58 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
}
}
+ test("saveAsTable - source and target are the same table") {
+ val tableName = "tab1"
+ withTable(tableName) {
+ Seq((1, 2)).toDF("i", "j").write.saveAsTable(tableName)
+
+ table(tableName).write.mode(SaveMode.Append).saveAsTable(tableName)
+ checkAnswer(table(tableName),
+ Seq(Row(1, 2), Row(1, 2)))
+
+ table(tableName).write.mode(SaveMode.Ignore).saveAsTable(tableName)
+ checkAnswer(table(tableName),
+ Seq(Row(1, 2), Row(1, 2)))
+
+ var e = intercept[AnalysisException] {
+ table(tableName).write.mode(SaveMode.Overwrite).saveAsTable(tableName)
+ }.getMessage
+ assert(e.contains(s"Cannot overwrite table `$tableName` that is also being read from"))
+
+ e = intercept[AnalysisException] {
+ table(tableName).write.mode(SaveMode.ErrorIfExists).saveAsTable(tableName)
+ }.getMessage
+ assert(e.contains(s"Table `$tableName` already exists"))
+ }
+ }
+
+ test("insertInto - source and target are the same table") {
+ val tableName = "tab1"
+ withTable(tableName) {
+ Seq((1, 2)).toDF("i", "j").write.saveAsTable(tableName)
+
+ table(tableName).write.mode(SaveMode.Append).insertInto(tableName)
+ checkAnswer(
+ table(tableName),
+ Seq(Row(1, 2), Row(1, 2)))
+
+ table(tableName).write.mode(SaveMode.Ignore).insertInto(tableName)
+ checkAnswer(
+ table(tableName),
+ Seq(Row(1, 2), Row(1, 2), Row(1, 2), Row(1, 2)))
+
+ table(tableName).write.mode(SaveMode.ErrorIfExists).insertInto(tableName)
+ checkAnswer(
+ table(tableName),
+ Seq(Row(1, 2), Row(1, 2), Row(1, 2), Row(1, 2), Row(1, 2), Row(1, 2), Row(1, 2), Row(1, 2)))
+
+ val e = intercept[AnalysisException] {
+ table(tableName).write.mode(SaveMode.Overwrite).insertInto(tableName)
+ }.getMessage
+ assert(e.contains(s"Cannot overwrite a path that is also being read from"))
+ }
+ }
+
test("saveAsTable[append]: less columns") {
withTable("saveAsTable_less_columns") {
Seq((1, 2)).toDF("i", "j").write.saveAsTable("saveAsTable_less_columns")