From f79371ad86d94da14bd1ddb53e99a388017b6892 Mon Sep 17 00:00:00 2001 From: Budde Date: Thu, 9 Mar 2017 12:55:33 -0800 Subject: [SPARK-19611][SQL] Introduce configurable table schema inference ## Summary of changes Add a new configuration option that allows Spark SQL to infer a case-sensitive schema from a Hive Metastore table's data files when a case-sensitive schema can't be read from the table properties. - Add spark.sql.hive.caseSensitiveInferenceMode param to SQLConf - Add schemaPreservesCase field to CatalogTable (set to false when schema can't successfully be read from Hive table props) - Perform schema inference in HiveMetastoreCatalog if schemaPreservesCase is false, depending on spark.sql.hive.caseSensitiveInferenceMode - Add alterTableSchema() method to the ExternalCatalog interface - Add HiveSchemaInferenceSuite tests - Refactor and move ParquetFileForamt.meregeMetastoreParquetSchema() as HiveMetastoreCatalog.mergeWithMetastoreSchema - Move schema merging tests from ParquetSchemaSuite to HiveSchemaInferenceSuite [JIRA for this change](https://issues.apache.org/jira/browse/SPARK-19611) ## How was this patch tested? The tests in ```HiveSchemaInferenceSuite``` should verify that schema inference is working as expected. ```ExternalCatalogSuite``` has also been extended to cover the new ```alterTableSchema()``` API. Author: Budde Closes #16944 from budde/SPARK-19611. --- .../spark/sql/hive/HiveSchemaInferenceSuite.scala | 305 +++++++++++++++++++++ 1 file changed, 305 insertions(+) create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala (limited to 'sql/hive/src/test/scala/org/apache') diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala new file mode 100644 index 0000000000..7895580381 --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import java.io.File +import java.util.concurrent.{Executors, TimeUnit} + +import scala.util.Random + +import org.scalatest.BeforeAndAfterEach + +import org.apache.spark.metrics.source.HiveCatalogMetrics +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog._ +import org.apache.spark.sql.execution.datasources.FileStatusCache +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.hive.client.HiveClient +import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.internal.{HiveSerDe, SQLConf} +import org.apache.spark.sql.internal.SQLConf.HiveCaseSensitiveInferenceMode.{Value => InferenceMode, _} +import org.apache.spark.sql.test.SQLTestUtils +import org.apache.spark.sql.types._ + +class HiveSchemaInferenceSuite + extends QueryTest with TestHiveSingleton with SQLTestUtils with BeforeAndAfterEach { + + import HiveSchemaInferenceSuite._ + import HiveExternalCatalog.DATASOURCE_SCHEMA_PREFIX + + override def beforeEach(): Unit = { + super.beforeEach() + FileStatusCache.resetForTesting() + } + + override def afterEach(): Unit = { + super.afterEach() + spark.sessionState.catalog.tableRelationCache.invalidateAll() + FileStatusCache.resetForTesting() + } + + private val externalCatalog = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog] + private val client = externalCatalog.client + + // Return a copy of the given schema with all field names converted to lower case. + private def lowerCaseSchema(schema: StructType): StructType = { + StructType(schema.map(f => f.copy(name = f.name.toLowerCase))) + } + + // Create a Hive external test table containing the given field and partition column names. + // Returns a case-sensitive schema for the table. + private def setupExternalTable( + fileType: String, + fields: Seq[String], + partitionCols: Seq[String], + dir: File): StructType = { + // Treat all table fields as bigints... + val structFields = fields.map { field => + StructField( + name = field, + dataType = LongType, + nullable = true, + metadata = new MetadataBuilder().putString(HIVE_TYPE_STRING, "bigint").build()) + } + // and all partition columns as ints + val partitionStructFields = partitionCols.map { field => + StructField( + // Partition column case isn't preserved + name = field.toLowerCase, + dataType = IntegerType, + nullable = true, + metadata = new MetadataBuilder().putString(HIVE_TYPE_STRING, "int").build()) + } + val schema = StructType(structFields ++ partitionStructFields) + + // Write some test data (partitioned if specified) + val writer = spark.range(NUM_RECORDS) + .selectExpr((fields ++ partitionCols).map("id as " + _): _*) + .write + .partitionBy(partitionCols: _*) + .mode("overwrite") + fileType match { + case ORC_FILE_TYPE => + writer.orc(dir.getAbsolutePath) + case PARQUET_FILE_TYPE => + writer.parquet(dir.getAbsolutePath) + } + + // Create Hive external table with lowercased schema + val serde = HiveSerDe.serdeMap(fileType) + client.createTable( + CatalogTable( + identifier = TableIdentifier(table = TEST_TABLE_NAME, database = Option(DATABASE)), + tableType = CatalogTableType.EXTERNAL, + storage = CatalogStorageFormat( + locationUri = Option(new java.net.URI(dir.getAbsolutePath)), + inputFormat = serde.inputFormat, + outputFormat = serde.outputFormat, + serde = serde.serde, + compressed = false, + properties = Map("serialization.format" -> "1")), + schema = schema, + provider = Option("hive"), + partitionColumnNames = partitionCols.map(_.toLowerCase), + properties = Map.empty), + true) + + // Add partition records (if specified) + if (!partitionCols.isEmpty) { + spark.catalog.recoverPartitions(TEST_TABLE_NAME) + } + + // Check that the table returned by HiveExternalCatalog has schemaPreservesCase set to false + // and that the raw table returned by the Hive client doesn't have any Spark SQL properties + // set (table needs to be obtained from client since HiveExternalCatalog filters these + // properties out). + assert(!externalCatalog.getTable(DATABASE, TEST_TABLE_NAME).schemaPreservesCase) + val rawTable = client.getTable(DATABASE, TEST_TABLE_NAME) + assert(rawTable.properties.filterKeys(_.startsWith(DATASOURCE_SCHEMA_PREFIX)) == Map.empty) + schema + } + + private def withTestTables( + fileType: String)(f: (Seq[String], Seq[String], StructType) => Unit): Unit = { + // Test both a partitioned and unpartitioned Hive table + val tableFields = Seq( + (Seq("fieldOne"), Seq("partCol1", "partCol2")), + (Seq("fieldOne", "fieldTwo"), Seq.empty[String])) + + tableFields.foreach { case (fields, partCols) => + withTempDir { dir => + val schema = setupExternalTable(fileType, fields, partCols, dir) + withTable(TEST_TABLE_NAME) { f(fields, partCols, schema) } + } + } + } + + private def withFileTypes(f: (String) => Unit): Unit + = Seq(ORC_FILE_TYPE, PARQUET_FILE_TYPE).foreach(f) + + private def withInferenceMode(mode: InferenceMode)(f: => Unit): Unit = { + withSQLConf( + HiveUtils.CONVERT_METASTORE_ORC.key -> "true", + SQLConf.HIVE_CASE_SENSITIVE_INFERENCE.key -> mode.toString)(f) + } + + private val inferenceKey = SQLConf.HIVE_CASE_SENSITIVE_INFERENCE.key + + private def testFieldQuery(fields: Seq[String]): Unit = { + if (!fields.isEmpty) { + val query = s"SELECT * FROM ${TEST_TABLE_NAME} WHERE ${Random.shuffle(fields).head} >= 0" + assert(spark.sql(query).count == NUM_RECORDS) + } + } + + private def testTableSchema(expectedSchema: StructType): Unit + = assert(spark.table(TEST_TABLE_NAME).schema == expectedSchema) + + withFileTypes { fileType => + test(s"$fileType: schema should be inferred and saved when INFER_AND_SAVE is specified") { + withInferenceMode(INFER_AND_SAVE) { + withTestTables(fileType) { (fields, partCols, schema) => + testFieldQuery(fields) + testFieldQuery(partCols) + testTableSchema(schema) + + // Verify the catalog table now contains the updated schema and properties + val catalogTable = externalCatalog.getTable(DATABASE, TEST_TABLE_NAME) + assert(catalogTable.schemaPreservesCase) + assert(catalogTable.schema == schema) + assert(catalogTable.partitionColumnNames == partCols.map(_.toLowerCase)) + } + } + } + } + + withFileTypes { fileType => + test(s"$fileType: schema should be inferred but not stored when INFER_ONLY is specified") { + withInferenceMode(INFER_ONLY) { + withTestTables(fileType) { (fields, partCols, schema) => + val originalTable = externalCatalog.getTable(DATABASE, TEST_TABLE_NAME) + testFieldQuery(fields) + testFieldQuery(partCols) + testTableSchema(schema) + // Catalog table shouldn't be altered + assert(externalCatalog.getTable(DATABASE, TEST_TABLE_NAME) == originalTable) + } + } + } + } + + withFileTypes { fileType => + test(s"$fileType: schema should not be inferred when NEVER_INFER is specified") { + withInferenceMode(NEVER_INFER) { + withTestTables(fileType) { (fields, partCols, schema) => + val originalTable = externalCatalog.getTable(DATABASE, TEST_TABLE_NAME) + // Only check the table schema as the test queries will break + testTableSchema(lowerCaseSchema(schema)) + assert(externalCatalog.getTable(DATABASE, TEST_TABLE_NAME) == originalTable) + } + } + } + } + + test("mergeWithMetastoreSchema() should return expected results") { + // Field type conflict resolution + assertResult( + StructType(Seq( + StructField("lowerCase", StringType), + StructField("UPPERCase", DoubleType, nullable = false)))) { + + HiveMetastoreCatalog.mergeWithMetastoreSchema( + StructType(Seq( + StructField("lowercase", StringType), + StructField("uppercase", DoubleType, nullable = false))), + + StructType(Seq( + StructField("lowerCase", BinaryType), + StructField("UPPERCase", IntegerType, nullable = true)))) + } + + // MetaStore schema is subset of parquet schema + assertResult( + StructType(Seq( + StructField("UPPERCase", DoubleType, nullable = false)))) { + + HiveMetastoreCatalog.mergeWithMetastoreSchema( + StructType(Seq( + StructField("uppercase", DoubleType, nullable = false))), + + StructType(Seq( + StructField("lowerCase", BinaryType), + StructField("UPPERCase", IntegerType, nullable = true)))) + } + + // Metastore schema contains additional non-nullable fields. + assert(intercept[Throwable] { + HiveMetastoreCatalog.mergeWithMetastoreSchema( + StructType(Seq( + StructField("uppercase", DoubleType, nullable = false), + StructField("lowerCase", BinaryType, nullable = false))), + + StructType(Seq( + StructField("UPPERCase", IntegerType, nullable = true)))) + }.getMessage.contains("Detected conflicting schemas")) + + // Conflicting non-nullable field names + intercept[Throwable] { + HiveMetastoreCatalog.mergeWithMetastoreSchema( + StructType(Seq(StructField("lower", StringType, nullable = false))), + StructType(Seq(StructField("lowerCase", BinaryType)))) + } + + // Check that merging missing nullable fields works as expected. + assertResult( + StructType(Seq( + StructField("firstField", StringType, nullable = true), + StructField("secondField", StringType, nullable = true), + StructField("thirdfield", StringType, nullable = true)))) { + HiveMetastoreCatalog.mergeWithMetastoreSchema( + StructType(Seq( + StructField("firstfield", StringType, nullable = true), + StructField("secondfield", StringType, nullable = true), + StructField("thirdfield", StringType, nullable = true))), + StructType(Seq( + StructField("firstField", StringType, nullable = true), + StructField("secondField", StringType, nullable = true)))) + } + + // Merge should fail if the Metastore contains any additional fields that are not + // nullable. + assert(intercept[Throwable] { + HiveMetastoreCatalog.mergeWithMetastoreSchema( + StructType(Seq( + StructField("firstfield", StringType, nullable = true), + StructField("secondfield", StringType, nullable = true), + StructField("thirdfield", StringType, nullable = false))), + StructType(Seq( + StructField("firstField", StringType, nullable = true), + StructField("secondField", StringType, nullable = true)))) + }.getMessage.contains("Detected conflicting schemas")) + } +} + +object HiveSchemaInferenceSuite { + private val NUM_RECORDS = 10 + private val DATABASE = "default" + private val TEST_TABLE_NAME = "test_table" + private val ORC_FILE_TYPE = "orc" + private val PARQUET_FILE_TYPE = "parquet" +} -- cgit v1.2.3