author     Budde <budde@amazon.com>              2017-03-09 12:55:33 -0800
committer  Wenchen Fan <wenchen@databricks.com>  2017-03-09 12:55:33 -0800
commit     f79371ad86d94da14bd1ddb53e99a388017b6892 (patch)
tree       20060cf7dba8c1cbda2536b0674bf9a93103bf93 /sql/hive/src/test/scala/org/apache
parent     cabe1df8606e7e5b9e6efb106045deb3f39f5f13 (diff)
[SPARK-19611][SQL] Introduce configurable table schema inference
## Summary of changes

Add a new configuration option that allows Spark SQL to infer a case-sensitive schema from a Hive Metastore table's data files when a case-sensitive schema can't be read from the table properties.

- Add spark.sql.hive.caseSensitiveInferenceMode param to SQLConf
- Add schemaPreservesCase field to CatalogTable (set to false when the schema can't successfully be read from the Hive table props)
- Perform schema inference in HiveMetastoreCatalog if schemaPreservesCase is false, depending on spark.sql.hive.caseSensitiveInferenceMode
- Add alterTableSchema() method to the ExternalCatalog interface
- Add HiveSchemaInferenceSuite tests
- Refactor and move ParquetFileFormat.mergeMetastoreParquetSchema() to HiveMetastoreCatalog.mergeWithMetastoreSchema()
- Move schema merging tests from ParquetSchemaSuite to HiveSchemaInferenceSuite

[JIRA for this change](https://issues.apache.org/jira/browse/SPARK-19611)

## How was this patch tested?

The tests in `HiveSchemaInferenceSuite` verify that schema inference works as expected. `ExternalCatalogSuite` has also been extended to cover the new `alterTableSchema()` API.

Author: Budde <budde@amazon.com>

Closes #16944 from budde/SPARK-19611.
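For reference, a minimal sketch of how the new option could be exercised end to end. The table name and location below are hypothetical placeholders, and the snippet assumes a Hive-enabled Spark build; the three mode values (INFER_AND_SAVE, INFER_ONLY, NEVER_INFER) are the ones introduced by this patch.

```scala
import org.apache.spark.sql.SparkSession

// A minimal sketch, assuming a Hive-enabled Spark build. The location and
// table name are hypothetical placeholders.
val spark = SparkSession.builder()
  .appName("schema-inference-demo")
  .enableHiveSupport()
  .getOrCreate()

// Parquet preserves column-name case, but the Hive Metastore lower-cases it.
spark.range(10)
  .selectExpr("id AS fieldOne", "id AS fieldTwo")
  .write.mode("overwrite").parquet("/tmp/camel_case_data")

spark.sql("""
  CREATE EXTERNAL TABLE IF NOT EXISTS camel_case_table
    (fieldone BIGINT, fieldtwo BIGINT)
  STORED AS PARQUET
  LOCATION '/tmp/camel_case_data'
""")

// INFER_AND_SAVE infers the case-sensitive schema from the data files and
// stores it back in the table properties; INFER_ONLY infers it without
// altering the table; NEVER_INFER falls back to the (lower-case) Metastore
// schema.
spark.conf.set("spark.sql.hive.caseSensitiveInferenceMode", "INFER_AND_SAVE")

// The first access triggers inference in HiveMetastoreCatalog; the printed
// schema should show the original camelCase field names.
spark.table("camel_case_table").printSchema()
```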
Diffstat (limited to 'sql/hive/src/test/scala/org/apache')
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala  305
1 file changed, 305 insertions, 0 deletions
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala
new file mode 100644
index 0000000000..7895580381
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+import java.util.concurrent.{Executors, TimeUnit}
+
+import scala.util.Random
+
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.metrics.source.HiveCatalogMetrics
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.execution.datasources.FileStatusCache
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.hive.client.HiveClient
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
+import org.apache.spark.sql.internal.SQLConf.HiveCaseSensitiveInferenceMode.{Value => InferenceMode, _}
+import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.types._
+
+class HiveSchemaInferenceSuite
+ extends QueryTest with TestHiveSingleton with SQLTestUtils with BeforeAndAfterEach {
+
+ import HiveSchemaInferenceSuite._
+ import HiveExternalCatalog.DATASOURCE_SCHEMA_PREFIX
+
+ override def beforeEach(): Unit = {
+ super.beforeEach()
+ FileStatusCache.resetForTesting()
+ }
+
+ override def afterEach(): Unit = {
+ super.afterEach()
+ spark.sessionState.catalog.tableRelationCache.invalidateAll()
+ FileStatusCache.resetForTesting()
+ }
+
+ private val externalCatalog = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog]
+ private val client = externalCatalog.client
+
+ // Return a copy of the given schema with all field names converted to lower case.
+ private def lowerCaseSchema(schema: StructType): StructType = {
+ StructType(schema.map(f => f.copy(name = f.name.toLowerCase)))
+ }
+
+ // Create a Hive external test table containing the given field and partition column names.
+ // Returns a case-sensitive schema for the table.
+ private def setupExternalTable(
+ fileType: String,
+ fields: Seq[String],
+ partitionCols: Seq[String],
+ dir: File): StructType = {
+ // Treat all table fields as bigints...
+ val structFields = fields.map { field =>
+ StructField(
+ name = field,
+ dataType = LongType,
+ nullable = true,
+ metadata = new MetadataBuilder().putString(HIVE_TYPE_STRING, "bigint").build())
+ }
+ // and all partition columns as ints
+ val partitionStructFields = partitionCols.map { field =>
+ StructField(
+ // Partition column case isn't preserved
+ name = field.toLowerCase,
+ dataType = IntegerType,
+ nullable = true,
+ metadata = new MetadataBuilder().putString(HIVE_TYPE_STRING, "int").build())
+ }
+ val schema = StructType(structFields ++ partitionStructFields)
+
+ // Write some test data (partitioned if specified)
+ val writer = spark.range(NUM_RECORDS)
+ .selectExpr((fields ++ partitionCols).map("id as " + _): _*)
+ .write
+ .partitionBy(partitionCols: _*)
+ .mode("overwrite")
+ fileType match {
+ case ORC_FILE_TYPE =>
+ writer.orc(dir.getAbsolutePath)
+ case PARQUET_FILE_TYPE =>
+ writer.parquet(dir.getAbsolutePath)
+ }
+
+ // Create Hive external table with lowercased schema
+ val serde = HiveSerDe.serdeMap(fileType)
+ client.createTable(
+ CatalogTable(
+ identifier = TableIdentifier(table = TEST_TABLE_NAME, database = Option(DATABASE)),
+ tableType = CatalogTableType.EXTERNAL,
+ storage = CatalogStorageFormat(
+ locationUri = Option(new java.net.URI(dir.getAbsolutePath)),
+ inputFormat = serde.inputFormat,
+ outputFormat = serde.outputFormat,
+ serde = serde.serde,
+ compressed = false,
+ properties = Map("serialization.format" -> "1")),
+ schema = schema,
+ provider = Option("hive"),
+ partitionColumnNames = partitionCols.map(_.toLowerCase),
+ properties = Map.empty),
+ true)
+
+ // Add partition records (if specified)
+ if (!partitionCols.isEmpty) {
+ spark.catalog.recoverPartitions(TEST_TABLE_NAME)
+ }
+
+ // Check that the table returned by HiveExternalCatalog has schemaPreservesCase set to false
+ // and that the raw table returned by the Hive client doesn't have any Spark SQL properties
+ // set (table needs to be obtained from client since HiveExternalCatalog filters these
+ // properties out).
+ assert(!externalCatalog.getTable(DATABASE, TEST_TABLE_NAME).schemaPreservesCase)
+ val rawTable = client.getTable(DATABASE, TEST_TABLE_NAME)
+ assert(rawTable.properties.filterKeys(_.startsWith(DATASOURCE_SCHEMA_PREFIX)) == Map.empty)
+ schema
+ }
+
+ private def withTestTables(
+ fileType: String)(f: (Seq[String], Seq[String], StructType) => Unit): Unit = {
+ // Test both a partitioned and unpartitioned Hive table
+ val tableFields = Seq(
+ (Seq("fieldOne"), Seq("partCol1", "partCol2")),
+ (Seq("fieldOne", "fieldTwo"), Seq.empty[String]))
+
+ tableFields.foreach { case (fields, partCols) =>
+ withTempDir { dir =>
+ val schema = setupExternalTable(fileType, fields, partCols, dir)
+ withTable(TEST_TABLE_NAME) { f(fields, partCols, schema) }
+ }
+ }
+ }
+
+ private def withFileTypes(f: (String) => Unit): Unit
+ = Seq(ORC_FILE_TYPE, PARQUET_FILE_TYPE).foreach(f)
+
+ private def withInferenceMode(mode: InferenceMode)(f: => Unit): Unit = {
+ withSQLConf(
+ HiveUtils.CONVERT_METASTORE_ORC.key -> "true",
+ SQLConf.HIVE_CASE_SENSITIVE_INFERENCE.key -> mode.toString)(f)
+ }
+
+ private val inferenceKey = SQLConf.HIVE_CASE_SENSITIVE_INFERENCE.key
+
+ private def testFieldQuery(fields: Seq[String]): Unit = {
+ if (!fields.isEmpty) {
+ val query = s"SELECT * FROM ${TEST_TABLE_NAME} WHERE ${Random.shuffle(fields).head} >= 0"
+ assert(spark.sql(query).count == NUM_RECORDS)
+ }
+ }
+
+ private def testTableSchema(expectedSchema: StructType): Unit
+ = assert(spark.table(TEST_TABLE_NAME).schema == expectedSchema)
+
+ withFileTypes { fileType =>
+ test(s"$fileType: schema should be inferred and saved when INFER_AND_SAVE is specified") {
+ withInferenceMode(INFER_AND_SAVE) {
+ withTestTables(fileType) { (fields, partCols, schema) =>
+ testFieldQuery(fields)
+ testFieldQuery(partCols)
+ testTableSchema(schema)
+
+ // Verify the catalog table now contains the updated schema and properties
+ val catalogTable = externalCatalog.getTable(DATABASE, TEST_TABLE_NAME)
+ assert(catalogTable.schemaPreservesCase)
+ assert(catalogTable.schema == schema)
+ assert(catalogTable.partitionColumnNames == partCols.map(_.toLowerCase))
+ }
+ }
+ }
+ }
+
+ withFileTypes { fileType =>
+ test(s"$fileType: schema should be inferred but not stored when INFER_ONLY is specified") {
+ withInferenceMode(INFER_ONLY) {
+ withTestTables(fileType) { (fields, partCols, schema) =>
+ val originalTable = externalCatalog.getTable(DATABASE, TEST_TABLE_NAME)
+ testFieldQuery(fields)
+ testFieldQuery(partCols)
+ testTableSchema(schema)
+ // Catalog table shouldn't be altered
+ assert(externalCatalog.getTable(DATABASE, TEST_TABLE_NAME) == originalTable)
+ }
+ }
+ }
+ }
+
+ withFileTypes { fileType =>
+ test(s"$fileType: schema should not be inferred when NEVER_INFER is specified") {
+ withInferenceMode(NEVER_INFER) {
+ withTestTables(fileType) { (fields, partCols, schema) =>
+ val originalTable = externalCatalog.getTable(DATABASE, TEST_TABLE_NAME)
+ // Only check the table schema as the test queries will break
+ testTableSchema(lowerCaseSchema(schema))
+ assert(externalCatalog.getTable(DATABASE, TEST_TABLE_NAME) == originalTable)
+ }
+ }
+ }
+ }
+
+ test("mergeWithMetastoreSchema() should return expected results") {
+ // Field type conflict resolution
+ assertResult(
+ StructType(Seq(
+ StructField("lowerCase", StringType),
+ StructField("UPPERCase", DoubleType, nullable = false)))) {
+
+ HiveMetastoreCatalog.mergeWithMetastoreSchema(
+ StructType(Seq(
+ StructField("lowercase", StringType),
+ StructField("uppercase", DoubleType, nullable = false))),
+
+ StructType(Seq(
+ StructField("lowerCase", BinaryType),
+ StructField("UPPERCase", IntegerType, nullable = true))))
+ }
+
+ // Metastore schema is a subset of the Parquet schema
+ assertResult(
+ StructType(Seq(
+ StructField("UPPERCase", DoubleType, nullable = false)))) {
+
+ HiveMetastoreCatalog.mergeWithMetastoreSchema(
+ StructType(Seq(
+ StructField("uppercase", DoubleType, nullable = false))),
+
+ StructType(Seq(
+ StructField("lowerCase", BinaryType),
+ StructField("UPPERCase", IntegerType, nullable = true))))
+ }
+
+ // Metastore schema contains additional non-nullable fields.
+ assert(intercept[Throwable] {
+ HiveMetastoreCatalog.mergeWithMetastoreSchema(
+ StructType(Seq(
+ StructField("uppercase", DoubleType, nullable = false),
+ StructField("lowerCase", BinaryType, nullable = false))),
+
+ StructType(Seq(
+ StructField("UPPERCase", IntegerType, nullable = true))))
+ }.getMessage.contains("Detected conflicting schemas"))
+
+ // Conflicting non-nullable field names
+ intercept[Throwable] {
+ HiveMetastoreCatalog.mergeWithMetastoreSchema(
+ StructType(Seq(StructField("lower", StringType, nullable = false))),
+ StructType(Seq(StructField("lowerCase", BinaryType))))
+ }
+
+ // Check that merging missing nullable fields works as expected.
+ assertResult(
+ StructType(Seq(
+ StructField("firstField", StringType, nullable = true),
+ StructField("secondField", StringType, nullable = true),
+ StructField("thirdfield", StringType, nullable = true)))) {
+ HiveMetastoreCatalog.mergeWithMetastoreSchema(
+ StructType(Seq(
+ StructField("firstfield", StringType, nullable = true),
+ StructField("secondfield", StringType, nullable = true),
+ StructField("thirdfield", StringType, nullable = true))),
+ StructType(Seq(
+ StructField("firstField", StringType, nullable = true),
+ StructField("secondField", StringType, nullable = true))))
+ }
+
+ // Merge should fail if the Metastore contains any additional fields that are not
+ // nullable.
+ assert(intercept[Throwable] {
+ HiveMetastoreCatalog.mergeWithMetastoreSchema(
+ StructType(Seq(
+ StructField("firstfield", StringType, nullable = true),
+ StructField("secondfield", StringType, nullable = true),
+ StructField("thirdfield", StringType, nullable = false))),
+ StructType(Seq(
+ StructField("firstField", StringType, nullable = true),
+ StructField("secondField", StringType, nullable = true))))
+ }.getMessage.contains("Detected conflicting schemas"))
+ }
+}
+
+object HiveSchemaInferenceSuite {
+ private val NUM_RECORDS = 10
+ private val DATABASE = "default"
+ private val TEST_TABLE_NAME = "test_table"
+ private val ORC_FILE_TYPE = "orc"
+ private val PARQUET_FILE_TYPE = "parquet"
+}