author     Wenchen Fan <wenchen@databricks.com>    2016-08-05 10:50:26 +0200
committer  Herman van Hovell <hvanhovell@databricks.com>    2016-08-05 10:50:26 +0200
commit     5effc016c893ce917d535cc1b5026d8e4c846721 (patch)
tree       59e28575a90ec38a17b26dad58297c5d5bfd8436 /sql/core/src/test/scala
parent     faaefab26ffea3a5edfeaff42db222c8cd3ff5f1 (diff)
[SPARK-16879][SQL] unify logical plans for CREATE TABLE and CTAS
## What changes were proposed in this pull request?

We have various logical plans for CREATE TABLE and CTAS: `CreateTableUsing`, `CreateTableUsingAsSelect`, `CreateHiveTableAsSelectLogicalPlan`. This PR unifies them to reduce the complexity and centralize the error handling.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #14482 from cloud-fan/table.
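To make the unified shape concrete, here is a minimal sketch of matching on the new `CreateTable` node, following the pattern the updated tests below use (`CreateTable(tableDesc, _, None)` for a plain CREATE TABLE); the middle constructor field (the save mode) and the helper name `describeCreate` are assumptions for illustration, not code from this PR:

```scala
// Sketch only: pattern-matching the unified CreateTable logical plan,
// which carries a CatalogTable plus an optional CTAS query.
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.CreateTable

// Hypothetical helper: summarize what kind of CREATE TABLE plan was parsed.
def describeCreate(plan: LogicalPlan): String = plan match {
  case CreateTable(tableDesc: CatalogTable, _, None) =>
    // Plain CREATE TABLE: only table metadata, no query attached.
    s"CREATE TABLE ${tableDesc.identifier} (provider: ${tableDesc.provider})"
  case CreateTable(tableDesc: CatalogTable, _, Some(query)) =>
    // CTAS: the same node also carries the query whose result populates the table.
    s"CTAS into ${tableDesc.identifier} from a ${query.nodeName} plan"
  case other =>
    s"not a CreateTable plan: ${other.getClass.getName}"
}
```

The tests below exercise exactly this shape: the parser output is matched as `CreateTable(tableDesc, _, None)` and `tableDesc` is compared against an expected `CatalogTable` (with `createTime` copied over, since it is set at parse time).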
Diffstat (limited to 'sql/core/src/test/scala')
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala | 151
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala        |  47
2 files changed, 105 insertions(+), 93 deletions(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index 999afc9751..044fa5fb9a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -20,13 +20,12 @@ package org.apache.spark.sql.execution.command
import scala.reflect.{classTag, ClassTag}
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTableType, FunctionResource}
-import org.apache.spark.sql.catalyst.catalog.FunctionResourceType
+import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.Project
import org.apache.spark.sql.execution.SparkSqlParser
-import org.apache.spark.sql.execution.datasources.CreateTableUsing
+import org.apache.spark.sql.execution.datasources.CreateTable
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
@@ -243,12 +242,12 @@ class DDLCommandSuite extends PlanTest {
allSources.foreach { s =>
val query = s"CREATE TABLE my_tab STORED AS $s"
- val ct = parseAs[CreateTableCommand](query)
+ val ct = parseAs[CreateTable](query)
val hiveSerde = HiveSerDe.sourceToSerDe(s, new SQLConf)
assert(hiveSerde.isDefined)
- assert(ct.table.storage.serde == hiveSerde.get.serde)
- assert(ct.table.storage.inputFormat == hiveSerde.get.inputFormat)
- assert(ct.table.storage.outputFormat == hiveSerde.get.outputFormat)
+ assert(ct.tableDesc.storage.serde == hiveSerde.get.serde)
+ assert(ct.tableDesc.storage.inputFormat == hiveSerde.get.inputFormat)
+ assert(ct.tableDesc.storage.outputFormat == hiveSerde.get.outputFormat)
}
}
@@ -259,14 +258,14 @@ class DDLCommandSuite extends PlanTest {
val query2 = s"$createTableStart DELIMITED FIELDS TERMINATED BY ' ' $fileFormat"
// No conflicting serdes here, OK
- val parsed1 = parseAs[CreateTableCommand](query1)
- assert(parsed1.table.storage.serde == Some("anything"))
- assert(parsed1.table.storage.inputFormat == Some("inputfmt"))
- assert(parsed1.table.storage.outputFormat == Some("outputfmt"))
- val parsed2 = parseAs[CreateTableCommand](query2)
- assert(parsed2.table.storage.serde.isEmpty)
- assert(parsed2.table.storage.inputFormat == Some("inputfmt"))
- assert(parsed2.table.storage.outputFormat == Some("outputfmt"))
+ val parsed1 = parseAs[CreateTable](query1)
+ assert(parsed1.tableDesc.storage.serde == Some("anything"))
+ assert(parsed1.tableDesc.storage.inputFormat == Some("inputfmt"))
+ assert(parsed1.tableDesc.storage.outputFormat == Some("outputfmt"))
+ val parsed2 = parseAs[CreateTable](query2)
+ assert(parsed2.tableDesc.storage.serde.isEmpty)
+ assert(parsed2.tableDesc.storage.inputFormat == Some("inputfmt"))
+ assert(parsed2.tableDesc.storage.outputFormat == Some("outputfmt"))
}
test("create table - row format serde and generic file format") {
@@ -276,12 +275,12 @@ class DDLCommandSuite extends PlanTest {
allSources.foreach { s =>
val query = s"CREATE TABLE my_tab ROW FORMAT SERDE 'anything' STORED AS $s"
if (supportedSources.contains(s)) {
- val ct = parseAs[CreateTableCommand](query)
+ val ct = parseAs[CreateTable](query)
val hiveSerde = HiveSerDe.sourceToSerDe(s, new SQLConf)
assert(hiveSerde.isDefined)
- assert(ct.table.storage.serde == Some("anything"))
- assert(ct.table.storage.inputFormat == hiveSerde.get.inputFormat)
- assert(ct.table.storage.outputFormat == hiveSerde.get.outputFormat)
+ assert(ct.tableDesc.storage.serde == Some("anything"))
+ assert(ct.tableDesc.storage.inputFormat == hiveSerde.get.inputFormat)
+ assert(ct.tableDesc.storage.outputFormat == hiveSerde.get.outputFormat)
} else {
assertUnsupported(query, Seq("row format serde", "incompatible", s))
}
@@ -295,12 +294,12 @@ class DDLCommandSuite extends PlanTest {
allSources.foreach { s =>
val query = s"CREATE TABLE my_tab ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' STORED AS $s"
if (supportedSources.contains(s)) {
- val ct = parseAs[CreateTableCommand](query)
+ val ct = parseAs[CreateTable](query)
val hiveSerde = HiveSerDe.sourceToSerDe(s, new SQLConf)
assert(hiveSerde.isDefined)
- assert(ct.table.storage.serde == hiveSerde.get.serde)
- assert(ct.table.storage.inputFormat == hiveSerde.get.inputFormat)
- assert(ct.table.storage.outputFormat == hiveSerde.get.outputFormat)
+ assert(ct.tableDesc.storage.serde == hiveSerde.get.serde)
+ assert(ct.tableDesc.storage.inputFormat == hiveSerde.get.inputFormat)
+ assert(ct.tableDesc.storage.outputFormat == hiveSerde.get.outputFormat)
} else {
assertUnsupported(query, Seq("row format delimited", "only compatible with 'textfile'", s))
}
@@ -312,9 +311,9 @@ class DDLCommandSuite extends PlanTest {
sql = "CREATE EXTERNAL TABLE my_tab",
containsThesePhrases = Seq("create external table", "location"))
val query = "CREATE EXTERNAL TABLE my_tab LOCATION '/something/anything'"
- val ct = parseAs[CreateTableCommand](query)
- assert(ct.table.tableType == CatalogTableType.EXTERNAL)
- assert(ct.table.storage.locationUri == Some("/something/anything"))
+ val ct = parseAs[CreateTable](query)
+ assert(ct.tableDesc.tableType == CatalogTableType.EXTERNAL)
+ assert(ct.tableDesc.storage.locationUri == Some("/something/anything"))
}
test("create table - property values must be set") {
@@ -329,47 +328,29 @@ class DDLCommandSuite extends PlanTest {
test("create table - location implies external") {
val query = "CREATE TABLE my_tab LOCATION '/something/anything'"
- val ct = parseAs[CreateTableCommand](query)
- assert(ct.table.tableType == CatalogTableType.EXTERNAL)
- assert(ct.table.storage.locationUri == Some("/something/anything"))
- }
-
- test("create table - column repeated in partitioning columns") {
- val query = "CREATE TABLE tab1 (key INT, value STRING) PARTITIONED BY (key INT, hr STRING)"
- val e = intercept[ParseException] { parser.parsePlan(query) }
- assert(e.getMessage.contains(
- "Operation not allowed: Partition columns may not be specified in the schema: [\"key\"]"))
- }
-
- test("create table - duplicate column names in the table definition") {
- val query = "CREATE TABLE default.tab1 (key INT, key STRING)"
- val e = intercept[ParseException] { parser.parsePlan(query) }
- assert(e.getMessage.contains("Operation not allowed: Duplicated column names found in " +
- "table definition of `default`.`tab1`: [\"key\"]"))
+ val ct = parseAs[CreateTable](query)
+ assert(ct.tableDesc.tableType == CatalogTableType.EXTERNAL)
+ assert(ct.tableDesc.storage.locationUri == Some("/something/anything"))
}
test("create table using - with partitioned by") {
val query = "CREATE TABLE my_tab(a INT comment 'test', b STRING) " +
"USING parquet PARTITIONED BY (a)"
- val expected = CreateTableUsing(
- TableIdentifier("my_tab"),
- Some(new StructType()
+
+ val expectedTableDesc = CatalogTable(
+ identifier = TableIdentifier("my_tab"),
+ tableType = CatalogTableType.MANAGED,
+ storage = CatalogStorageFormat.empty,
+ schema = new StructType()
.add("a", IntegerType, nullable = true, "test")
- .add("b", StringType)),
- "parquet",
- false,
- Map.empty,
- null,
- None,
- false,
- true)
+ .add("b", StringType),
+ provider = Some("parquet"),
+ partitionColumnNames = Seq("a")
+ )
parser.parsePlan(query) match {
- case ct: CreateTableUsing =>
- // We can't compare array in `CreateTableUsing` directly, so here we compare
- // `partitionColumns` ahead, and make `partitionColumns` null before plan comparison.
- assert(Seq("a") == ct.partitionColumns.toSeq)
- comparePlans(ct.copy(partitionColumns = null), expected)
+ case CreateTable(tableDesc, _, None) =>
+ assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime))
case other =>
fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," +
s"got ${other.getClass.getName}: $query")
@@ -379,23 +360,19 @@ class DDLCommandSuite extends PlanTest {
test("create table using - with bucket") {
val query = "CREATE TABLE my_tab(a INT, b STRING) USING parquet " +
"CLUSTERED BY (a) SORTED BY (b) INTO 5 BUCKETS"
- val expected = CreateTableUsing(
- TableIdentifier("my_tab"),
- Some(new StructType().add("a", IntegerType).add("b", StringType)),
- "parquet",
- false,
- Map.empty,
- null,
- Some(BucketSpec(5, Seq("a"), Seq("b"))),
- false,
- true)
+
+ val expectedTableDesc = CatalogTable(
+ identifier = TableIdentifier("my_tab"),
+ tableType = CatalogTableType.MANAGED,
+ storage = CatalogStorageFormat.empty,
+ schema = new StructType().add("a", IntegerType).add("b", StringType),
+ provider = Some("parquet"),
+ bucketSpec = Some(BucketSpec(5, Seq("a"), Seq("b")))
+ )
parser.parsePlan(query) match {
- case ct: CreateTableUsing =>
- // `Array.empty == Array.empty` returns false, here we set `partitionColumns` to null before
- // plan comparison.
- assert(ct.partitionColumns.isEmpty)
- comparePlans(ct.copy(partitionColumns = null), expected)
+ case CreateTable(tableDesc, _, None) =>
+ assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime))
case other =>
fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," +
s"got ${other.getClass.getName}: $query")
@@ -907,22 +884,20 @@ class DDLCommandSuite extends PlanTest {
|CREATE TABLE table_name USING json
|OPTIONS (a 1, b 0.1, c TRUE)
""".stripMargin
- val expected = CreateTableUsing(
- TableIdentifier("table_name"),
- None,
- "json",
- false,
- Map("a" -> "1", "b" -> "0.1", "c" -> "true"),
- null,
- None,
- false,
- true)
+
+ val expectedTableDesc = CatalogTable(
+ identifier = TableIdentifier("table_name"),
+ tableType = CatalogTableType.MANAGED,
+ storage = CatalogStorageFormat.empty.copy(
+ properties = Map("a" -> "1", "b" -> "0.1", "c" -> "true")
+ ),
+ schema = new StructType,
+ provider = Some("json")
+ )
parser.parsePlan(sql) match {
- case ct: CreateTableUsing =>
- // We can't compare array in `CreateTableUsing` directly, so here we explicitly
- // set partitionColumns to `null` and then compare it.
- comparePlans(ct.copy(partitionColumns = null), expected)
+ case CreateTable(tableDesc, _, None) =>
+ assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime))
case other =>
fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," +
s"got ${other.getClass.getName}: $sql")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 564fc73ee7..ca9b210125 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -30,7 +30,6 @@ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, Catal
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, SessionCatalog}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
@@ -94,6 +93,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
.add("col2", "string")
.add("a", "int")
.add("b", "int"),
+ provider = Some("parquet"),
partitionColumnNames = Seq("a", "b"),
createTime = 0L)
}
@@ -359,6 +359,43 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
}
+ test("create table - duplicate column names in the table definition") {
+ val e = intercept[AnalysisException] {
+ sql("CREATE TABLE tbl(a int, a string) USING json")
+ }
+ assert(e.message == "Found duplicate column(s) in table definition of `tbl`: a")
+ }
+
+ test("create table - partition column names not in table definition") {
+ val e = intercept[AnalysisException] {
+ sql("CREATE TABLE tbl(a int, b string) USING json PARTITIONED BY (c)")
+ }
+ assert(e.message == "partition column c is not defined in table `tbl`, " +
+ "defined table columns are: a, b")
+ }
+
+ test("create table - bucket column names not in table definition") {
+ val e = intercept[AnalysisException] {
+ sql("CREATE TABLE tbl(a int, b string) USING json CLUSTERED BY (c) INTO 4 BUCKETS")
+ }
+ assert(e.message == "bucket column c is not defined in table `tbl`, " +
+ "defined table columns are: a, b")
+ }
+
+ test("create table - column repeated in partition columns") {
+ val e = intercept[AnalysisException] {
+ sql("CREATE TABLE tbl(a int) USING json PARTITIONED BY (a, a)")
+ }
+ assert(e.message == "Found duplicate column(s) in partition: a")
+ }
+
+ test("create table - column repeated in bucket columns") {
+ val e = intercept[AnalysisException] {
+ sql("CREATE TABLE tbl(a int) USING json CLUSTERED BY (a, a) INTO 4 BUCKETS")
+ }
+ assert(e.message == "Found duplicate column(s) in bucket: a")
+ }
+
test("Describe Table with Corrupted Schema") {
import testImplicits._
@@ -1469,7 +1506,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
withTable("jsonTable") {
(("a", "b") :: Nil).toDF().write.json(tempDir.getCanonicalPath)
- val e = intercept[ParseException] {
+ val e = intercept[AnalysisException] {
sql(
s"""
|CREATE TABLE jsonTable
@@ -1479,9 +1516,9 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
|)
|CLUSTERED BY (inexistentColumnA) SORTED BY (inexistentColumnB) INTO 2 BUCKETS
""".stripMargin)
- }.getMessage
- assert(e.contains(
- "Expected explicit specification of table schema when using CLUSTERED BY clause"))
+ }
+ assert(e.message == "Cannot specify bucketing information if the table schema is not " +
+ "specified when creating and will be inferred at runtime")
}
}
}