path: root/sql
author     windpiger <songjun@outlook.com>    2017-03-10 20:59:32 -0800
committer  Wenchen Fan <wenchen@databricks.com>    2017-03-10 20:59:32 -0800
commit     f6fdf92d0dce2cb3340f3e2ff768e09ef69176cd (patch)
tree       3f1be05f6be1d490d7be1e5b52c4548d5bb1207e /sql
parent     fb9beda54622e0c3190c6504fc468fa4e50eeb45 (diff)
[SPARK-19723][SQL] create datasource table with a non-existent location should work
## What changes were proposed in this pull request?

This JIRA is follow-up work to [SPARK-19583](https://issues.apache.org/jira/browse/SPARK-19583), as discussed in that [PR](https://github.com/apache/spark/pull/16938).

The following DDL for a datasource table with a non-existent location should work:

```
CREATE TABLE ... (PARTITIONED BY ...) LOCATION path
```

Currently it throws an exception saying the path does not exist for datasource tables.

## How was this patch tested?

Unit tests added.

Author: windpiger <songjun@outlook.com>

Closes #17055 from windpiger/CTDataSourcePathNotExists.
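To make the intended behavior concrete, here is a minimal sketch for a Spark shell session (the table name `t` and the path `/tmp/no_such_dir` are placeholders, and `spark` is the shell-provided `SparkSession`; the statements mirror the new tests below):

```scala
// Assumes /tmp/no_such_dir does not exist yet.
// Before this patch the CREATE TABLE below threw, because resolveRelation()
// verified that the location exists; with the patch the table is created and
// the directory is materialized lazily on the first write.
spark.sql("CREATE TABLE t(a INT, b INT) USING parquet LOCATION '/tmp/no_such_dir'")
spark.sql("INSERT INTO TABLE t SELECT 1, 2")
spark.table("t").show()  // one row: (1, 2)
```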
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala |   3
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala               | 106
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala              | 111
3 files changed, 115 insertions(+), 105 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 3da66afced..2d890118ae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -73,7 +73,8 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
className = table.provider.get,
bucketSpec = table.bucketSpec,
options = table.storage.properties ++ pathOption,
- catalogTable = Some(tableWithDefaultOptions)).resolveRelation()
+ // As discussed in SPARK-19583, we don't check whether the location exists
+ catalogTable = Some(tableWithDefaultOptions)).resolveRelation(checkFilesExist = false)
val partitionColumnNames = if (table.schema.nonEmpty) {
table.partitionColumnNames
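For context, a hedged sketch of the call site after this change, simplified from `CreateDataSourceTableCommand` above (values such as `sparkSession`, `table`, `pathOption`, and `tableWithDefaultOptions` are assumed in scope as in the surrounding command; this is not the verbatim file):

```scala
import org.apache.spark.sql.execution.datasources.DataSource

// With checkFilesExist = false, resolveRelation() still resolves the provider
// and the schema, but skips the check that every input path exists, so a
// CREATE TABLE pointing at a missing LOCATION no longer fails here.
val relation = DataSource(
  sparkSession = sparkSession,
  userSpecifiedSchema = if (table.schema.isEmpty) None else Some(table.schema),
  className = table.provider.get,
  bucketSpec = table.bucketSpec,
  options = table.storage.properties ++ pathOption,
  catalogTable = Some(tableWithDefaultOptions)
).resolveRelation(checkFilesExist = false)
```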
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 5f70a8ce89..0666f446f3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -230,7 +230,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
}
private def getDBPath(dbName: String): URI = {
- val warehousePath = makeQualifiedPath(s"${spark.sessionState.conf.warehousePath}")
+ val warehousePath = makeQualifiedPath(spark.sessionState.conf.warehousePath)
new Path(CatalogUtils.URIToString(warehousePath), s"$dbName.db").toUri
}
@@ -1899,7 +1899,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
}
}
- test("insert data to a data source table which has a not existed location should succeed") {
+ test("insert data to a data source table which has a non-existing location should succeed") {
withTable("t") {
withTempDir { dir =>
spark.sql(
@@ -1939,7 +1939,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
}
}
- test("insert into a data source table with no existed partition location should succeed") {
+ test("insert into a data source table with a non-existing partition location should succeed") {
withTable("t") {
withTempDir { dir =>
spark.sql(
@@ -1966,7 +1966,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
}
}
- test("read data from a data source table which has a not existed location should succeed") {
+ test("read data from a data source table which has a non-existing location should succeed") {
withTable("t") {
withTempDir { dir =>
spark.sql(
@@ -1994,7 +1994,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
}
}
- test("read data from a data source table with no existed partition location should succeed") {
+ test("read data from a data source table with non-existing partition location should succeed") {
withTable("t") {
withTempDir { dir =>
spark.sql(
@@ -2016,48 +2016,72 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
}
}
+ test("create datasource table with a non-existing location") {
+ withTable("t", "t1") {
+ withTempPath { dir =>
+ spark.sql(s"CREATE TABLE t(a int, b int) USING parquet LOCATION '$dir'")
+
+ val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+ assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
+
+ spark.sql("INSERT INTO TABLE t SELECT 1, 2")
+ assert(dir.exists())
+
+ checkAnswer(spark.table("t"), Row(1, 2))
+ }
+ // partition table
+ withTempPath { dir =>
+ spark.sql(s"CREATE TABLE t1(a int, b int) USING parquet PARTITIONED BY(a) LOCATION '$dir'")
+
+ val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
+ assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
+
+ spark.sql("INSERT INTO TABLE t1 PARTITION(a=1) SELECT 2")
+
+ val partDir = new File(dir, "a=1")
+ assert(partDir.exists())
+
+ checkAnswer(spark.table("t1"), Row(2, 1))
+ }
+ }
+ }
+
Seq(true, false).foreach { shouldDelete =>
- val tcName = if (shouldDelete) "non-existent" else "existed"
+ val tcName = if (shouldDelete) "non-existing" else "pre-existing"
test(s"CTAS for external data source table with a $tcName location") {
withTable("t", "t1") {
- withTempDir {
- dir =>
- if (shouldDelete) {
- dir.delete()
- }
- spark.sql(
- s"""
- |CREATE TABLE t
- |USING parquet
- |LOCATION '$dir'
- |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
- """.stripMargin)
- val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
- assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
+ withTempDir { dir =>
+ if (shouldDelete) dir.delete()
+ spark.sql(
+ s"""
+ |CREATE TABLE t
+ |USING parquet
+ |LOCATION '$dir'
+ |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
+ """.stripMargin)
+ val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+ assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
- checkAnswer(spark.table("t"), Row(3, 4, 1, 2))
+ checkAnswer(spark.table("t"), Row(3, 4, 1, 2))
}
// partition table
- withTempDir {
- dir =>
- if (shouldDelete) {
- dir.delete()
- }
- spark.sql(
- s"""
- |CREATE TABLE t1
- |USING parquet
- |PARTITIONED BY(a, b)
- |LOCATION '$dir'
- |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
- """.stripMargin)
- val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
- assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
-
- val partDir = new File(dir, "a=3")
- assert(partDir.exists())
-
- checkAnswer(spark.table("t1"), Row(1, 2, 3, 4))
+ withTempDir { dir =>
+ if (shouldDelete) dir.delete()
+ spark.sql(
+ s"""
+ |CREATE TABLE t1
+ |USING parquet
+ |PARTITIONED BY(a, b)
+ |LOCATION '$dir'
+ |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
+ """.stripMargin)
+ val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
+ assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
+
+ val partDir = new File(dir, "a=3")
+ assert(partDir.exists())
+
+ checkAnswer(spark.table("t1"), Row(1, 2, 3, 4))
}
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 79ad156c55..d29242bb47 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -1663,43 +1663,73 @@ class HiveDDLSuite
}
}
+ test("create hive table with a non-existing location") {
+ withTable("t", "t1") {
+ withTempPath { dir =>
+ spark.sql(s"CREATE TABLE t(a int, b int) USING hive LOCATION '$dir'")
+
+ val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+ assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
+
+ spark.sql("INSERT INTO TABLE t SELECT 1, 2")
+ assert(dir.exists())
+
+ checkAnswer(spark.table("t"), Row(1, 2))
+ }
+ // partition table
+ withTempPath { dir =>
+ spark.sql(
+ s"""
+ |CREATE TABLE t1(a int, b int)
+ |USING hive
+ |PARTITIONED BY(a)
+ |LOCATION '$dir'
+ """.stripMargin)
+
+ val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
+ assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
+
+ spark.sql("INSERT INTO TABLE t1 PARTITION(a=1) SELECT 2")
+
+ val partDir = new File(dir, "a=1")
+ assert(partDir.exists())
+
+ checkAnswer(spark.table("t1"), Row(2, 1))
+ }
+ }
+ }
+
Seq(true, false).foreach { shouldDelete =>
- val tcName = if (shouldDelete) "non-existent" else "existed"
- test(s"CTAS for external data source table with a $tcName location") {
+ val tcName = if (shouldDelete) "non-existing" else "pre-existing"
+
+ test(s"CTAS for external hive table with a $tcName location") {
withTable("t", "t1") {
- withTempDir {
- dir =>
- if (shouldDelete) {
- dir.delete()
- }
+ withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
+ withTempDir { dir =>
+ if (shouldDelete) dir.delete()
spark.sql(
s"""
|CREATE TABLE t
- |USING parquet
+ |USING hive
|LOCATION '$dir'
|AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
""".stripMargin)
-
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
checkAnswer(spark.table("t"), Row(3, 4, 1, 2))
- }
- // partition table
- withTempDir {
- dir =>
- if (shouldDelete) {
- dir.delete()
- }
+ }
+ // partition table
+ withTempDir { dir =>
+ if (shouldDelete) dir.delete()
spark.sql(
s"""
|CREATE TABLE t1
- |USING parquet
+ |USING hive
|PARTITIONED BY(a, b)
|LOCATION '$dir'
|AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
""".stripMargin)
-
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
@@ -1707,51 +1737,6 @@ class HiveDDLSuite
assert(partDir.exists())
checkAnswer(spark.table("t1"), Row(1, 2, 3, 4))
- }
- }
- }
-
- test(s"CTAS for external hive table with a $tcName location") {
- withTable("t", "t1") {
- withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
- withTempDir {
- dir =>
- if (shouldDelete) {
- dir.delete()
- }
- spark.sql(
- s"""
- |CREATE TABLE t
- |USING hive
- |LOCATION '$dir'
- |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
- """.stripMargin)
- val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
- assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
-
- checkAnswer(spark.table("t"), Row(3, 4, 1, 2))
- }
- // partition table
- withTempDir {
- dir =>
- if (shouldDelete) {
- dir.delete()
- }
- spark.sql(
- s"""
- |CREATE TABLE t1
- |USING hive
- |PARTITIONED BY(a, b)
- |LOCATION '$dir'
- |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
- """.stripMargin)
- val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
- assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
-
- val partDir = new File(dir, "a=3")
- assert(partDir.exists())
-
- checkAnswer(spark.table("t1"), Row(1, 2, 3, 4))
}
}
}