author     Xiao Li <gatorsmile@gmail.com>          2017-03-08 23:12:10 -0800
committer  Wenchen Fan <wenchen@databricks.com>    2017-03-08 23:12:10 -0800
commit     09829be621f0f9bb5076abb3d832925624699fa9 (patch)
tree       28fdf4b2394eaf84dd5bee598c164a6a19b702e6 /sql
parent     d809ceed9762d5bbb04170e45f38751713112dd8 (diff)
[SPARK-19235][SQL][TESTS] Enable Test Cases in DDLSuite with Hive Metastore
### What changes were proposed in this pull request?

So far, the test cases in `DDLSuite` only verify the behavior of `InMemoryCatalog`; they do not cover the scenarios that use `HiveExternalCatalog`. Thus, we need to improve the existing test suite to run these cases against the Hive metastore as well. While porting these test cases, a bug in `SET LOCATION` was found: `path` is not set when the location is changed.

After this PR, the following changes are made:

- `DDLSuite` becomes an abstract class. Both `InMemoryCatalogedDDLSuite` and `HiveCatalogedDDLSuite` extend it; `InMemoryCatalogedDDLSuite` uses `InMemoryCatalog`, while `HiveCatalogedDDLSuite` uses `HiveExternalCatalog`.
- `InMemoryCatalogedDDLSuite` contains all the existing test cases in `DDLSuite`.
- `HiveCatalogedDDLSuite` contains a subset of `DDLSuite`. The following test cases are excluded:

1. Test cases that only make sense for `InMemoryCatalog`:
```
test("desc table for parquet data source table using in-memory catalog")
test("create a managed Hive source table")
test("create an external Hive source table")
test("Create Hive Table As Select")
```
2. Test cases that cannot be ported because the table provider cannot be altered when using the Hive metastore. In future PRs, we need to improve these test cases so that altering the table provider is not needed:
```
test("alter table: set location (datasource table)")
test("alter table: set properties (datasource table)")
test("alter table: unset properties (datasource table)")
test("alter table: set serde (datasource table)")
test("alter table: set serde partition (datasource table)")
test("alter table: change column (datasource table)")
test("alter table: add partition (datasource table)")
test("alter table: drop partition (datasource table)")
test("alter table: rename partition (datasource table)")
test("drop table - data source table")
```

**TODO**: in future PRs, we need to remove `HiveDDLSuite` and move its test cases to `DDLSuite`, `InMemoryCatalogedDDLSuite`, or `HiveCatalogedDDLSuite`.

### How was this patch tested?

N/A

Author: Xiao Li <gatorsmile@gmail.com>
Author: gatorsmile <gatorsmile@gmail.com>

Closes #16592 from gatorsmile/refactorDDLSuite.
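For orientation, the sketch below condenses the suite hierarchy this patch introduces; member bodies and imports are elided, and the full definitions appear in the diff that follows.

```scala
// Condensed sketch only -- the complete code is in DDLSuite.scala and HiveDDLSuite.scala below.
abstract class DDLSuite extends QueryTest with SQLTestUtils {
  // True when the suite runs against HiveExternalCatalog.
  protected def isUsingHiveMetastore: Boolean =
    spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "hive"

  // Each concrete suite supplies a CatalogTable suited to its external catalog.
  protected def generateTable(catalog: SessionCatalog, name: TableIdentifier): CatalogTable
}

class InMemoryCatalogedDDLSuite extends DDLSuite with SharedSQLContext with BeforeAndAfterEach {
  // Overrides generateTable with a parquet-provider table and keeps the
  // datasource-table and in-memory-only test cases that are not shared with Hive.
}

class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeAndAfterEach {
  // Overrides generateTable with a Hive-serde table and normalizeCatalogTable to
  // drop nondeterministic Hive metastore properties before comparisons.
}
```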
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala   456
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala              5
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala  157
3 files changed, 345 insertions, 273 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index c1f8b2b3d9..aa335c4453 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -30,23 +30,164 @@ import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
-import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
-class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
- private val escapedIdentifier = "`(.+)`".r
+class InMemoryCatalogedDDLSuite extends DDLSuite with SharedSQLContext with BeforeAndAfterEach {
override def afterEach(): Unit = {
try {
// drop all databases, tables and functions after each test
spark.sessionState.catalog.reset()
} finally {
- Utils.deleteRecursively(new File("spark-warehouse"))
+ Utils.deleteRecursively(new File(spark.sessionState.conf.warehousePath))
super.afterEach()
}
}
+ protected override def generateTable(
+ catalog: SessionCatalog,
+ name: TableIdentifier): CatalogTable = {
+ val storage =
+ CatalogStorageFormat.empty.copy(locationUri = Some(catalog.defaultTablePath(name)))
+ val metadata = new MetadataBuilder()
+ .putString("key", "value")
+ .build()
+ CatalogTable(
+ identifier = name,
+ tableType = CatalogTableType.EXTERNAL,
+ storage = storage,
+ schema = new StructType()
+ .add("col1", "int", nullable = true, metadata = metadata)
+ .add("col2", "string")
+ .add("a", "int")
+ .add("b", "int"),
+ provider = Some("parquet"),
+ partitionColumnNames = Seq("a", "b"),
+ createTime = 0L,
+ tracksPartitionsInCatalog = true)
+ }
+
+ test("desc table for parquet data source table using in-memory catalog") {
+ val tabName = "tab1"
+ withTable(tabName) {
+ sql(s"CREATE TABLE $tabName(a int comment 'test') USING parquet ")
+
+ checkAnswer(
+ sql(s"DESC $tabName").select("col_name", "data_type", "comment"),
+ Row("a", "int", "test")
+ )
+ }
+ }
+
+ test("alter table: set location (datasource table)") {
+ testSetLocation(isDatasourceTable = true)
+ }
+
+ test("alter table: set properties (datasource table)") {
+ testSetProperties(isDatasourceTable = true)
+ }
+
+ test("alter table: unset properties (datasource table)") {
+ testUnsetProperties(isDatasourceTable = true)
+ }
+
+ test("alter table: set serde (datasource table)") {
+ testSetSerde(isDatasourceTable = true)
+ }
+
+ test("alter table: set serde partition (datasource table)") {
+ testSetSerdePartition(isDatasourceTable = true)
+ }
+
+ test("alter table: change column (datasource table)") {
+ testChangeColumn(isDatasourceTable = true)
+ }
+
+ test("alter table: add partition (datasource table)") {
+ testAddPartitions(isDatasourceTable = true)
+ }
+
+ test("alter table: drop partition (datasource table)") {
+ testDropPartitions(isDatasourceTable = true)
+ }
+
+ test("alter table: rename partition (datasource table)") {
+ testRenamePartitions(isDatasourceTable = true)
+ }
+
+ test("drop table - data source table") {
+ testDropTable(isDatasourceTable = true)
+ }
+
+ test("create a managed Hive source table") {
+ assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "in-memory")
+ val tabName = "tbl"
+ withTable(tabName) {
+ val e = intercept[AnalysisException] {
+ sql(s"CREATE TABLE $tabName (i INT, j STRING)")
+ }.getMessage
+ assert(e.contains("Hive support is required to CREATE Hive TABLE"))
+ }
+ }
+
+ test("create an external Hive source table") {
+ assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "in-memory")
+ withTempDir { tempDir =>
+ val tabName = "tbl"
+ withTable(tabName) {
+ val e = intercept[AnalysisException] {
+ sql(
+ s"""
+ |CREATE EXTERNAL TABLE $tabName (i INT, j STRING)
+ |ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
+ |LOCATION '${tempDir.toURI}'
+ """.stripMargin)
+ }.getMessage
+ assert(e.contains("Hive support is required to CREATE Hive TABLE"))
+ }
+ }
+ }
+
+ test("Create Hive Table As Select") {
+ import testImplicits._
+ withTable("t", "t1") {
+ var e = intercept[AnalysisException] {
+ sql("CREATE TABLE t SELECT 1 as a, 1 as b")
+ }.getMessage
+ assert(e.contains("Hive support is required to CREATE Hive TABLE (AS SELECT)"))
+
+ spark.range(1).select('id as 'a, 'id as 'b).write.saveAsTable("t1")
+ e = intercept[AnalysisException] {
+ sql("CREATE TABLE t SELECT a, b from t1")
+ }.getMessage
+ assert(e.contains("Hive support is required to CREATE Hive TABLE (AS SELECT)"))
+ }
+ }
+
+}
+
+abstract class DDLSuite extends QueryTest with SQLTestUtils {
+
+ protected def isUsingHiveMetastore: Boolean = {
+ spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "hive"
+ }
+
+ protected def generateTable(catalog: SessionCatalog, name: TableIdentifier): CatalogTable
+
+ private val escapedIdentifier = "`(.+)`".r
+
+ protected def normalizeCatalogTable(table: CatalogTable): CatalogTable = table
+
+ private def normalizeSerdeProp(props: Map[String, String]): Map[String, String] = {
+ props.filterNot(p => Seq("serialization.format", "path").contains(p._1))
+ }
+
+ private def checkCatalogTables(expected: CatalogTable, actual: CatalogTable): Unit = {
+ assert(normalizeCatalogTable(actual) == normalizeCatalogTable(expected))
+ }
+
/**
* Strip backticks, if any, from the string.
*/
@@ -75,33 +216,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
ignoreIfExists = false)
}
- private def generateTable(catalog: SessionCatalog, name: TableIdentifier): CatalogTable = {
- val storage =
- CatalogStorageFormat(
- locationUri = Some(catalog.defaultTablePath(name)),
- inputFormat = None,
- outputFormat = None,
- serde = None,
- compressed = false,
- properties = Map())
- val metadata = new MetadataBuilder()
- .putString("key", "value")
- .build()
- CatalogTable(
- identifier = name,
- tableType = CatalogTableType.EXTERNAL,
- storage = storage,
- schema = new StructType()
- .add("col1", "int", nullable = true, metadata = metadata)
- .add("col2", "string")
- .add("a", "int")
- .add("b", "int"),
- provider = Some("parquet"),
- partitionColumnNames = Seq("a", "b"),
- createTime = 0L,
- tracksPartitionsInCatalog = true)
- }
-
private def createTable(catalog: SessionCatalog, name: TableIdentifier): Unit = {
catalog.createTable(generateTable(catalog, name), ignoreIfExists = false)
}
@@ -115,6 +229,11 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
catalog.createPartitions(tableName, Seq(part), ignoreIfExists = false)
}
+ private def getDBPath(dbName: String): URI = {
+ val warehousePath = s"file:${spark.sessionState.conf.warehousePath.stripPrefix("file:")}"
+ new Path(warehousePath, s"$dbName.db").toUri
+ }
+
test("the qualified path of a database is stored in the catalog") {
val catalog = spark.sessionState.catalog
@@ -138,11 +257,10 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
try {
sql(s"CREATE DATABASE $dbName")
val db1 = catalog.getDatabaseMetadata(dbName)
- val expectedLocation = makeQualifiedPath(s"spark-warehouse/$dbName.db")
assert(db1 == CatalogDatabase(
dbName,
"",
- expectedLocation,
+ getDBPath(dbName),
Map.empty))
sql(s"DROP DATABASE $dbName CASCADE")
assert(!catalog.databaseExists(dbName))
@@ -185,16 +303,17 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
val dbNameWithoutBackTicks = cleanIdentifier(dbName)
sql(s"CREATE DATABASE $dbName")
val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks)
- val expectedLocation = makeQualifiedPath(s"spark-warehouse/$dbNameWithoutBackTicks.db")
assert(db1 == CatalogDatabase(
dbNameWithoutBackTicks,
"",
- expectedLocation,
+ getDBPath(dbNameWithoutBackTicks),
Map.empty))
- intercept[DatabaseAlreadyExistsException] {
+ // TODO: HiveExternalCatalog should throw DatabaseAlreadyExistsException
+ val e = intercept[AnalysisException] {
sql(s"CREATE DATABASE $dbName")
- }
+ }.getMessage
+ assert(e.contains(s"already exists"))
} finally {
catalog.reset()
}
@@ -413,19 +532,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
}
- test("desc table for parquet data source table using in-memory catalog") {
- assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "in-memory")
- val tabName = "tab1"
- withTable(tabName) {
- sql(s"CREATE TABLE $tabName(a int comment 'test') USING parquet ")
-
- checkAnswer(
- sql(s"DESC $tabName").select("col_name", "data_type", "comment"),
- Row("a", "int", "test")
- )
- }
- }
-
test("Alter/Describe Database") {
val catalog = spark.sessionState.catalog
val databaseNames = Seq("db1", "`database`")
@@ -433,7 +539,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
databaseNames.foreach { dbName =>
try {
val dbNameWithoutBackTicks = cleanIdentifier(dbName)
- val location = makeQualifiedPath(s"spark-warehouse/$dbNameWithoutBackTicks.db")
+ val location = getDBPath(dbNameWithoutBackTicks)
sql(s"CREATE DATABASE $dbName")
@@ -477,7 +583,12 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
var message = intercept[AnalysisException] {
sql(s"DROP DATABASE $dbName")
}.getMessage
- assert(message.contains(s"Database '$dbNameWithoutBackTicks' not found"))
+ // TODO: Unify the exception.
+ if (isUsingHiveMetastore) {
+ assert(message.contains(s"NoSuchObjectException: $dbNameWithoutBackTicks"))
+ } else {
+ assert(message.contains(s"Database '$dbNameWithoutBackTicks' not found"))
+ }
message = intercept[AnalysisException] {
sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('d'='d')")
@@ -506,7 +617,12 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
val message = intercept[AnalysisException] {
sql(s"DROP DATABASE $dbName RESTRICT")
}.getMessage
- assert(message.contains(s"Database '$dbName' is not empty. One or more tables exist"))
+ // TODO: Unify the exception.
+ if (isUsingHiveMetastore) {
+ assert(message.contains(s"Database $dbName is not empty. One or more tables exist"))
+ } else {
+ assert(message.contains(s"Database '$dbName' is not empty. One or more tables exist"))
+ }
catalog.dropTable(tableIdent1, ignoreIfNotExists = false, purge = false)
@@ -537,7 +653,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
createTable(catalog, tableIdent1)
val expectedTableIdent = tableIdent1.copy(database = Some("default"))
val expectedTable = generateTable(catalog, expectedTableIdent)
- assert(catalog.getTableMetadata(tableIdent1) === expectedTable)
+ checkCatalogTables(expectedTable, catalog.getTableMetadata(tableIdent1))
}
test("create table in a specific db") {
@@ -546,7 +662,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
val tableIdent1 = TableIdentifier("tab1", Some("dbx"))
createTable(catalog, tableIdent1)
val expectedTable = generateTable(catalog, tableIdent1)
- assert(catalog.getTableMetadata(tableIdent1) === expectedTable)
+ checkCatalogTables(expectedTable, catalog.getTableMetadata(tableIdent1))
}
test("create table using") {
@@ -731,52 +847,28 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
testSetLocation(isDatasourceTable = false)
}
- test("alter table: set location (datasource table)") {
- testSetLocation(isDatasourceTable = true)
- }
-
test("alter table: set properties") {
testSetProperties(isDatasourceTable = false)
}
- test("alter table: set properties (datasource table)") {
- testSetProperties(isDatasourceTable = true)
- }
-
test("alter table: unset properties") {
testUnsetProperties(isDatasourceTable = false)
}
- test("alter table: unset properties (datasource table)") {
- testUnsetProperties(isDatasourceTable = true)
- }
-
// TODO: move this test to HiveDDLSuite.scala
ignore("alter table: set serde") {
testSetSerde(isDatasourceTable = false)
}
- test("alter table: set serde (datasource table)") {
- testSetSerde(isDatasourceTable = true)
- }
-
// TODO: move this test to HiveDDLSuite.scala
ignore("alter table: set serde partition") {
testSetSerdePartition(isDatasourceTable = false)
}
- test("alter table: set serde partition (datasource table)") {
- testSetSerdePartition(isDatasourceTable = true)
- }
-
test("alter table: change column") {
testChangeColumn(isDatasourceTable = false)
}
- test("alter table: change column (datasource table)") {
- testChangeColumn(isDatasourceTable = true)
- }
-
test("alter table: bucketing is not supported") {
val catalog = spark.sessionState.catalog
val tableIdent = TableIdentifier("tab1", Some("dbx"))
@@ -805,10 +897,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
testAddPartitions(isDatasourceTable = false)
}
- test("alter table: add partition (datasource table)") {
- testAddPartitions(isDatasourceTable = true)
- }
-
test("alter table: recover partitions (sequential)") {
withSQLConf("spark.rdd.parallelListingThreshold" -> "10") {
testRecoverPartitions()
@@ -821,7 +909,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
}
- private def testRecoverPartitions() {
+ protected def testRecoverPartitions() {
val catalog = spark.sessionState.catalog
// table to alter does not exist
intercept[AnalysisException] {
@@ -860,8 +948,14 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
sql("ALTER TABLE tab1 RECOVER PARTITIONS")
assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
Set(part1, part2))
- assert(catalog.getPartition(tableIdent, part1).parameters("numFiles") == "1")
- assert(catalog.getPartition(tableIdent, part2).parameters("numFiles") == "2")
+ if (!isUsingHiveMetastore) {
+ assert(catalog.getPartition(tableIdent, part1).parameters("numFiles") == "1")
+ assert(catalog.getPartition(tableIdent, part2).parameters("numFiles") == "2")
+ } else {
+ // After ALTER TABLE, the statistics of the first partition are removed by the Hive metastore
+ assert(catalog.getPartition(tableIdent, part1).parameters.get("numFiles").isEmpty)
+ assert(catalog.getPartition(tableIdent, part2).parameters("numFiles") == "2")
+ }
} finally {
fs.delete(root, true)
}
@@ -875,10 +969,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
testDropPartitions(isDatasourceTable = false)
}
- test("alter table: drop partition (datasource table)") {
- testDropPartitions(isDatasourceTable = true)
- }
-
test("alter table: drop partition is not supported for views") {
assertUnsupported("ALTER VIEW dbx.tab1 DROP IF EXISTS PARTITION (b='2')")
}
@@ -887,10 +977,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
testRenamePartitions(isDatasourceTable = false)
}
- test("alter table: rename partition (datasource table)") {
- testRenamePartitions(isDatasourceTable = true)
- }
-
test("show table extended") {
withTempView("show1a", "show2b") {
sql(
@@ -971,11 +1057,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
testDropTable(isDatasourceTable = false)
}
- test("drop table - data source table") {
- testDropTable(isDatasourceTable = true)
- }
-
- private def testDropTable(isDatasourceTable: Boolean): Unit = {
+ protected def testDropTable(isDatasourceTable: Boolean): Unit = {
val catalog = spark.sessionState.catalog
val tableIdent = TableIdentifier("tab1", Some("dbx"))
createDatabase(catalog, "dbx")
@@ -1011,9 +1093,10 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
tableIdent: TableIdentifier): Unit = {
catalog.alterTable(catalog.getTableMetadata(tableIdent).copy(
provider = Some("csv")))
+ assert(catalog.getTableMetadata(tableIdent).provider == Some("csv"))
}
- private def testSetProperties(isDatasourceTable: Boolean): Unit = {
+ protected def testSetProperties(isDatasourceTable: Boolean): Unit = {
val catalog = spark.sessionState.catalog
val tableIdent = TableIdentifier("tab1", Some("dbx"))
createDatabase(catalog, "dbx")
@@ -1022,7 +1105,11 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
convertToDatasourceTable(catalog, tableIdent)
}
def getProps: Map[String, String] = {
- catalog.getTableMetadata(tableIdent).properties
+ if (isUsingHiveMetastore) {
+ normalizeCatalogTable(catalog.getTableMetadata(tableIdent)).properties
+ } else {
+ catalog.getTableMetadata(tableIdent).properties
+ }
}
assert(getProps.isEmpty)
// set table properties
@@ -1038,7 +1125,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
}
- private def testUnsetProperties(isDatasourceTable: Boolean): Unit = {
+ protected def testUnsetProperties(isDatasourceTable: Boolean): Unit = {
val catalog = spark.sessionState.catalog
val tableIdent = TableIdentifier("tab1", Some("dbx"))
createDatabase(catalog, "dbx")
@@ -1047,7 +1134,11 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
convertToDatasourceTable(catalog, tableIdent)
}
def getProps: Map[String, String] = {
- catalog.getTableMetadata(tableIdent).properties
+ if (isUsingHiveMetastore) {
+ normalizeCatalogTable(catalog.getTableMetadata(tableIdent)).properties
+ } else {
+ catalog.getTableMetadata(tableIdent).properties
+ }
}
// unset table properties
sql("ALTER TABLE dbx.tab1 SET TBLPROPERTIES ('j' = 'am', 'p' = 'an', 'c' = 'lan', 'x' = 'y')")
@@ -1071,7 +1162,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
assert(getProps == Map("x" -> "y"))
}
- private def testSetLocation(isDatasourceTable: Boolean): Unit = {
+ protected def testSetLocation(isDatasourceTable: Boolean): Unit = {
val catalog = spark.sessionState.catalog
val tableIdent = TableIdentifier("tab1", Some("dbx"))
val partSpec = Map("a" -> "1", "b" -> "2")
@@ -1082,24 +1173,21 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
convertToDatasourceTable(catalog, tableIdent)
}
assert(catalog.getTableMetadata(tableIdent).storage.locationUri.isDefined)
- assert(catalog.getTableMetadata(tableIdent).storage.properties.isEmpty)
+ assert(normalizeSerdeProp(catalog.getTableMetadata(tableIdent).storage.properties).isEmpty)
assert(catalog.getPartition(tableIdent, partSpec).storage.locationUri.isDefined)
- assert(catalog.getPartition(tableIdent, partSpec).storage.properties.isEmpty)
+ assert(
+ normalizeSerdeProp(catalog.getPartition(tableIdent, partSpec).storage.properties).isEmpty)
+
// Verify that the location is set to the expected string
def verifyLocation(expected: URI, spec: Option[TablePartitionSpec] = None): Unit = {
val storageFormat = spec
.map { s => catalog.getPartition(tableIdent, s).storage }
.getOrElse { catalog.getTableMetadata(tableIdent).storage }
- if (isDatasourceTable) {
- if (spec.isDefined) {
- assert(storageFormat.properties.isEmpty)
- assert(storageFormat.locationUri === Some(expected))
- } else {
- assert(storageFormat.locationUri === Some(expected))
- }
- } else {
- assert(storageFormat.locationUri === Some(expected))
- }
+ // TODO(gatorsmile): fix the bug in alter table set location.
+ // if (isUsingHiveMetastore) {
+ // assert(storageFormat.properties.get("path") === expected)
+ // }
+ assert(storageFormat.locationUri === Some(expected))
}
// set table location
sql("ALTER TABLE dbx.tab1 SET LOCATION '/path/to/your/lovely/heart'")
@@ -1124,7 +1212,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
}
- private def testSetSerde(isDatasourceTable: Boolean): Unit = {
+ protected def testSetSerde(isDatasourceTable: Boolean): Unit = {
val catalog = spark.sessionState.catalog
val tableIdent = TableIdentifier("tab1", Some("dbx"))
createDatabase(catalog, "dbx")
@@ -1132,8 +1220,21 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
if (isDatasourceTable) {
convertToDatasourceTable(catalog, tableIdent)
}
- assert(catalog.getTableMetadata(tableIdent).storage.serde.isEmpty)
- assert(catalog.getTableMetadata(tableIdent).storage.properties.isEmpty)
+ def checkSerdeProps(expectedSerdeProps: Map[String, String]): Unit = {
+ val serdeProp = catalog.getTableMetadata(tableIdent).storage.properties
+ if (isUsingHiveMetastore) {
+ assert(normalizeSerdeProp(serdeProp) == expectedSerdeProps)
+ } else {
+ assert(serdeProp == expectedSerdeProps)
+ }
+ }
+ if (isUsingHiveMetastore) {
+ assert(catalog.getTableMetadata(tableIdent).storage.serde ==
+ Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
+ } else {
+ assert(catalog.getTableMetadata(tableIdent).storage.serde.isEmpty)
+ }
+ checkSerdeProps(Map.empty[String, String])
// set table serde and/or properties (should fail on datasource tables)
if (isDatasourceTable) {
val e1 = intercept[AnalysisException] {
@@ -1146,31 +1247,30 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
assert(e1.getMessage.contains("datasource"))
assert(e2.getMessage.contains("datasource"))
} else {
- sql("ALTER TABLE dbx.tab1 SET SERDE 'org.apache.jadoop'")
- assert(catalog.getTableMetadata(tableIdent).storage.serde == Some("org.apache.jadoop"))
- assert(catalog.getTableMetadata(tableIdent).storage.properties.isEmpty)
- sql("ALTER TABLE dbx.tab1 SET SERDE 'org.apache.madoop' " +
+ val newSerde = "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"
+ sql(s"ALTER TABLE dbx.tab1 SET SERDE '$newSerde'")
+ assert(catalog.getTableMetadata(tableIdent).storage.serde == Some(newSerde))
+ checkSerdeProps(Map.empty[String, String])
+ val serde2 = "org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe"
+ sql(s"ALTER TABLE dbx.tab1 SET SERDE '$serde2' " +
"WITH SERDEPROPERTIES ('k' = 'v', 'kay' = 'vee')")
- assert(catalog.getTableMetadata(tableIdent).storage.serde == Some("org.apache.madoop"))
- assert(catalog.getTableMetadata(tableIdent).storage.properties ==
- Map("k" -> "v", "kay" -> "vee"))
+ assert(catalog.getTableMetadata(tableIdent).storage.serde == Some(serde2))
+ checkSerdeProps(Map("k" -> "v", "kay" -> "vee"))
}
// set serde properties only
sql("ALTER TABLE dbx.tab1 SET SERDEPROPERTIES ('k' = 'vvv', 'kay' = 'vee')")
- assert(catalog.getTableMetadata(tableIdent).storage.properties ==
- Map("k" -> "vvv", "kay" -> "vee"))
+ checkSerdeProps(Map("k" -> "vvv", "kay" -> "vee"))
// set things without explicitly specifying database
catalog.setCurrentDatabase("dbx")
sql("ALTER TABLE tab1 SET SERDEPROPERTIES ('kay' = 'veee')")
- assert(catalog.getTableMetadata(tableIdent).storage.properties ==
- Map("k" -> "vvv", "kay" -> "veee"))
+ checkSerdeProps(Map("k" -> "vvv", "kay" -> "veee"))
// table to alter does not exist
intercept[AnalysisException] {
sql("ALTER TABLE does_not_exist SET SERDEPROPERTIES ('x' = 'y')")
}
}
- private def testSetSerdePartition(isDatasourceTable: Boolean): Unit = {
+ protected def testSetSerdePartition(isDatasourceTable: Boolean): Unit = {
val catalog = spark.sessionState.catalog
val tableIdent = TableIdentifier("tab1", Some("dbx"))
val spec = Map("a" -> "1", "b" -> "2")
@@ -1183,8 +1283,21 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
if (isDatasourceTable) {
convertToDatasourceTable(catalog, tableIdent)
}
- assert(catalog.getPartition(tableIdent, spec).storage.serde.isEmpty)
- assert(catalog.getPartition(tableIdent, spec).storage.properties.isEmpty)
+ def checkPartitionSerdeProps(expectedSerdeProps: Map[String, String]): Unit = {
+ val serdeProp = catalog.getPartition(tableIdent, spec).storage.properties
+ if (isUsingHiveMetastore) {
+ assert(normalizeSerdeProp(serdeProp) == expectedSerdeProps)
+ } else {
+ assert(serdeProp == expectedSerdeProps)
+ }
+ }
+ if (isUsingHiveMetastore) {
+ assert(catalog.getPartition(tableIdent, spec).storage.serde ==
+ Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
+ } else {
+ assert(catalog.getPartition(tableIdent, spec).storage.serde.isEmpty)
+ }
+ checkPartitionSerdeProps(Map.empty[String, String])
// set table serde and/or properties (should fail on datasource tables)
if (isDatasourceTable) {
val e1 = intercept[AnalysisException] {
@@ -1199,26 +1312,23 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
} else {
sql("ALTER TABLE dbx.tab1 PARTITION (a=1, b=2) SET SERDE 'org.apache.jadoop'")
assert(catalog.getPartition(tableIdent, spec).storage.serde == Some("org.apache.jadoop"))
- assert(catalog.getPartition(tableIdent, spec).storage.properties.isEmpty)
+ checkPartitionSerdeProps(Map.empty[String, String])
sql("ALTER TABLE dbx.tab1 PARTITION (a=1, b=2) SET SERDE 'org.apache.madoop' " +
"WITH SERDEPROPERTIES ('k' = 'v', 'kay' = 'vee')")
assert(catalog.getPartition(tableIdent, spec).storage.serde == Some("org.apache.madoop"))
- assert(catalog.getPartition(tableIdent, spec).storage.properties ==
- Map("k" -> "v", "kay" -> "vee"))
+ checkPartitionSerdeProps(Map("k" -> "v", "kay" -> "vee"))
}
// set serde properties only
maybeWrapException(isDatasourceTable) {
sql("ALTER TABLE dbx.tab1 PARTITION (a=1, b=2) " +
"SET SERDEPROPERTIES ('k' = 'vvv', 'kay' = 'vee')")
- assert(catalog.getPartition(tableIdent, spec).storage.properties ==
- Map("k" -> "vvv", "kay" -> "vee"))
+ checkPartitionSerdeProps(Map("k" -> "vvv", "kay" -> "vee"))
}
// set things without explicitly specifying database
catalog.setCurrentDatabase("dbx")
maybeWrapException(isDatasourceTable) {
sql("ALTER TABLE tab1 PARTITION (a=1, b=2) SET SERDEPROPERTIES ('kay' = 'veee')")
- assert(catalog.getPartition(tableIdent, spec).storage.properties ==
- Map("k" -> "vvv", "kay" -> "veee"))
+ checkPartitionSerdeProps(Map("k" -> "vvv", "kay" -> "veee"))
}
// table to alter does not exist
intercept[AnalysisException] {
@@ -1226,7 +1336,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
}
- private def testAddPartitions(isDatasourceTable: Boolean): Unit = {
+ protected def testAddPartitions(isDatasourceTable: Boolean): Unit = {
val catalog = spark.sessionState.catalog
val tableIdent = TableIdentifier("tab1", Some("dbx"))
val part1 = Map("a" -> "1", "b" -> "5")
@@ -1247,7 +1357,15 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
"PARTITION (a='2', b='6') LOCATION 'paris' PARTITION (a='3', b='7')")
assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2, part3))
assert(catalog.getPartition(tableIdent, part1).storage.locationUri.isDefined)
- assert(catalog.getPartition(tableIdent, part2).storage.locationUri == Option(new URI("paris")))
+ val partitionLocation = if (isUsingHiveMetastore) {
+ val tableLocation = catalog.getTableMetadata(tableIdent).storage.locationUri
+ assert(tableLocation.isDefined)
+ makeQualifiedPath(new Path(tableLocation.get.toString, "paris"))
+ } else {
+ new URI("paris")
+ }
+
+ assert(catalog.getPartition(tableIdent, part2).storage.locationUri == Option(partitionLocation))
assert(catalog.getPartition(tableIdent, part3).storage.locationUri.isDefined)
// add partitions without explicitly specifying database
@@ -1277,7 +1395,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
Set(part1, part2, part3, part4, part5))
}
- private def testDropPartitions(isDatasourceTable: Boolean): Unit = {
+ protected def testDropPartitions(isDatasourceTable: Boolean): Unit = {
val catalog = spark.sessionState.catalog
val tableIdent = TableIdentifier("tab1", Some("dbx"))
val part1 = Map("a" -> "1", "b" -> "5")
@@ -1330,7 +1448,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
assert(catalog.listPartitions(tableIdent).isEmpty)
}
- private def testRenamePartitions(isDatasourceTable: Boolean): Unit = {
+ protected def testRenamePartitions(isDatasourceTable: Boolean): Unit = {
val catalog = spark.sessionState.catalog
val tableIdent = TableIdentifier("tab1", Some("dbx"))
val part1 = Map("a" -> "1", "b" -> "q")
@@ -1374,7 +1492,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
Set(Map("a" -> "1", "b" -> "p"), Map("a" -> "20", "b" -> "c"), Map("a" -> "3", "b" -> "p")))
}
- private def testChangeColumn(isDatasourceTable: Boolean): Unit = {
+ protected def testChangeColumn(isDatasourceTable: Boolean): Unit = {
val catalog = spark.sessionState.catalog
val resolver = spark.sessionState.conf.resolver
val tableIdent = TableIdentifier("tab1", Some("dbx"))
@@ -1474,35 +1592,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
)
}
- test("create a managed Hive source table") {
- assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "in-memory")
- val tabName = "tbl"
- withTable(tabName) {
- val e = intercept[AnalysisException] {
- sql(s"CREATE TABLE $tabName (i INT, j STRING)")
- }.getMessage
- assert(e.contains("Hive support is required to CREATE Hive TABLE"))
- }
- }
-
- test("create an external Hive source table") {
- assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "in-memory")
- withTempDir { tempDir =>
- val tabName = "tbl"
- withTable(tabName) {
- val e = intercept[AnalysisException] {
- sql(
- s"""
- |CREATE EXTERNAL TABLE $tabName (i INT, j STRING)
- |ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
- |LOCATION '${tempDir.toURI}'
- """.stripMargin)
- }.getMessage
- assert(e.contains("Hive support is required to CREATE Hive TABLE"))
- }
- }
- }
-
test("create a data source table without schema") {
import testImplicits._
withTempPath { tempDir =>
@@ -1541,22 +1630,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
}
- test("Create Hive Table As Select") {
- import testImplicits._
- withTable("t", "t1") {
- var e = intercept[AnalysisException] {
- sql("CREATE TABLE t SELECT 1 as a, 1 as b")
- }.getMessage
- assert(e.contains("Hive support is required to CREATE Hive TABLE (AS SELECT)"))
-
- spark.range(1).select('id as 'a, 'id as 'b).write.saveAsTable("t1")
- e = intercept[AnalysisException] {
- sql("CREATE TABLE t SELECT a, b from t1")
- }.getMessage
- assert(e.contains("Hive support is required to CREATE Hive TABLE (AS SELECT)"))
- }
- }
-
test("Create Data Source Table As Select") {
import testImplicits._
withTable("t", "t1", "t2") {
@@ -1580,7 +1653,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
test("drop default database") {
- Seq("true", "false").foreach { caseSensitive =>
+ val caseSensitiveOptions = if (isUsingHiveMetastore) Seq("false") else Seq("true", "false")
+ caseSensitiveOptions.foreach { caseSensitive =>
withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive) {
var message = intercept[AnalysisException] {
sql("DROP DATABASE default")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index 9201954b66..12fc8993d7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -306,6 +306,11 @@ private[sql] trait SQLTestUtils
val fs = hadoopPath.getFileSystem(spark.sessionState.newHadoopConf())
fs.makeQualified(hadoopPath).toUri
}
+
+ def makeQualifiedPath(path: Path): URI = {
+ val fs = path.getFileSystem(spark.sessionState.newHadoopConf())
+ fs.makeQualified(path).toUri
+ }
}
private[sql] object SQLTestUtils {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 10d929a4a0..fce055048d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -27,16 +27,88 @@ import org.scalatest.BeforeAndAfterEach
import org.apache.spark.SparkException
import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, TableAlreadyExistsException}
-import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTableType, CatalogUtils, ExternalCatalogUtils}
+import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.command.DDLUtils
+import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils}
import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.hive.orc.OrcFileOperator
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.sql.test.SQLTestUtils
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{MetadataBuilder, StructType}
+
+// TODO(gatorsmile): combine HiveCatalogedDDLSuite and HiveDDLSuite
+class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeAndAfterEach {
+ override def afterEach(): Unit = {
+ try {
+ // drop all databases, tables and functions after each test
+ spark.sessionState.catalog.reset()
+ } finally {
+ super.afterEach()
+ }
+ }
+
+ protected override def generateTable(
+ catalog: SessionCatalog,
+ name: TableIdentifier): CatalogTable = {
+ val storage =
+ CatalogStorageFormat(
+ locationUri = Some(catalog.defaultTablePath(name)),
+ inputFormat = Some("org.apache.hadoop.mapred.SequenceFileInputFormat"),
+ outputFormat = Some("org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat"),
+ serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"),
+ compressed = false,
+ properties = Map("serialization.format" -> "1"))
+ val metadata = new MetadataBuilder()
+ .putString("key", "value")
+ .build()
+ CatalogTable(
+ identifier = name,
+ tableType = CatalogTableType.EXTERNAL,
+ storage = storage,
+ schema = new StructType()
+ .add("col1", "int", nullable = true, metadata = metadata)
+ .add("col2", "string")
+ .add("a", "int")
+ .add("b", "int"),
+ provider = Some("hive"),
+ partitionColumnNames = Seq("a", "b"),
+ createTime = 0L,
+ tracksPartitionsInCatalog = true)
+ }
+
+ protected override def normalizeCatalogTable(table: CatalogTable): CatalogTable = {
+ val nondeterministicProps = Set(
+ "CreateTime",
+ "transient_lastDdlTime",
+ "grantTime",
+ "lastUpdateTime",
+ "last_modified_by",
+ "last_modified_time",
+ "Owner:",
+ "COLUMN_STATS_ACCURATE",
+ // The following are hive specific schema parameters which we do not need to match exactly.
+ "numFiles",
+ "numRows",
+ "rawDataSize",
+ "totalSize",
+ "totalNumberFiles",
+ "maxFileSize",
+ "minFileSize"
+ )
+
+ table.copy(
+ createTime = 0L,
+ lastAccessTime = 0L,
+ owner = "",
+ properties = table.properties.filterKeys(!nondeterministicProps.contains(_)),
+ // View texts are checked separately
+ viewText = None
+ )
+ }
+
+}
class HiveDDLSuite
extends QueryTest with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach {
@@ -1720,61 +1792,6 @@ class HiveDDLSuite
}
Seq("a b", "a:b", "a%b").foreach { specialChars =>
- test(s"datasource table: location uri contains $specialChars") {
- withTable("t", "t1") {
- withTempDir { dir =>
- val loc = new File(dir, specialChars)
- loc.mkdir()
- spark.sql(
- s"""
- |CREATE TABLE t(a string)
- |USING parquet
- |LOCATION '$loc'
- """.stripMargin)
-
- val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
- assert(table.location == new Path(loc.getAbsolutePath).toUri)
- assert(new Path(table.location).toString.contains(specialChars))
-
- assert(loc.listFiles().isEmpty)
- spark.sql("INSERT INTO TABLE t SELECT 1")
- assert(loc.listFiles().length >= 1)
- checkAnswer(spark.table("t"), Row("1") :: Nil)
- }
-
- withTempDir { dir =>
- val loc = new File(dir, specialChars)
- loc.mkdir()
- spark.sql(
- s"""
- |CREATE TABLE t1(a string, b string)
- |USING parquet
- |PARTITIONED BY(b)
- |LOCATION '$loc'
- """.stripMargin)
-
- val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
- assert(table.location == new Path(loc.getAbsolutePath).toUri)
- assert(new Path(table.location).toString.contains(specialChars))
-
- assert(loc.listFiles().isEmpty)
- spark.sql("INSERT INTO TABLE t1 PARTITION(b=2) SELECT 1")
- val partFile = new File(loc, "b=2")
- assert(partFile.listFiles().length >= 1)
- checkAnswer(spark.table("t1"), Row("1", "2") :: Nil)
-
- spark.sql("INSERT INTO TABLE t1 PARTITION(b='2017-03-03 12:13%3A14') SELECT 1")
- val partFile1 = new File(loc, "b=2017-03-03 12:13%3A14")
- assert(!partFile1.exists())
- val partFile2 = new File(loc, "b=2017-03-03 12%3A13%253A14")
- assert(partFile2.listFiles().length >= 1)
- checkAnswer(spark.table("t1"), Row("1", "2") :: Row("1", "2017-03-03 12:13%3A14") :: Nil)
- }
- }
- }
- }
-
- Seq("a b", "a:b", "a%b").foreach { specialChars =>
test(s"hive table: location uri contains $specialChars") {
withTable("t") {
withTempDir { dir =>
@@ -1848,28 +1865,4 @@ class HiveDDLSuite
}
}
}
-
- Seq("a b", "a:b", "a%b").foreach { specialChars =>
- test(s"location uri contains $specialChars for database") {
- try {
- withTable("t") {
- withTempDir { dir =>
- val loc = new File(dir, specialChars)
- spark.sql(s"CREATE DATABASE tmpdb LOCATION '$loc'")
- spark.sql("USE tmpdb")
-
- Seq(1).toDF("a").write.saveAsTable("t")
- val tblloc = new File(loc, "t")
- val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
- val tblPath = new Path(tblloc.getAbsolutePath)
- val fs = tblPath.getFileSystem(spark.sessionState.newHadoopConf())
- assert(table.location == makeQualifiedPath(tblloc.getAbsolutePath))
- assert(tblloc.listFiles().nonEmpty)
- }
- }
- } finally {
- spark.sql("DROP DATABASE IF EXISTS tmpdb")
- }
- }
- }
}