about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r-- sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 | 4
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala | 12
-rw-r--r-- sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala | 93
3 files changed, 106 insertions(+), 3 deletions(-)
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 6e04f6eb80..c356f0c3f1 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -47,7 +47,9 @@ statement
| createTableHeader ('(' colTypeList ')')? tableProvider
(OPTIONS tablePropertyList)? #createTableUsing
| createTableHeader tableProvider
- (OPTIONS tablePropertyList)? AS? query #createTableUsing
+ (OPTIONS tablePropertyList)?
+ (PARTITIONED BY partitionColumnNames=identifierList)?
+ bucketSpec? AS? query #createTableUsing
| createTableHeader ('(' columns=colTypeList ')')?
(COMMENT STRING)?
(PARTITIONED BY '(' partitionColumns=colTypeList ')')?
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 79fdf9fb22..e4c837a7ab 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -289,6 +289,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
}
val options = Option(ctx.tablePropertyList).map(visitTablePropertyList).getOrElse(Map.empty)
val provider = ctx.tableProvider.qualifiedName.getText
+ val bucketSpec = Option(ctx.bucketSpec()).map(visitBucketSpec)
if (ctx.query != null) {
// Get the backing query.
@@ -302,9 +303,16 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
} else {
SaveMode.ErrorIfExists
}
- CreateTableUsingAsSelect(table, provider, temp, Array.empty, None, mode, options, query)
+
+ val partitionColumnNames =
+ Option(ctx.partitionColumnNames)
+ .map(visitIdentifierList(_).toArray)
+ .getOrElse(Array.empty[String])
+
+ CreateTableUsingAsSelect(
+ table, provider, temp, partitionColumnNames, bucketSpec, mode, options, query)
} else {
- val struct = Option(ctx.colTypeList).map(createStructType)
+ val struct = Option(ctx.colTypeList()).map(createStructType)
CreateTableUsing(table, struct, provider, temp, options, ifNotExists, managedIfNoPath = false)
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index b21ca4f26e..cb100021be 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -940,4 +940,97 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
.schema.forall { c => DataTypeParser.parse(c.dataType) == ArrayType(StringType) })
}
}
+
+ test("CTAS: persisted partitioned data source table") {
+ withTempDir { dir =>
+ withTable("t") {
+ val path = dir.getCanonicalPath
+
+ sql(
+ s"""CREATE TABLE t USING PARQUET
+ |OPTIONS (PATH '$path')
+ |PARTITIONED BY (a)
+ |AS SELECT 1 AS a, 2 AS b
+ """.stripMargin
+ )
+
+ val metastoreTable = sharedState.externalCatalog.getTable("default", "t")
+ assert(metastoreTable.properties("spark.sql.sources.schema.numPartCols").toInt === 1)
+ assert(!metastoreTable.properties.contains("spark.sql.sources.schema.numBuckets"))
+ assert(!metastoreTable.properties.contains("spark.sql.sources.schema.numBucketCols"))
+ assert(!metastoreTable.properties.contains("spark.sql.sources.schema.numSortCols"))
+
+ checkAnswer(table("t"), Row(2, 1))
+ }
+ }
+ }
+
+ test("CTAS: persisted bucketed data source table") {
+ withTempDir { dir =>
+ withTable("t") {
+ val path = dir.getCanonicalPath
+
+ sql(
+ s"""CREATE TABLE t USING PARQUET
+ |OPTIONS (PATH '$path')
+ |CLUSTERED BY (a) SORTED BY (b) INTO 2 BUCKETS
+ |AS SELECT 1 AS a, 2 AS b
+ """.stripMargin
+ )
+
+ val metastoreTable = sharedState.externalCatalog.getTable("default", "t")
+ assert(!metastoreTable.properties.contains("spark.sql.sources.schema.numPartCols"))
+ assert(metastoreTable.properties("spark.sql.sources.schema.numBuckets").toInt === 2)
+ assert(metastoreTable.properties("spark.sql.sources.schema.numBucketCols").toInt === 1)
+ assert(metastoreTable.properties("spark.sql.sources.schema.numSortCols").toInt === 1)
+
+ checkAnswer(table("t"), Row(1, 2))
+ }
+
+ withTable("t") {
+ val path = dir.getCanonicalPath
+
+ sql(
+ s"""CREATE TABLE t USING PARQUET
+ |OPTIONS (PATH '$path')
+ |CLUSTERED BY (a) INTO 2 BUCKETS
+ |AS SELECT 1 AS a, 2 AS b
+ """.stripMargin
+ )
+
+ val metastoreTable = sharedState.externalCatalog.getTable("default", "t")
+ assert(!metastoreTable.properties.contains("spark.sql.sources.schema.numPartCols"))
+ assert(metastoreTable.properties("spark.sql.sources.schema.numBuckets").toInt === 2)
+ assert(metastoreTable.properties("spark.sql.sources.schema.numBucketCols").toInt === 1)
+ assert(!metastoreTable.properties.contains("spark.sql.sources.schema.numSortCols"))
+
+ checkAnswer(table("t"), Row(1, 2))
+ }
+ }
+ }
+
+ test("CTAS: persisted partitioned bucketed data source table") {
+ withTempDir { dir =>
+ withTable("t") {
+ val path = dir.getCanonicalPath
+
+ sql(
+ s"""CREATE TABLE t USING PARQUET
+ |OPTIONS (PATH '$path')
+ |PARTITIONED BY (a)
+ |CLUSTERED BY (b) SORTED BY (c) INTO 2 BUCKETS
+ |AS SELECT 1 AS a, 2 AS b, 3 AS c
+ """.stripMargin
+ )
+
+ val metastoreTable = sharedState.externalCatalog.getTable("default", "t")
+ assert(metastoreTable.properties("spark.sql.sources.schema.numPartCols").toInt === 1)
+ assert(metastoreTable.properties("spark.sql.sources.schema.numBuckets").toInt === 2)
+ assert(metastoreTable.properties("spark.sql.sources.schema.numBucketCols").toInt === 1)
+ assert(metastoreTable.properties("spark.sql.sources.schema.numSortCols").toInt === 1)
+
+ checkAnswer(table("t"), Row(2, 3, 1))
+ }
+ }
+ }
}