author     Wenchen Fan <wenchen@databricks.com>  2016-05-17 10:12:51 -0700
committer  Yin Huai <yhuai@databricks.com>       2016-05-17 10:12:51 -0700
commit     20a89478e168cb6901ef89f4cb6aa79193ed244a (patch)
tree       69aef55b7ce970e76bacc33ce0aaedc7b37fa9d8
parent     c0c3ec35476c756e569a1f34c4b258eb0490585c (diff)
[SPARK-14346][SQL][FOLLOW-UP] add tests for CREATE TABLE USING with partition and bucket

## What changes were proposed in this pull request?

https://github.com/apache/spark/pull/12781 introduced the PARTITIONED BY, CLUSTERED BY, and SORTED BY keywords for CREATE TABLE USING. This PR adds tests to make sure those keywords are handled correctly. It also fixes a mistake: we should create a non-Hive-compatible table if partition or bucket info exists.

## How was this patch tested?

N/A

Author: Wenchen Fan <wenchen@databricks.com>

Closes #13144 from cloud-fan/add-test.
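For reference, a minimal sketch of the DDL forms exercised by the new tests (the table names and the `spark.sql(...)` wrapper are illustrative, not taken from the patch itself):

```scala
// Partitioned data source table: the PARTITIONED BY clause added to CREATE TABLE USING.
spark.sql("CREATE TABLE my_tab(a INT, b STRING) USING parquet PARTITIONED BY (a)")

// Bucketed data source table: CLUSTERED BY / SORTED BY / INTO n BUCKETS.
spark.sql(
  "CREATE TABLE my_tab2(a INT, b STRING) USING parquet " +
    "CLUSTERED BY (a) SORTED BY (b) INTO 5 BUCKETS")
```

Per the change below, both forms are persisted to the Hive metastore in the Spark SQL specific format, which is not compatible with Hive.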
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala |  11
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala        |  53
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala               |  44
3 files changed, 106 insertions(+), 2 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 7d3c52570f..70e5108d93 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -399,8 +399,8 @@ object CreateDataSourceTableUtils extends Logging {
"Hive metastore in Spark SQL specific format, which is NOT compatible with Hive."
(None, message)
- case (Some(serde), relation: HadoopFsRelation)
- if relation.location.paths.length == 1 && relation.partitionSchema.isEmpty =>
+ case (Some(serde), relation: HadoopFsRelation) if relation.location.paths.length == 1 &&
+ relation.partitionSchema.isEmpty && relation.bucketSpec.isEmpty =>
val hiveTable = newHiveCompatibleMetastoreTable(relation, serde)
val message =
s"Persisting data source relation $qualifiedTableName with a single input path " +
@@ -415,6 +415,13 @@ object CreateDataSourceTableUtils extends Logging {
"Input path(s): " + relation.location.paths.mkString("\n", "\n", "")
(None, message)
+ case (Some(serde), relation: HadoopFsRelation) if relation.bucketSpec.nonEmpty =>
+ val message =
+ s"Persisting bucketed data source relation $qualifiedTableName into " +
+ "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. " +
+ "Input path(s): " + relation.location.paths.mkString("\n", "\n", "")
+ (None, message)
+
case (Some(serde), relation: HadoopFsRelation) =>
val message =
s"Persisting data source relation $qualifiedTableName with multiple input paths into " +
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index 13df4493e2..897170ea57 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -24,7 +24,9 @@ import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.Project
import org.apache.spark.sql.execution.SparkSqlParser
+import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsing}
import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
// TODO: merge this with DDLSuite (SPARK-14441)
class DDLCommandSuite extends PlanTest {
@@ -238,6 +240,57 @@ class DDLCommandSuite extends PlanTest {
}
}
+ test("create table using - with partitioned by") {
+ val query = "CREATE TABLE my_tab(a INT, b STRING) USING parquet PARTITIONED BY (a)"
+ val expected = CreateTableUsing(
+ TableIdentifier("my_tab"),
+ Some(new StructType().add("a", IntegerType).add("b", StringType)),
+ "parquet",
+ false,
+ Map.empty,
+ null,
+ None,
+ false,
+ true)
+
+ parser.parsePlan(query) match {
+ case ct: CreateTableUsing =>
+ // We can't compare array in `CreateTableUsing` directly, so here we compare
+ // `partitionColumns` ahead, and make `partitionColumns` null before plan comparison.
+ assert(Seq("a") == ct.partitionColumns.toSeq)
+ comparePlans(ct.copy(partitionColumns = null), expected)
+ case other =>
+ fail(s"Expected to parse ${classOf[CreateTable].getClass.getName} from query," +
+ s"got ${other.getClass.getName}: $query")
+ }
+ }
+
+ test("create table using - with bucket") {
+ val query = "CREATE TABLE my_tab(a INT, b STRING) USING parquet " +
+ "CLUSTERED BY (a) SORTED BY (b) INTO 5 BUCKETS"
+ val expected = CreateTableUsing(
+ TableIdentifier("my_tab"),
+ Some(new StructType().add("a", IntegerType).add("b", StringType)),
+ "parquet",
+ false,
+ Map.empty,
+ null,
+ Some(BucketSpec(5, Seq("a"), Seq("b"))),
+ false,
+ true)
+
+ parser.parsePlan(query) match {
+ case ct: CreateTableUsing =>
+ // `Array.empty == Array.empty` returns false, here we set `partitionColumns` to null before
+ // plan comparison.
+ assert(ct.partitionColumns.isEmpty)
+ comparePlans(ct.copy(partitionColumns = null), expected)
+ case other =>
+ fail(s"Expected to parse ${classOf[CreateTable].getClass.getName} from query," +
+ s"got ${other.getClass.getName}: $query")
+ }
+ }
+
// ALTER TABLE table_name RENAME TO new_table_name;
// ALTER VIEW view_name RENAME TO new_view_name;
test("alter table/view: rename table/view") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 82123bec88..d72dc092e2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -29,8 +29,10 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFor
import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, SessionCatalog}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.execution.datasources.BucketSpec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.{IntegerType, StructType}
class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
private val escapedIdentifier = "`(.+)`".r
@@ -350,6 +352,48 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
assert(catalog.getTableMetadata(tableIdent1) === expectedTable)
}
+ test("create table using") {
+ val catalog = spark.sessionState.catalog
+ withTable("tbl") {
+ sql("CREATE TABLE tbl(a INT, b INT) USING parquet")
+ val table = catalog.getTableMetadata(TableIdentifier("tbl"))
+ assert(table.tableType == CatalogTableType.MANAGED)
+ assert(table.schema == Seq(CatalogColumn("a", "int"), CatalogColumn("b", "int")))
+ assert(table.properties("spark.sql.sources.provider") == "parquet")
+ }
+ }
+
+ test("create table using - with partitioned by") {
+ val catalog = spark.sessionState.catalog
+ withTable("tbl") {
+ sql("CREATE TABLE tbl(a INT, b INT) USING parquet PARTITIONED BY (a)")
+ val table = catalog.getTableMetadata(TableIdentifier("tbl"))
+ assert(table.tableType == CatalogTableType.MANAGED)
+ assert(table.schema.isEmpty) // partitioned datasource table is not hive-compatible
+ assert(table.properties("spark.sql.sources.provider") == "parquet")
+ assert(DDLUtils.getSchemaFromTableProperties(table) ==
+ Some(new StructType().add("a", IntegerType).add("b", IntegerType)))
+ assert(DDLUtils.getPartitionColumnsFromTableProperties(table) ==
+ Seq("a"))
+ }
+ }
+
+ test("create table using - with bucket") {
+ val catalog = spark.sessionState.catalog
+ withTable("tbl") {
+ sql("CREATE TABLE tbl(a INT, b INT) USING parquet " +
+ "CLUSTERED BY (a) SORTED BY (b) INTO 5 BUCKETS")
+ val table = catalog.getTableMetadata(TableIdentifier("tbl"))
+ assert(table.tableType == CatalogTableType.MANAGED)
+ assert(table.schema.isEmpty) // partitioned datasource table is not hive-compatible
+ assert(table.properties("spark.sql.sources.provider") == "parquet")
+ assert(DDLUtils.getSchemaFromTableProperties(table) ==
+ Some(new StructType().add("a", IntegerType).add("b", IntegerType)))
+ assert(DDLUtils.getBucketSpecFromTableProperties(table) ==
+ Some(BucketSpec(5, Seq("a"), Seq("b"))))
+ }
+ }
+
test("alter table: rename") {
val catalog = spark.sessionState.catalog
val tableIdent1 = TableIdentifier("tab1", Some("dbx"))