aboutsummaryrefslogtreecommitdiff
path: root/sql/hive
diff options
context:
space:
mode:
authorwindpiger <songjun@outlook.com>2017-03-01 22:50:25 -0800
committerWenchen Fan <wenchen@databricks.com>2017-03-01 22:50:25 -0800
commitde2b53df4c779b265ae038d88f298786a9236234 (patch)
treebd9d5f3d42c1cb25c875b8ebaf26a17a8cc0b2f8 /sql/hive
parent89990a01099b2d632b65112eb755de648aa54c16 (diff)
downloadspark-de2b53df4c779b265ae038d88f298786a9236234.tar.gz
spark-de2b53df4c779b265ae038d88f298786a9236234.tar.bz2
spark-de2b53df4c779b265ae038d88f298786a9236234.zip
[SPARK-19583][SQL] CTAS for data source table with a created location should succeed
## What changes were proposed in this pull request? ``` spark.sql( s""" |CREATE TABLE t |USING parquet |PARTITIONED BY(a, b) |LOCATION '$dir' |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d """.stripMargin) ``` This failed with the error message: ``` path file:/private/var/folders/6r/15tqm8hn3ldb3rmbfqm1gf4c0000gn/T/spark-195cd513-428a-4df9-b196-87db0c73e772 already exists.; org.apache.spark.sql.AnalysisException: path file:/private/var/folders/6r/15tqm8hn3ldb3rmbfqm1gf4c0000gn/T/spark-195cd513-428a-4df9-b196-87db0c73e772 already exists.; at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:102) ``` while the equivalent Hive table works, so we should fix this for data source tables. The reason is that the SaveMode check is placed in `InsertIntoHadoopFsRelationCommand`, and that check actually operates on the `path`. This is fine when we use `DataFrameWriter.save()`, because in that situation the SaveMode applies to the `path`. However, when we use `CreateDataSourceAsSelectCommand`, the SaveMode applies to the table, and the SaveMode check for the table has already been done in `CreateDataSourceAsSelectCommand`, so we should not repeat the SaveMode check for the path in the subsequent logic of `InsertIntoHadoopFsRelationCommand`; that check is redundant and incorrect for `CreateDataSourceAsSelectCommand`. After this PR, the following DDL will succeed; when the location has already been created we will append to it or overwrite it. ``` CREATE TABLE ... (PARTITIONED BY ...) LOCATION path AS SELECT ... ``` ## How was this patch tested? unit test added Author: windpiger <songjun@outlook.com> Closes #16938 from windpiger/CTASDataSourceWitLocation.
Diffstat (limited to 'sql/hive')
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala99
1 files changed, 99 insertions, 0 deletions
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 792ac1e259..81ae5b7bdb 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -1587,4 +1587,103 @@ class HiveDDLSuite
}
}
}
+
+ Seq(true, false).foreach { shouldDelete =>
+ val tcName = if (shouldDelete) "non-existent" else "existed"
+ test(s"CTAS for external data source table with a $tcName location") {
+ withTable("t", "t1") {
+ withTempDir {
+ dir =>
+ if (shouldDelete) {
+ dir.delete()
+ }
+ spark.sql(
+ s"""
+ |CREATE TABLE t
+ |USING parquet
+ |LOCATION '$dir'
+ |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
+ """.stripMargin)
+
+ val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+ assert(table.location == dir.getAbsolutePath)
+
+ checkAnswer(spark.table("t"), Row(3, 4, 1, 2))
+ }
+ // partition table
+ withTempDir {
+ dir =>
+ if (shouldDelete) {
+ dir.delete()
+ }
+ spark.sql(
+ s"""
+ |CREATE TABLE t1
+ |USING parquet
+ |PARTITIONED BY(a, b)
+ |LOCATION '$dir'
+ |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
+ """.stripMargin)
+
+ val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
+ assert(table.location == dir.getAbsolutePath)
+
+ val partDir = new File(dir, "a=3")
+ assert(partDir.exists())
+
+ checkAnswer(spark.table("t1"), Row(1, 2, 3, 4))
+ }
+ }
+ }
+
+ test(s"CTAS for external hive table with a $tcName location") {
+ withTable("t", "t1") {
+ withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
+ withTempDir {
+ dir =>
+ if (shouldDelete) {
+ dir.delete()
+ }
+ spark.sql(
+ s"""
+ |CREATE TABLE t
+ |USING hive
+ |LOCATION '$dir'
+ |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
+ """.stripMargin)
+ val dirPath = new Path(dir.getAbsolutePath)
+ val fs = dirPath.getFileSystem(spark.sessionState.newHadoopConf())
+ val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+ assert(new Path(table.location) == fs.makeQualified(dirPath))
+
+ checkAnswer(spark.table("t"), Row(3, 4, 1, 2))
+ }
+ // partition table
+ withTempDir {
+ dir =>
+ if (shouldDelete) {
+ dir.delete()
+ }
+ spark.sql(
+ s"""
+ |CREATE TABLE t1
+ |USING hive
+ |PARTITIONED BY(a, b)
+ |LOCATION '$dir'
+ |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
+ """.stripMargin)
+ val dirPath = new Path(dir.getAbsolutePath)
+ val fs = dirPath.getFileSystem(spark.sessionState.newHadoopConf())
+ val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
+ assert(new Path(table.location) == fs.makeQualified(dirPath))
+
+ val partDir = new File(dir, "a=3")
+ assert(partDir.exists())
+
+ checkAnswer(spark.table("t1"), Row(1, 2, 3, 4))
+ }
+ }
+ }
+ }
+ }
}