-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala  456
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala            5
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala  157
3 files changed, 345 insertions, 273 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index c1f8b2b3d9..aa335c4453 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -30,23 +30,164 @@ import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
-import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
-class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
- private val escapedIdentifier = "`(.+)`".r
+class InMemoryCatalogedDDLSuite extends DDLSuite with SharedSQLContext with BeforeAndAfterEach {
override def afterEach(): Unit = {
try {
// drop all databases, tables and functions after each test
spark.sessionState.catalog.reset()
} finally {
- Utils.deleteRecursively(new File("spark-warehouse"))
+ Utils.deleteRecursively(new File(spark.sessionState.conf.warehousePath))
super.afterEach()
}
}
+ protected override def generateTable(
+ catalog: SessionCatalog,
+ name: TableIdentifier): CatalogTable = {
+ val storage =
+ CatalogStorageFormat.empty.copy(locationUri = Some(catalog.defaultTablePath(name)))
+ val metadata = new MetadataBuilder()
+ .putString("key", "value")
+ .build()
+ CatalogTable(
+ identifier = name,
+ tableType = CatalogTableType.EXTERNAL,
+ storage = storage,
+ schema = new StructType()
+ .add("col1", "int", nullable = true, metadata = metadata)
+ .add("col2", "string")
+ .add("a", "int")
+ .add("b", "int"),
+ provider = Some("parquet"),
+ partitionColumnNames = Seq("a", "b"),
+ createTime = 0L,
+ tracksPartitionsInCatalog = true)
+ }
+
+ test("desc table for parquet data source table using in-memory catalog") {
+ val tabName = "tab1"
+ withTable(tabName) {
+ sql(s"CREATE TABLE $tabName(a int comment 'test') USING parquet ")
+
+ checkAnswer(
+ sql(s"DESC $tabName").select("col_name", "data_type", "comment"),
+ Row("a", "int", "test")
+ )
+ }
+ }
+
+ test("alter table: set location (datasource table)") {
+ testSetLocation(isDatasourceTable = true)
+ }
+
+ test("alter table: set properties (datasource table)") {
+ testSetProperties(isDatasourceTable = true)
+ }
+
+ test("alter table: unset properties (datasource table)") {
+ testUnsetProperties(isDatasourceTable = true)
+ }
+
+ test("alter table: set serde (datasource table)") {
+ testSetSerde(isDatasourceTable = true)
+ }
+
+ test("alter table: set serde partition (datasource table)") {
+ testSetSerdePartition(isDatasourceTable = true)
+ }
+
+ test("alter table: change column (datasource table)") {
+ testChangeColumn(isDatasourceTable = true)
+ }
+
+ test("alter table: add partition (datasource table)") {
+ testAddPartitions(isDatasourceTable = true)
+ }
+
+ test("alter table: drop partition (datasource table)") {
+ testDropPartitions(isDatasourceTable = true)
+ }
+
+ test("alter table: rename partition (datasource table)") {
+ testRenamePartitions(isDatasourceTable = true)
+ }
+
+ test("drop table - data source table") {
+ testDropTable(isDatasourceTable = true)
+ }
+
+ test("create a managed Hive source table") {
+ assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "in-memory")
+ val tabName = "tbl"
+ withTable(tabName) {
+ val e = intercept[AnalysisException] {
+ sql(s"CREATE TABLE $tabName (i INT, j STRING)")
+ }.getMessage
+ assert(e.contains("Hive support is required to CREATE Hive TABLE"))
+ }
+ }
+
+ test("create an external Hive source table") {
+ assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "in-memory")
+ withTempDir { tempDir =>
+ val tabName = "tbl"
+ withTable(tabName) {
+ val e = intercept[AnalysisException] {
+ sql(
+ s"""
+ |CREATE EXTERNAL TABLE $tabName (i INT, j STRING)
+ |ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
+ |LOCATION '${tempDir.toURI}'
+ """.stripMargin)
+ }.getMessage
+ assert(e.contains("Hive support is required to CREATE Hive TABLE"))
+ }
+ }
+ }
+
+ test("Create Hive Table As Select") {
+ import testImplicits._
+ withTable("t", "t1") {
+ var e = intercept[AnalysisException] {
+ sql("CREATE TABLE t SELECT 1 as a, 1 as b")
+ }.getMessage
+ assert(e.contains("Hive support is required to CREATE Hive TABLE (AS SELECT)"))
+
+ spark.range(1).select('id as 'a, 'id as 'b).write.saveAsTable("t1")
+ e = intercept[AnalysisException] {
+ sql("CREATE TABLE t SELECT a, b from t1")
+ }.getMessage
+ assert(e.contains("Hive support is required to CREATE Hive TABLE (AS SELECT)"))
+ }
+ }
+
+}
+
+abstract class DDLSuite extends QueryTest with SQLTestUtils {
+
+ protected def isUsingHiveMetastore: Boolean = {
+ spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "hive"
+ }
+
+ protected def generateTable(catalog: SessionCatalog, name: TableIdentifier): CatalogTable
+
+ private val escapedIdentifier = "`(.+)`".r
+
+ protected def normalizeCatalogTable(table: CatalogTable): CatalogTable = table
+
+ private def normalizeSerdeProp(props: Map[String, String]): Map[String, String] = {
+ props.filterNot(p => Seq("serialization.format", "path").contains(p._1))
+ }
+
+ private def checkCatalogTables(expected: CatalogTable, actual: CatalogTable): Unit = {
+ assert(normalizeCatalogTable(actual) == normalizeCatalogTable(expected))
+ }
+
/**
* Strip backticks, if any, from the string.
*/
@@ -75,33 +216,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
ignoreIfExists = false)
}
- private def generateTable(catalog: SessionCatalog, name: TableIdentifier): CatalogTable = {
- val storage =
- CatalogStorageFormat(
- locationUri = Some(catalog.defaultTablePath(name)),
- inputFormat = None,
- outputFormat = None,
- serde = None,
- compressed = false,
- properties = Map())
- val metadata = new MetadataBuilder()
- .putString("key", "value")
- .build()
- CatalogTable(
- identifier = name,
- tableType = CatalogTableType.EXTERNAL,
- storage = storage,
- schema = new StructType()
- .add("col1", "int", nullable = true, metadata = metadata)
- .add("col2", "string")
- .add("a", "int")
- .add("b", "int"),
- provider = Some("parquet"),
- partitionColumnNames = Seq("a", "b"),
- createTime = 0L,
- tracksPartitionsInCatalog = true)
- }
-
private def createTable(catalog: SessionCatalog, name: TableIdentifier): Unit = {
catalog.createTable(generateTable(catalog, name), ignoreIfExists = false)
}
@@ -115,6 +229,11 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
catalog.createPartitions(tableName, Seq(part), ignoreIfExists = false)
}
+ private def getDBPath(dbName: String): URI = {
+ val warehousePath = s"file:${spark.sessionState.conf.warehousePath.stripPrefix("file:")}"
+ new Path(warehousePath, s"$dbName.db").toUri
+ }
+
test("the qualified path of a database is stored in the catalog") {
val catalog = spark.sessionState.catalog
@@ -138,11 +257,10 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
try {
sql(s"CREATE DATABASE $dbName")
val db1 = catalog.getDatabaseMetadata(dbName)
- val expectedLocation = makeQualifiedPath(s"spark-warehouse/$dbName.db")
assert(db1 == CatalogDatabase(
dbName,
"",
- expectedLocation,
+ getDBPath(dbName),
Map.empty))
sql(s"DROP DATABASE $dbName CASCADE")
assert(!catalog.databaseExists(dbName))
@@ -185,16 +303,17 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
val dbNameWithoutBackTicks = cleanIdentifier(dbName)
sql(s"CREATE DATABASE $dbName")
val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks)
- val expectedLocation = makeQualifiedPath(s"spark-warehouse/$dbNameWithoutBackTicks.db")
assert(db1 == CatalogDatabase(
dbNameWithoutBackTicks,
"",
- expectedLocation,
+ getDBPath(dbNameWithoutBackTicks),
Map.empty))
- intercept[DatabaseAlreadyExistsException] {
+ // TODO: HiveExternalCatalog should throw DatabaseAlreadyExistsException
+ val e = intercept[AnalysisException] {
sql(s"CREATE DATABASE $dbName")
- }
+ }.getMessage
+ assert(e.contains(s"already exists"))
} finally {
catalog.reset()
}
@@ -413,19 +532,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
}
- test("desc table for parquet data source table using in-memory catalog") {
- assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "in-memory")
- val tabName = "tab1"
- withTable(tabName) {
- sql(s"CREATE TABLE $tabName(a int comment 'test') USING parquet ")
-
- checkAnswer(
- sql(s"DESC $tabName").select("col_name", "data_type", "comment"),
- Row("a", "int", "test")
- )
- }
- }
-
test("Alter/Describe Database") {
val catalog = spark.sessionState.catalog
val databaseNames = Seq("db1", "`database`")
@@ -433,7 +539,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
databaseNames.foreach { dbName =>
try {
val dbNameWithoutBackTicks = cleanIdentifier(dbName)
- val location = makeQualifiedPath(s"spark-warehouse/$dbNameWithoutBackTicks.db")
+ val location = getDBPath(dbNameWithoutBackTicks)
sql(s"CREATE DATABASE $dbName")
@@ -477,7 +583,12 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
var message = intercept[AnalysisException] {
sql(s"DROP DATABASE $dbName")
}.getMessage
- assert(message.contains(s"Database '$dbNameWithoutBackTicks' not found"))
+ // TODO: Unify the exception.
+ if (isUsingHiveMetastore) {
+ assert(message.contains(s"NoSuchObjectException: $dbNameWithoutBackTicks"))
+ } else {
+ assert(message.contains(s"Database '$dbNameWithoutBackTicks' not found"))
+ }
message = intercept[AnalysisException] {
sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('d'='d')")
@@ -506,7 +617,12 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
val message = intercept[AnalysisException] {
sql(s"DROP DATABASE $dbName RESTRICT")
}.getMessage
- assert(message.contains(s"Database '$dbName' is not empty. One or more tables exist"))
+ // TODO: Unify the exception.
+ if (isUsingHiveMetastore) {
+ assert(message.contains(s"Database $dbName is not empty. One or more tables exist"))
+ } else {
+ assert(message.contains(s"Database '$dbName' is not empty. One or more tables exist"))
+ }
catalog.dropTable(tableIdent1, ignoreIfNotExists = false, purge = false)
@@ -537,7 +653,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
createTable(catalog, tableIdent1)
val expectedTableIdent = tableIdent1.copy(database = Some("default"))
val expectedTable = generateTable(catalog, expectedTableIdent)
- assert(catalog.getTableMetadata(tableIdent1) === expectedTable)
+ checkCatalogTables(expectedTable, catalog.getTableMetadata(tableIdent1))
}
test("create table in a specific db") {
@@ -546,7 +662,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
val tableIdent1 = TableIdentifier("tab1", Some("dbx"))
createTable(catalog, tableIdent1)
val expectedTable = generateTable(catalog, tableIdent1)
- assert(catalog.getTableMetadata(tableIdent1) === expectedTable)
+ checkCatalogTables(expectedTable, catalog.getTableMetadata(tableIdent1))
}
test("create table using") {
@@ -731,52 +847,28 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
testSetLocation(isDatasourceTable = false)
}
- test("alter table: set location (datasource table)") {
- testSetLocation(isDatasourceTable = true)
- }
-
test("alter table: set properties") {
testSetProperties(isDatasourceTable = false)
}
- test("alter table: set properties (datasource table)") {
- testSetProperties(isDatasourceTable = true)
- }
-
test("alter table: unset properties") {
testUnsetProperties(isDatasourceTable = false)
}
- test("alter table: unset properties (datasource table)") {
- testUnsetProperties(isDatasourceTable = true)
- }
-
// TODO: move this test to HiveDDLSuite.scala
ignore("alter table: set serde") {
testSetSerde(isDatasourceTable = false)
}
- test("alter table: set serde (datasource table)") {
- testSetSerde(isDatasourceTable = true)
- }
-
// TODO: move this test to HiveDDLSuite.scala
ignore("alter table: set serde partition") {
testSetSerdePartition(isDatasourceTable = false)
}
- test("alter table: set serde partition (datasource table)") {
- testSetSerdePartition(isDatasourceTable = true)
- }
-
test("alter table: change column") {
testChangeColumn(isDatasourceTable = false)
}
- test("alter table: change column (datasource table)") {
- testChangeColumn(isDatasourceTable = true)
- }
-
test("alter table: bucketing is not supported") {
val catalog = spark.sessionState.catalog
val tableIdent = TableIdentifier("tab1", Some("dbx"))
@@ -805,10 +897,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
testAddPartitions(isDatasourceTable = false)
}
- test("alter table: add partition (datasource table)") {
- testAddPartitions(isDatasourceTable = true)
- }
-
test("alter table: recover partitions (sequential)") {
withSQLConf("spark.rdd.parallelListingThreshold" -> "10") {
testRecoverPartitions()
@@ -821,7 +909,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
}
- private def testRecoverPartitions() {
+ protected def testRecoverPartitions() {
val catalog = spark.sessionState.catalog
// table to alter does not exist
intercept[AnalysisException] {
@@ -860,8 +948,14 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
sql("ALTER TABLE tab1 RECOVER PARTITIONS")
assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
Set(part1, part2))
- assert(catalog.getPartition(tableIdent, part1).parameters("numFiles") == "1")
- assert(catalog.getPartition(tableIdent, part2).parameters("numFiles") == "2")
+ if (!isUsingHiveMetastore) {
+ assert(catalog.getPartition(tableIdent, part1).parameters("numFiles") == "1")
+ assert(catalog.getPartition(tableIdent, part2).parameters("numFiles") == "2")
+ } else {
+ // After ALTER TABLE, the statistics of the first partition are removed by the Hive metastore
+ assert(catalog.getPartition(tableIdent, part1).parameters.get("numFiles").isEmpty)
+ assert(catalog.getPartition(tableIdent, part2).parameters("numFiles") == "2")
+ }
} finally {
fs.delete(root, true)
}
@@ -875,10 +969,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
testDropPartitions(isDatasourceTable = false)
}
- test("alter table: drop partition (datasource table)") {
- testDropPartitions(isDatasourceTable = true)
- }
-
test("alter table: drop partition is not supported for views") {
assertUnsupported("ALTER VIEW dbx.tab1 DROP IF EXISTS PARTITION (b='2')")
}
@@ -887,10 +977,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
testRenamePartitions(isDatasourceTable = false)
}
- test("alter table: rename partition (datasource table)") {
- testRenamePartitions(isDatasourceTable = true)
- }
-
test("show table extended") {
withTempView("show1a", "show2b") {
sql(
@@ -971,11 +1057,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
testDropTable(isDatasourceTable = false)
}
- test("drop table - data source table") {
- testDropTable(isDatasourceTable = true)
- }
-
- private def testDropTable(isDatasourceTable: Boolean): Unit = {
+ protected def testDropTable(isDatasourceTable: Boolean): Unit = {
val catalog = spark.sessionState.catalog
val tableIdent = TableIdentifier("tab1", Some("dbx"))
createDatabase(catalog, "dbx")
@@ -1011,9 +1093,10 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
tableIdent: TableIdentifier): Unit = {
catalog.alterTable(catalog.getTableMetadata(tableIdent).copy(
provider = Some("csv")))
+ assert(catalog.getTableMetadata(tableIdent).provider == Some("csv"))
}
- private def testSetProperties(isDatasourceTable: Boolean): Unit = {
+ protected def testSetProperties(isDatasourceTable: Boolean): Unit = {
val catalog = spark.sessionState.catalog
val tableIdent = TableIdentifier("tab1", Some("dbx"))
createDatabase(catalog, "dbx")
@@ -1022,7 +1105,11 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
convertToDatasourceTable(catalog, tableIdent)
}
def getProps: Map[String, String] = {
- catalog.getTableMetadata(tableIdent).properties
+ if (isUsingHiveMetastore) {
+ normalizeCatalogTable(catalog.getTableMetadata(tableIdent)).properties
+ } else {
+ catalog.getTableMetadata(tableIdent).properties
+ }
}
assert(getProps.isEmpty)
// set table properties
@@ -1038,7 +1125,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
}
- private def testUnsetProperties(isDatasourceTable: Boolean): Unit = {
+ protected def testUnsetProperties(isDatasourceTable: Boolean): Unit = {
val catalog = spark.sessionState.catalog
val tableIdent = TableIdentifier("tab1", Some("dbx"))
createDatabase(catalog, "dbx")
@@ -1047,7 +1134,11 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
convertToDatasourceTable(catalog, tableIdent)
}
def getProps: Map[String, String] = {
- catalog.getTableMetadata(tableIdent).properties
+ if (isUsingHiveMetastore) {
+ normalizeCatalogTable(catalog.getTableMetadata(tableIdent)).properties
+ } else {
+ catalog.getTableMetadata(tableIdent).properties
+ }
}
// unset table properties
sql("ALTER TABLE dbx.tab1 SET TBLPROPERTIES ('j' = 'am', 'p' = 'an', 'c' = 'lan', 'x' = 'y')")
@@ -1071,7 +1162,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
assert(getProps == Map("x" -> "y"))
}
- private def testSetLocation(isDatasourceTable: Boolean): Unit = {
+ protected def testSetLocation(isDatasourceTable: Boolean): Unit = {
val catalog = spark.sessionState.catalog
val tableIdent = TableIdentifier("tab1", Some("dbx"))
val partSpec = Map("a" -> "1", "b" -> "2")
@@ -1082,24 +1173,21 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
convertToDatasourceTable(catalog, tableIdent)
}
assert(catalog.getTableMetadata(tableIdent).storage.locationUri.isDefined)
- assert(catalog.getTableMetadata(tableIdent).storage.properties.isEmpty)
+ assert(normalizeSerdeProp(catalog.getTableMetadata(tableIdent).storage.properties).isEmpty)
assert(catalog.getPartition(tableIdent, partSpec).storage.locationUri.isDefined)
- assert(catalog.getPartition(tableIdent, partSpec).storage.properties.isEmpty)
+ assert(
+ normalizeSerdeProp(catalog.getPartition(tableIdent, partSpec).storage.properties).isEmpty)
+
// Verify that the location is set to the expected string
def verifyLocation(expected: URI, spec: Option[TablePartitionSpec] = None): Unit = {
val storageFormat = spec
.map { s => catalog.getPartition(tableIdent, s).storage }
.getOrElse { catalog.getTableMetadata(tableIdent).storage }
- if (isDatasourceTable) {
- if (spec.isDefined) {
- assert(storageFormat.properties.isEmpty)
- assert(storageFormat.locationUri === Some(expected))
- } else {
- assert(storageFormat.locationUri === Some(expected))
- }
- } else {
- assert(storageFormat.locationUri === Some(expected))
- }
+ // TODO(gatorsmile): fix the bug in alter table set location.
+ // if (isUsingHiveMetastore) {
+ // assert(storageFormat.properties.get("path") === expected)
+ // }
+ assert(storageFormat.locationUri === Some(expected))
}
// set table location
sql("ALTER TABLE dbx.tab1 SET LOCATION '/path/to/your/lovely/heart'")
@@ -1124,7 +1212,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
}
- private def testSetSerde(isDatasourceTable: Boolean): Unit = {
+ protected def testSetSerde(isDatasourceTable: Boolean): Unit = {
val catalog = spark.sessionState.catalog
val tableIdent = TableIdentifier("tab1", Some("dbx"))
createDatabase(catalog, "dbx")
@@ -1132,8 +1220,21 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
if (isDatasourceTable) {
convertToDatasourceTable(catalog, tableIdent)
}
- assert(catalog.getTableMetadata(tableIdent).storage.serde.isEmpty)
- assert(catalog.getTableMetadata(tableIdent).storage.properties.isEmpty)
+ def checkSerdeProps(expectedSerdeProps: Map[String, String]): Unit = {
+ val serdeProp = catalog.getTableMetadata(tableIdent).storage.properties
+ if (isUsingHiveMetastore) {
+ assert(normalizeSerdeProp(serdeProp) == expectedSerdeProps)
+ } else {
+ assert(serdeProp == expectedSerdeProps)
+ }
+ }
+ if (isUsingHiveMetastore) {
+ assert(catalog.getTableMetadata(tableIdent).storage.serde ==
+ Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
+ } else {
+ assert(catalog.getTableMetadata(tableIdent).storage.serde.isEmpty)
+ }
+ checkSerdeProps(Map.empty[String, String])
// set table serde and/or properties (should fail on datasource tables)
if (isDatasourceTable) {
val e1 = intercept[AnalysisException] {
@@ -1146,31 +1247,30 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
assert(e1.getMessage.contains("datasource"))
assert(e2.getMessage.contains("datasource"))
} else {
- sql("ALTER TABLE dbx.tab1 SET SERDE 'org.apache.jadoop'")
- assert(catalog.getTableMetadata(tableIdent).storage.serde == Some("org.apache.jadoop"))
- assert(catalog.getTableMetadata(tableIdent).storage.properties.isEmpty)
- sql("ALTER TABLE dbx.tab1 SET SERDE 'org.apache.madoop' " +
+ val newSerde = "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"
+ sql(s"ALTER TABLE dbx.tab1 SET SERDE '$newSerde'")
+ assert(catalog.getTableMetadata(tableIdent).storage.serde == Some(newSerde))
+ checkSerdeProps(Map.empty[String, String])
+ val serde2 = "org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe"
+ sql(s"ALTER TABLE dbx.tab1 SET SERDE '$serde2' " +
"WITH SERDEPROPERTIES ('k' = 'v', 'kay' = 'vee')")
- assert(catalog.getTableMetadata(tableIdent).storage.serde == Some("org.apache.madoop"))
- assert(catalog.getTableMetadata(tableIdent).storage.properties ==
- Map("k" -> "v", "kay" -> "vee"))
+ assert(catalog.getTableMetadata(tableIdent).storage.serde == Some(serde2))
+ checkSerdeProps(Map("k" -> "v", "kay" -> "vee"))
}
// set serde properties only
sql("ALTER TABLE dbx.tab1 SET SERDEPROPERTIES ('k' = 'vvv', 'kay' = 'vee')")
- assert(catalog.getTableMetadata(tableIdent).storage.properties ==
- Map("k" -> "vvv", "kay" -> "vee"))
+ checkSerdeProps(Map("k" -> "vvv", "kay" -> "vee"))
// set things without explicitly specifying database
catalog.setCurrentDatabase("dbx")
sql("ALTER TABLE tab1 SET SERDEPROPERTIES ('kay' = 'veee')")
- assert(catalog.getTableMetadata(tableIdent).storage.properties ==
- Map("k" -> "vvv", "kay" -> "veee"))
+ checkSerdeProps(Map("k" -> "vvv", "kay" -> "veee"))
// table to alter does not exist
intercept[AnalysisException] {
sql("ALTER TABLE does_not_exist SET SERDEPROPERTIES ('x' = 'y')")
}
}
- private def testSetSerdePartition(isDatasourceTable: Boolean): Unit = {
+ protected def testSetSerdePartition(isDatasourceTable: Boolean): Unit = {
val catalog = spark.sessionState.catalog
val tableIdent = TableIdentifier("tab1", Some("dbx"))
val spec = Map("a" -> "1", "b" -> "2")
@@ -1183,8 +1283,21 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
if (isDatasourceTable) {
convertToDatasourceTable(catalog, tableIdent)
}
- assert(catalog.getPartition(tableIdent, spec).storage.serde.isEmpty)
- assert(catalog.getPartition(tableIdent, spec).storage.properties.isEmpty)
+ def checkPartitionSerdeProps(expectedSerdeProps: Map[String, String]): Unit = {
+ val serdeProp = catalog.getPartition(tableIdent, spec).storage.properties
+ if (isUsingHiveMetastore) {
+ assert(normalizeSerdeProp(serdeProp) == expectedSerdeProps)
+ } else {
+ assert(serdeProp == expectedSerdeProps)
+ }
+ }
+ if (isUsingHiveMetastore) {
+ assert(catalog.getPartition(tableIdent, spec).storage.serde ==
+ Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
+ } else {
+ assert(catalog.getPartition(tableIdent, spec).storage.serde.isEmpty)
+ }
+ checkPartitionSerdeProps(Map.empty[String, String])
// set table serde and/or properties (should fail on datasource tables)
if (isDatasourceTable) {
val e1 = intercept[AnalysisException] {
@@ -1199,26 +1312,23 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
} else {
sql("ALTER TABLE dbx.tab1 PARTITION (a=1, b=2) SET SERDE 'org.apache.jadoop'")
assert(catalog.getPartition(tableIdent, spec).storage.serde == Some("org.apache.jadoop"))
- assert(catalog.getPartition(tableIdent, spec).storage.properties.isEmpty)
+ checkPartitionSerdeProps(Map.empty[String, String])
sql("ALTER TABLE dbx.tab1 PARTITION (a=1, b=2) SET SERDE 'org.apache.madoop' " +
"WITH SERDEPROPERTIES ('k' = 'v', 'kay' = 'vee')")
assert(catalog.getPartition(tableIdent, spec).storage.serde == Some("org.apache.madoop"))
- assert(catalog.getPartition(tableIdent, spec).storage.properties ==
- Map("k" -> "v", "kay" -> "vee"))
+ checkPartitionSerdeProps(Map("k" -> "v", "kay" -> "vee"))
}
// set serde properties only
maybeWrapException(isDatasourceTable) {
sql("ALTER TABLE dbx.tab1 PARTITION (a=1, b=2) " +
"SET SERDEPROPERTIES ('k' = 'vvv', 'kay' = 'vee')")
- assert(catalog.getPartition(tableIdent, spec).storage.properties ==
- Map("k" -> "vvv", "kay" -> "vee"))
+ checkPartitionSerdeProps(Map("k" -> "vvv", "kay" -> "vee"))
}
// set things without explicitly specifying database
catalog.setCurrentDatabase("dbx")
maybeWrapException(isDatasourceTable) {
sql("ALTER TABLE tab1 PARTITION (a=1, b=2) SET SERDEPROPERTIES ('kay' = 'veee')")
- assert(catalog.getPartition(tableIdent, spec).storage.properties ==
- Map("k" -> "vvv", "kay" -> "veee"))
+ checkPartitionSerdeProps(Map("k" -> "vvv", "kay" -> "veee"))
}
// table to alter does not exist
intercept[AnalysisException] {
@@ -1226,7 +1336,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
}
- private def testAddPartitions(isDatasourceTable: Boolean): Unit = {
+ protected def testAddPartitions(isDatasourceTable: Boolean): Unit = {
val catalog = spark.sessionState.catalog
val tableIdent = TableIdentifier("tab1", Some("dbx"))
val part1 = Map("a" -> "1", "b" -> "5")
@@ -1247,7 +1357,15 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
"PARTITION (a='2', b='6') LOCATION 'paris' PARTITION (a='3', b='7')")
assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2, part3))
assert(catalog.getPartition(tableIdent, part1).storage.locationUri.isDefined)
- assert(catalog.getPartition(tableIdent, part2).storage.locationUri == Option(new URI("paris")))
+ val partitionLocation = if (isUsingHiveMetastore) {
+ val tableLocation = catalog.getTableMetadata(tableIdent).storage.locationUri
+ assert(tableLocation.isDefined)
+ makeQualifiedPath(new Path(tableLocation.get.toString, "paris"))
+ } else {
+ new URI("paris")
+ }
+
+ assert(catalog.getPartition(tableIdent, part2).storage.locationUri == Option(partitionLocation))
assert(catalog.getPartition(tableIdent, part3).storage.locationUri.isDefined)
// add partitions without explicitly specifying database
@@ -1277,7 +1395,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
Set(part1, part2, part3, part4, part5))
}
- private def testDropPartitions(isDatasourceTable: Boolean): Unit = {
+ protected def testDropPartitions(isDatasourceTable: Boolean): Unit = {
val catalog = spark.sessionState.catalog
val tableIdent = TableIdentifier("tab1", Some("dbx"))
val part1 = Map("a" -> "1", "b" -> "5")
@@ -1330,7 +1448,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
assert(catalog.listPartitions(tableIdent).isEmpty)
}
- private def testRenamePartitions(isDatasourceTable: Boolean): Unit = {
+ protected def testRenamePartitions(isDatasourceTable: Boolean): Unit = {
val catalog = spark.sessionState.catalog
val tableIdent = TableIdentifier("tab1", Some("dbx"))
val part1 = Map("a" -> "1", "b" -> "q")
@@ -1374,7 +1492,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
Set(Map("a" -> "1", "b" -> "p"), Map("a" -> "20", "b" -> "c"), Map("a" -> "3", "b" -> "p")))
}
- private def testChangeColumn(isDatasourceTable: Boolean): Unit = {
+ protected def testChangeColumn(isDatasourceTable: Boolean): Unit = {
val catalog = spark.sessionState.catalog
val resolver = spark.sessionState.conf.resolver
val tableIdent = TableIdentifier("tab1", Some("dbx"))
@@ -1474,35 +1592,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
)
}
- test("create a managed Hive source table") {
- assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "in-memory")
- val tabName = "tbl"
- withTable(tabName) {
- val e = intercept[AnalysisException] {
- sql(s"CREATE TABLE $tabName (i INT, j STRING)")
- }.getMessage
- assert(e.contains("Hive support is required to CREATE Hive TABLE"))
- }
- }
-
- test("create an external Hive source table") {
- assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "in-memory")
- withTempDir { tempDir =>
- val tabName = "tbl"
- withTable(tabName) {
- val e = intercept[AnalysisException] {
- sql(
- s"""
- |CREATE EXTERNAL TABLE $tabName (i INT, j STRING)
- |ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
- |LOCATION '${tempDir.toURI}'
- """.stripMargin)
- }.getMessage
- assert(e.contains("Hive support is required to CREATE Hive TABLE"))
- }
- }
- }
-
test("create a data source table without schema") {
import testImplicits._
withTempPath { tempDir =>
@@ -1541,22 +1630,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
}
- test("Create Hive Table As Select") {
- import testImplicits._
- withTable("t", "t1") {
- var e = intercept[AnalysisException] {
- sql("CREATE TABLE t SELECT 1 as a, 1 as b")
- }.getMessage
- assert(e.contains("Hive support is required to CREATE Hive TABLE (AS SELECT)"))
-
- spark.range(1).select('id as 'a, 'id as 'b).write.saveAsTable("t1")
- e = intercept[AnalysisException] {
- sql("CREATE TABLE t SELECT a, b from t1")
- }.getMessage
- assert(e.contains("Hive support is required to CREATE Hive TABLE (AS SELECT)"))
- }
- }
-
test("Create Data Source Table As Select") {
import testImplicits._
withTable("t", "t1", "t2") {
@@ -1580,7 +1653,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
test("drop default database") {
- Seq("true", "false").foreach { caseSensitive =>
+ val caseSensitiveOptions = if (isUsingHiveMetastore) Seq("false") else Seq("true", "false")
+ caseSensitiveOptions.foreach { caseSensitive =>
withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive) {
var message = intercept[AnalysisException] {
sql("DROP DATABASE default")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index 9201954b66..12fc8993d7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -306,6 +306,11 @@ private[sql] trait SQLTestUtils
val fs = hadoopPath.getFileSystem(spark.sessionState.newHadoopConf())
fs.makeQualified(hadoopPath).toUri
}
+
+ def makeQualifiedPath(path: Path): URI = {
+ val fs = path.getFileSystem(spark.sessionState.newHadoopConf())
+ fs.makeQualified(path).toUri
+ }
}
private[sql] object SQLTestUtils {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 10d929a4a0..fce055048d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -27,16 +27,88 @@ import org.scalatest.BeforeAndAfterEach
import org.apache.spark.SparkException
import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, TableAlreadyExistsException}
-import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTableType, CatalogUtils, ExternalCatalogUtils}
+import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.command.DDLUtils
+import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils}
import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.hive.orc.OrcFileOperator
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.sql.test.SQLTestUtils
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{MetadataBuilder, StructType}
+
+// TODO(gatorsmile): combine HiveCatalogedDDLSuite and HiveDDLSuite
+class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeAndAfterEach {
+ override def afterEach(): Unit = {
+ try {
+ // drop all databases, tables and functions after each test
+ spark.sessionState.catalog.reset()
+ } finally {
+ super.afterEach()
+ }
+ }
+
+ protected override def generateTable(
+ catalog: SessionCatalog,
+ name: TableIdentifier): CatalogTable = {
+ val storage =
+ CatalogStorageFormat(
+ locationUri = Some(catalog.defaultTablePath(name)),
+ inputFormat = Some("org.apache.hadoop.mapred.SequenceFileInputFormat"),
+ outputFormat = Some("org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat"),
+ serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"),
+ compressed = false,
+ properties = Map("serialization.format" -> "1"))
+ val metadata = new MetadataBuilder()
+ .putString("key", "value")
+ .build()
+ CatalogTable(
+ identifier = name,
+ tableType = CatalogTableType.EXTERNAL,
+ storage = storage,
+ schema = new StructType()
+ .add("col1", "int", nullable = true, metadata = metadata)
+ .add("col2", "string")
+ .add("a", "int")
+ .add("b", "int"),
+ provider = Some("hive"),
+ partitionColumnNames = Seq("a", "b"),
+ createTime = 0L,
+ tracksPartitionsInCatalog = true)
+ }
+
+ protected override def normalizeCatalogTable(table: CatalogTable): CatalogTable = {
+ val nondeterministicProps = Set(
+ "CreateTime",
+ "transient_lastDdlTime",
+ "grantTime",
+ "lastUpdateTime",
+ "last_modified_by",
+ "last_modified_time",
+ "Owner:",
+ "COLUMN_STATS_ACCURATE",
+ // The following are hive specific schema parameters which we do not need to match exactly.
+ "numFiles",
+ "numRows",
+ "rawDataSize",
+ "totalSize",
+ "totalNumberFiles",
+ "maxFileSize",
+ "minFileSize"
+ )
+
+ table.copy(
+ createTime = 0L,
+ lastAccessTime = 0L,
+ owner = "",
+ properties = table.properties.filterKeys(!nondeterministicProps.contains(_)),
+ // View texts are checked separately
+ viewText = None
+ )
+ }
+
+}
class HiveDDLSuite
extends QueryTest with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach {
@@ -1720,61 +1792,6 @@ class HiveDDLSuite
}
Seq("a b", "a:b", "a%b").foreach { specialChars =>
- test(s"datasource table: location uri contains $specialChars") {
- withTable("t", "t1") {
- withTempDir { dir =>
- val loc = new File(dir, specialChars)
- loc.mkdir()
- spark.sql(
- s"""
- |CREATE TABLE t(a string)
- |USING parquet
- |LOCATION '$loc'
- """.stripMargin)
-
- val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
- assert(table.location == new Path(loc.getAbsolutePath).toUri)
- assert(new Path(table.location).toString.contains(specialChars))
-
- assert(loc.listFiles().isEmpty)
- spark.sql("INSERT INTO TABLE t SELECT 1")
- assert(loc.listFiles().length >= 1)
- checkAnswer(spark.table("t"), Row("1") :: Nil)
- }
-
- withTempDir { dir =>
- val loc = new File(dir, specialChars)
- loc.mkdir()
- spark.sql(
- s"""
- |CREATE TABLE t1(a string, b string)
- |USING parquet
- |PARTITIONED BY(b)
- |LOCATION '$loc'
- """.stripMargin)
-
- val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
- assert(table.location == new Path(loc.getAbsolutePath).toUri)
- assert(new Path(table.location).toString.contains(specialChars))
-
- assert(loc.listFiles().isEmpty)
- spark.sql("INSERT INTO TABLE t1 PARTITION(b=2) SELECT 1")
- val partFile = new File(loc, "b=2")
- assert(partFile.listFiles().length >= 1)
- checkAnswer(spark.table("t1"), Row("1", "2") :: Nil)
-
- spark.sql("INSERT INTO TABLE t1 PARTITION(b='2017-03-03 12:13%3A14') SELECT 1")
- val partFile1 = new File(loc, "b=2017-03-03 12:13%3A14")
- assert(!partFile1.exists())
- val partFile2 = new File(loc, "b=2017-03-03 12%3A13%253A14")
- assert(partFile2.listFiles().length >= 1)
- checkAnswer(spark.table("t1"), Row("1", "2") :: Row("1", "2017-03-03 12:13%3A14") :: Nil)
- }
- }
- }
- }
-
- Seq("a b", "a:b", "a%b").foreach { specialChars =>
test(s"hive table: location uri contains $specialChars") {
withTable("t") {
withTempDir { dir =>
@@ -1848,28 +1865,4 @@ class HiveDDLSuite
}
}
}
-
- Seq("a b", "a:b", "a%b").foreach { specialChars =>
- test(s"location uri contains $specialChars for database") {
- try {
- withTable("t") {
- withTempDir { dir =>
- val loc = new File(dir, specialChars)
- spark.sql(s"CREATE DATABASE tmpdb LOCATION '$loc'")
- spark.sql("USE tmpdb")
-
- Seq(1).toDF("a").write.saveAsTable("t")
- val tblloc = new File(loc, "t")
- val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
- val tblPath = new Path(tblloc.getAbsolutePath)
- val fs = tblPath.getFileSystem(spark.sessionState.newHadoopConf())
- assert(table.location == makeQualifiedPath(tblloc.getAbsolutePath))
- assert(tblloc.listFiles().nonEmpty)
- }
- }
- } finally {
- spark.sql("DROP DATABASE IF EXISTS tmpdb")
- }
- }
- }
}