From f79371ad86d94da14bd1ddb53e99a388017b6892 Mon Sep 17 00:00:00 2001
From: Budde
Date: Thu, 9 Mar 2017 12:55:33 -0800
Subject: [SPARK-19611][SQL] Introduce configurable table schema inference

## Summary of changes

Add a new configuration option that allows Spark SQL to infer a case-sensitive schema from a Hive Metastore table's data files when a case-sensitive schema can't be read from the table properties.

- Add spark.sql.hive.caseSensitiveInferenceMode param to SQLConf
- Add schemaPreservesCase field to CatalogTable (set to false when schema can't successfully be read from Hive table props)
- Perform schema inference in HiveMetastoreCatalog if schemaPreservesCase is false, depending on spark.sql.hive.caseSensitiveInferenceMode
- Add alterTableSchema() method to the ExternalCatalog interface
- Add HiveSchemaInferenceSuite tests
- Refactor and move ParquetFileFormat.mergeMetastoreParquetSchema() to HiveMetastoreCatalog.mergeWithMetastoreSchema
- Move schema merging tests from ParquetSchemaSuite to HiveSchemaInferenceSuite

[JIRA for this change](https://issues.apache.org/jira/browse/SPARK-19611)

## How was this patch tested?

The tests in ```HiveSchemaInferenceSuite``` verify that schema inference works as expected. ```ExternalCatalogSuite``` has also been extended to cover the new ```alterTableSchema()``` API.

Author: Budde

Closes #16944 from budde/SPARK-19611.
---
 .../sql/catalyst/catalog/ExternalCatalog.scala     |  15 +-
 .../sql/catalyst/catalog/InMemoryCatalog.scala     |  10 +
 .../spark/sql/catalyst/catalog/interface.scala     |   8 +-
 .../catalyst/catalog/ExternalCatalogSuite.scala    |  15 +-
 .../spark/sql/catalyst/trees/TreeNodeSuite.scala   |   3 +-
 .../datasources/parquet/ParquetFileFormat.scala    |  65 -----
 .../org/apache/spark/sql/internal/SQLConf.scala    |  22 ++
 .../datasources/parquet/ParquetSchemaSuite.scala   |  82 ------
 .../spark/sql/hive/HiveExternalCatalog.scala       |  23 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala      | 100 ++++++-
 .../spark/sql/hive/HiveSchemaInferenceSuite.scala  | 305 +++++++++++++++++++++
 11 files changed, 489 insertions(+), 159 deletions(-)
 create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala index 31eded4deb..08a01e8601 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.catalog import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException, NoSuchTableException} import org.apache.spark.sql.catalyst.expressions.Expression - +import org.apache.spark.sql.types.StructType /** * Interface for the system catalog (of functions, partitions, tables, and databases). @@ -104,6 +104,19 @@ abstract class ExternalCatalog { */ def alterTable(tableDefinition: CatalogTable): Unit + /** + * Alter the schema of a table identified by the provided database and table name. The new schema + * should still contain the existing bucket columns and partition columns used by the table. This + * method will also update any Spark SQL-related parameters stored as Hive table properties (such + * as the schema itself).
+ * + * @param db Database that table to alter schema for exists in + * @param table Name of table to alter schema for + * @param schema Updated schema to be used for the table (must contain existing partition and + * bucket columns) + */ + def alterTableSchema(db: String, table: String, schema: StructType): Unit + def getTable(db: String, table: String): CatalogTable def getTableOption(db: String, table: String): Option[CatalogTable] diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala index 80aba4af94..5cc6b0abc6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.util.StringUtils +import org.apache.spark.sql.types.StructType /** * An in-memory (ephemeral) implementation of the system catalog. @@ -297,6 +298,15 @@ class InMemoryCatalog( catalog(db).tables(tableDefinition.identifier.table).table = tableDefinition } + override def alterTableSchema( + db: String, + table: String, + schema: StructType): Unit = synchronized { + requireTableExists(db, table) + val origTable = catalog(db).tables(table).table + catalog(db).tables(table).table = origTable.copy(schema = schema) + } + override def getTable(db: String, table: String): CatalogTable = synchronized { requireTableExists(db, table) catalog(db).tables(table).table diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index 4452c47987..e3631b0c07 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -163,6 +163,11 @@ case class BucketSpec( * @param tracksPartitionsInCatalog whether this table's partition metadata is stored in the * catalog. If false, it is inferred automatically based on file * structure. + * @param schemaPreservesCase Whether or not the schema resolved for this table is case-sensitive. + * When using a Hive Metastore, this flag is set to false if a case- + * sensitive schema was unable to be read from the table properties. + * Used to trigger case-sensitive schema inference at query time, when + * configured.
*/ case class CatalogTable( identifier: TableIdentifier, @@ -180,7 +185,8 @@ case class CatalogTable( viewText: Option[String] = None, comment: Option[String] = None, unsupportedFeatures: Seq[String] = Seq.empty, - tracksPartitionsInCatalog: Boolean = false) { + tracksPartitionsInCatalog: Boolean = false, + schemaPreservesCase: Boolean = true) { import CatalogTable._ diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala index 07ccd68698..7820f39d96 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException} import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types._ import org.apache.spark.util.Utils @@ -240,6 +240,19 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac } } + test("alter table schema") { + val catalog = newBasicCatalog() + val tbl1 = catalog.getTable("db2", "tbl1") + val newSchema = StructType(Seq( + StructField("new_field_1", IntegerType), + StructField("new_field_2", StringType), + StructField("a", IntegerType), + StructField("b", StringType))) + catalog.alterTableSchema("db2", "tbl1", newSchema) + val newTbl1 = catalog.getTable("db2", "tbl1") + assert(newTbl1.schema == newSchema) + } + test("get table") { assert(newBasicCatalog().getTable("db2", "tbl1").identifier.table == "tbl1") } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala index af1eaa1f23..37e3dfabd0 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala @@ -491,7 +491,8 @@ class TreeNodeSuite extends SparkFunSuite { "lastAccessTime" -> -1, "tracksPartitionsInCatalog" -> false, "properties" -> JNull, - "unsupportedFeatures" -> List.empty[String])) + "unsupportedFeatures" -> List.empty[String], + "schemaPreservesCase" -> JBool(true))) // For unknown case class, returns JNull. val bigValue = new Array[Int](10000) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 828949eddc..5313c2f374 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -475,71 +475,6 @@ object ParquetFileFormat extends Logging { } } - /** - * Reconciles Hive Metastore case insensitivity issue and data type conflicts between Metastore - * schema and Parquet schema. - * - * Hive doesn't retain case information, while Parquet is case sensitive. On the other hand, the - * schema read from Parquet files may be incomplete (e.g. 
older versions of Parquet doesn't - * distinguish binary and string). This method generates a correct schema by merging Metastore - * schema data types and Parquet schema field names. - */ - def mergeMetastoreParquetSchema( - metastoreSchema: StructType, - parquetSchema: StructType): StructType = { - def schemaConflictMessage: String = - s"""Converting Hive Metastore Parquet, but detected conflicting schemas. Metastore schema: - |${metastoreSchema.prettyJson} - | - |Parquet schema: - |${parquetSchema.prettyJson} - """.stripMargin - - val mergedParquetSchema = mergeMissingNullableFields(metastoreSchema, parquetSchema) - - assert(metastoreSchema.size <= mergedParquetSchema.size, schemaConflictMessage) - - val ordinalMap = metastoreSchema.zipWithIndex.map { - case (field, index) => field.name.toLowerCase -> index - }.toMap - - val reorderedParquetSchema = mergedParquetSchema.sortBy(f => - ordinalMap.getOrElse(f.name.toLowerCase, metastoreSchema.size + 1)) - - StructType(metastoreSchema.zip(reorderedParquetSchema).map { - // Uses Parquet field names but retains Metastore data types. - case (mSchema, pSchema) if mSchema.name.toLowerCase == pSchema.name.toLowerCase => - mSchema.copy(name = pSchema.name) - case _ => - throw new SparkException(schemaConflictMessage) - }) - } - - /** - * Returns the original schema from the Parquet file with any missing nullable fields from the - * Hive Metastore schema merged in. - * - * When constructing a DataFrame from a collection of structured data, the resulting object has - * a schema corresponding to the union of the fields present in each element of the collection. - * Spark SQL simply assigns a null value to any field that isn't present for a particular row. - * In some cases, it is possible that a given table partition stored as a Parquet file doesn't - * contain a particular nullable field in its schema despite that field being present in the - * table schema obtained from the Hive Metastore. This method returns a schema representing the - * Parquet file schema along with any additional nullable fields from the Metastore schema - * merged in. - */ - private[parquet] def mergeMissingNullableFields( - metastoreSchema: StructType, - parquetSchema: StructType): StructType = { - val fieldMap = metastoreSchema.map(f => f.name.toLowerCase -> f).toMap - val missingFields = metastoreSchema - .map(_.name.toLowerCase) - .diff(parquetSchema.map(_.name.toLowerCase)) - .map(fieldMap(_)) - .filter(_.nullable) - StructType(parquetSchema ++ missingFields) - } - /** * Reads Parquet footers in multi-threaded manner. * If the config "spark.sql.files.ignoreCorruptFiles" is set to true, we will ignore the corrupted diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 1244f690fd..8e3f567b7d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -296,6 +296,25 @@ object SQLConf { .longConf .createWithDefault(250 * 1024 * 1024) + object HiveCaseSensitiveInferenceMode extends Enumeration { + val INFER_AND_SAVE, INFER_ONLY, NEVER_INFER = Value + } + + val HIVE_CASE_SENSITIVE_INFERENCE = buildConf("spark.sql.hive.caseSensitiveInferenceMode") + .doc("Sets the action to take when a case-sensitive schema cannot be read from a Hive " + + "table's properties. Although Spark SQL itself is not case-sensitive, Hive compatible file " + + "formats such as Parquet are. 
Spark SQL must use a case-preserving schema when querying " + + "any table backed by files containing case-sensitive field names or queries may not return " + + "accurate results. Valid options include INFER_AND_SAVE (the default mode-- infer the " + + "case-sensitive schema from the underlying data files and write it back to the table " + + "properties), INFER_ONLY (infer the schema but don't attempt to write it to the table " + + "properties) and NEVER_INFER (fallback to using the case-insensitive metastore schema " + + "instead of inferring).") + .stringConf + .transform(_.toUpperCase()) + .checkValues(HiveCaseSensitiveInferenceMode.values.map(_.toString)) + .createWithDefault(HiveCaseSensitiveInferenceMode.INFER_AND_SAVE.toString) + val OPTIMIZER_METADATA_ONLY = buildConf("spark.sql.optimizer.metadataOnly") .doc("When true, enable the metadata-only query optimization that use the table's metadata " + "to produce the partition columns instead of table scans. It applies when all the columns " + @@ -792,6 +811,9 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging { def filesourcePartitionFileCacheSize: Long = getConf(HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE) + def caseSensitiveInferenceMode: HiveCaseSensitiveInferenceMode.Value = + HiveCaseSensitiveInferenceMode.withName(getConf(HIVE_CASE_SENSITIVE_INFERENCE)) + def gatherFastStats: Boolean = getConf(GATHER_FASTSTAT) def optimizerMetadataOnly: Boolean = getConf(OPTIMIZER_METADATA_ONLY) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala index 8a980a7eb5..6aa940afbb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala @@ -368,88 +368,6 @@ class ParquetSchemaSuite extends ParquetSchemaTest { } } - test("merge with metastore schema") { - // Field type conflict resolution - assertResult( - StructType(Seq( - StructField("lowerCase", StringType), - StructField("UPPERCase", DoubleType, nullable = false)))) { - - ParquetFileFormat.mergeMetastoreParquetSchema( - StructType(Seq( - StructField("lowercase", StringType), - StructField("uppercase", DoubleType, nullable = false))), - - StructType(Seq( - StructField("lowerCase", BinaryType), - StructField("UPPERCase", IntegerType, nullable = true)))) - } - - // MetaStore schema is subset of parquet schema - assertResult( - StructType(Seq( - StructField("UPPERCase", DoubleType, nullable = false)))) { - - ParquetFileFormat.mergeMetastoreParquetSchema( - StructType(Seq( - StructField("uppercase", DoubleType, nullable = false))), - - StructType(Seq( - StructField("lowerCase", BinaryType), - StructField("UPPERCase", IntegerType, nullable = true)))) - } - - // Metastore schema contains additional non-nullable fields. 
- assert(intercept[Throwable] { - ParquetFileFormat.mergeMetastoreParquetSchema( - StructType(Seq( - StructField("uppercase", DoubleType, nullable = false), - StructField("lowerCase", BinaryType, nullable = false))), - - StructType(Seq( - StructField("UPPERCase", IntegerType, nullable = true)))) - }.getMessage.contains("detected conflicting schemas")) - - // Conflicting non-nullable field names - intercept[Throwable] { - ParquetFileFormat.mergeMetastoreParquetSchema( - StructType(Seq(StructField("lower", StringType, nullable = false))), - StructType(Seq(StructField("lowerCase", BinaryType)))) - } - } - - test("merge missing nullable fields from Metastore schema") { - // Standard case: Metastore schema contains additional nullable fields not present - // in the Parquet file schema. - assertResult( - StructType(Seq( - StructField("firstField", StringType, nullable = true), - StructField("secondField", StringType, nullable = true), - StructField("thirdfield", StringType, nullable = true)))) { - ParquetFileFormat.mergeMetastoreParquetSchema( - StructType(Seq( - StructField("firstfield", StringType, nullable = true), - StructField("secondfield", StringType, nullable = true), - StructField("thirdfield", StringType, nullable = true))), - StructType(Seq( - StructField("firstField", StringType, nullable = true), - StructField("secondField", StringType, nullable = true)))) - } - - // Merge should fail if the Metastore contains any additional fields that are not - // nullable. - assert(intercept[Throwable] { - ParquetFileFormat.mergeMetastoreParquetSchema( - StructType(Seq( - StructField("firstfield", StringType, nullable = true), - StructField("secondfield", StringType, nullable = true), - StructField("thirdfield", StringType, nullable = false))), - StructType(Seq( - StructField("firstField", StringType, nullable = true), - StructField("secondField", StringType, nullable = true)))) - }.getMessage.contains("detected conflicting schemas")) - } - test("schema merging failure error message") { import testImplicits._ diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 9ab4624594..78aa2bd249 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -597,6 +597,25 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat } } + override def alterTableSchema(db: String, table: String, schema: StructType): Unit = withClient { + requireTableExists(db, table) + val rawTable = getRawTable(db, table) + val withNewSchema = rawTable.copy(schema = schema) + // Add table metadata such as table schema, partition columns, etc. to table properties. + val updatedTable = withNewSchema.copy( + properties = withNewSchema.properties ++ tableMetaToTableProps(withNewSchema)) + try { + client.alterTable(updatedTable) + } catch { + case NonFatal(e) => + val warningMessage = + s"Could not alter schema of table ${rawTable.identifier.quotedString} in a Hive " + + "compatible way. Updating Hive metastore in Spark SQL specific format." 
+ logWarning(warningMessage, e) + client.alterTable(updatedTable.copy(schema = updatedTable.partitionSchema)) + } + } + override def getTable(db: String, table: String): CatalogTable = withClient { restoreTableMetadata(getRawTable(db, table)) } @@ -690,10 +709,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat "different from the schema when this table was created by Spark SQL" + s"(${schemaFromTableProps.simpleString}). We have to fall back to the table schema " + "from Hive metastore which is not case preserving.") - hiveTable + hiveTable.copy(schemaPreservesCase = false) } } else { - hiveTable + hiveTable.copy(schemaPreservesCase = false) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index d135dfa9f4..056af49559 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -19,9 +19,12 @@ package org.apache.spark.sql.hive import java.net.URI +import scala.util.control.NonFatal + import com.google.common.util.concurrent.Striped import org.apache.hadoop.fs.Path +import org.apache.spark.SparkException import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier} @@ -32,6 +35,7 @@ import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions} import org.apache.spark.sql.hive.orc.OrcFileFormat +import org.apache.spark.sql.internal.SQLConf.HiveCaseSensitiveInferenceMode._ import org.apache.spark.sql.types._ /** @@ -44,6 +48,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log // these are def_s and not val/lazy val since the latter would introduce circular references private def sessionState = sparkSession.sessionState.asInstanceOf[HiveSessionState] private def tableRelationCache = sparkSession.sessionState.catalog.tableRelationCache + import HiveMetastoreCatalog._ private def getCurrentDatabase: String = sessionState.catalog.getCurrentDatabase @@ -130,6 +135,8 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log val lazyPruningEnabled = sparkSession.sqlContext.conf.manageFilesourcePartitions val tablePath = new Path(relation.tableMeta.location) + val fileFormat = fileFormatClass.newInstance() + val result = if (relation.isPartitioned) { val partitionSchema = relation.tableMeta.partitionSchema val rootPaths: Seq[Path] = if (lazyPruningEnabled) { @@ -170,16 +177,18 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log } } + val (dataSchema, updatedTable) = + inferIfNeeded(relation, options, fileFormat, Option(fileIndex)) + val fsRelation = HadoopFsRelation( location = fileIndex, partitionSchema = partitionSchema, - dataSchema = relation.tableMeta.dataSchema, + dataSchema = dataSchema, // We don't support hive bucketed tables, only ones we write out. 
bucketSpec = None, - fileFormat = fileFormatClass.newInstance(), + fileFormat = fileFormat, options = options)(sparkSession = sparkSession) - - val created = LogicalRelation(fsRelation, catalogTable = Some(relation.tableMeta)) + val created = LogicalRelation(fsRelation, catalogTable = Some(updatedTable)) tableRelationCache.put(tableIdentifier, created) created } @@ -196,17 +205,18 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log fileFormatClass, None) val logicalRelation = cached.getOrElse { + val (dataSchema, updatedTable) = inferIfNeeded(relation, options, fileFormat) val created = LogicalRelation( DataSource( sparkSession = sparkSession, paths = rootPath.toString :: Nil, - userSpecifiedSchema = Some(metastoreSchema), + userSpecifiedSchema = Option(dataSchema), // We don't support hive bucketed tables, only ones we write out. bucketSpec = None, options = options, className = fileType).resolveRelation(), - catalogTable = Some(relation.tableMeta)) + catalogTable = Some(updatedTable)) tableRelationCache.put(tableIdentifier, created) created @@ -218,6 +228,54 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log result.copy(expectedOutputAttributes = Some(relation.output)) } + private def inferIfNeeded( + relation: CatalogRelation, + options: Map[String, String], + fileFormat: FileFormat, + fileIndexOpt: Option[FileIndex] = None): (StructType, CatalogTable) = { + val inferenceMode = sparkSession.sessionState.conf.caseSensitiveInferenceMode + val shouldInfer = (inferenceMode != NEVER_INFER) && !relation.tableMeta.schemaPreservesCase + val tableName = relation.tableMeta.identifier.unquotedString + if (shouldInfer) { + logInfo(s"Inferring case-sensitive schema for table $tableName (inference mode: " + + s"$inferenceMode)") + val fileIndex = fileIndexOpt.getOrElse { + val rootPath = new Path(relation.tableMeta.location) + new InMemoryFileIndex(sparkSession, Seq(rootPath), options, None) + } + + val inferredSchema = fileFormat + .inferSchema( + sparkSession, + options, + fileIndex.listFiles(Nil).flatMap(_.files)) + .map(mergeWithMetastoreSchema(relation.tableMeta.schema, _)) + + inferredSchema match { + case Some(schema) => + if (inferenceMode == INFER_AND_SAVE) { + updateCatalogSchema(relation.tableMeta.identifier, schema) + } + (schema, relation.tableMeta.copy(schema = schema)) + case None => + logWarning(s"Unable to infer schema for table $tableName from file format " + + s"$fileFormat (inference mode: $inferenceMode). Using metastore schema.") + (relation.tableMeta.schema, relation.tableMeta) + } + } else { + (relation.tableMeta.schema, relation.tableMeta) + } + } + + private def updateCatalogSchema(identifier: TableIdentifier, schema: StructType): Unit = try { + val db = identifier.database.get + logInfo(s"Saving case-sensitive schema for table ${identifier.unquotedString}") + sparkSession.sharedState.externalCatalog.alterTableSchema(db, identifier.table, schema) + } catch { + case NonFatal(ex) => + logWarning(s"Unable to save case-sensitive schema for table ${identifier.unquotedString}", ex) + } + /** * When scanning or writing to non-partitioned Metastore Parquet tables, convert them to Parquet * data source relations for better performance. 
@@ -287,3 +345,33 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log } } } + +private[hive] object HiveMetastoreCatalog { + def mergeWithMetastoreSchema( + metastoreSchema: StructType, + inferredSchema: StructType): StructType = try { + // Find any nullable fields in metastore schema that are missing from the inferred schema. + val metastoreFields = metastoreSchema.map(f => f.name.toLowerCase -> f).toMap + val missingNullables = metastoreFields + .filterKeys(!inferredSchema.map(_.name.toLowerCase).contains(_)) + .values + .filter(_.nullable) + + // Merge missing nullable fields to inferred schema and build a case-insensitive field map. + val inferredFields = StructType(inferredSchema ++ missingNullables) + .map(f => f.name.toLowerCase -> f).toMap + StructType(metastoreFields.map { case(name, field) => + field.copy(name = inferredFields(name).name) + }.toSeq) + } catch { + case NonFatal(_) => + val msg = s"""Detected conflicting schemas when merging the schema obtained from the Hive + | Metastore with the one inferred from the file format. Metastore schema: + |${metastoreSchema.prettyJson} + | + |Inferred schema: + |${inferredSchema.prettyJson} + """.stripMargin + throw new SparkException(msg) + } +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala new file mode 100644 index 0000000000..7895580381 --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.spark.sql.hive + +import java.io.File +import java.util.concurrent.{Executors, TimeUnit} + +import scala.util.Random + +import org.scalatest.BeforeAndAfterEach + +import org.apache.spark.metrics.source.HiveCatalogMetrics +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog._ +import org.apache.spark.sql.execution.datasources.FileStatusCache +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.hive.client.HiveClient +import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.internal.{HiveSerDe, SQLConf} +import org.apache.spark.sql.internal.SQLConf.HiveCaseSensitiveInferenceMode.{Value => InferenceMode, _} +import org.apache.spark.sql.test.SQLTestUtils +import org.apache.spark.sql.types._ + +class HiveSchemaInferenceSuite + extends QueryTest with TestHiveSingleton with SQLTestUtils with BeforeAndAfterEach { + + import HiveSchemaInferenceSuite._ + import HiveExternalCatalog.DATASOURCE_SCHEMA_PREFIX + + override def beforeEach(): Unit = { + super.beforeEach() + FileStatusCache.resetForTesting() + } + + override def afterEach(): Unit = { + super.afterEach() + spark.sessionState.catalog.tableRelationCache.invalidateAll() + FileStatusCache.resetForTesting() + } + + private val externalCatalog = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog] + private val client = externalCatalog.client + + // Return a copy of the given schema with all field names converted to lower case. + private def lowerCaseSchema(schema: StructType): StructType = { + StructType(schema.map(f => f.copy(name = f.name.toLowerCase))) + } + + // Create a Hive external test table containing the given field and partition column names. + // Returns a case-sensitive schema for the table. + private def setupExternalTable( + fileType: String, + fields: Seq[String], + partitionCols: Seq[String], + dir: File): StructType = { + // Treat all table fields as bigints... 
+ val structFields = fields.map { field => + StructField( + name = field, + dataType = LongType, + nullable = true, + metadata = new MetadataBuilder().putString(HIVE_TYPE_STRING, "bigint").build()) + } + // and all partition columns as ints + val partitionStructFields = partitionCols.map { field => + StructField( + // Partition column case isn't preserved + name = field.toLowerCase, + dataType = IntegerType, + nullable = true, + metadata = new MetadataBuilder().putString(HIVE_TYPE_STRING, "int").build()) + } + val schema = StructType(structFields ++ partitionStructFields) + + // Write some test data (partitioned if specified) + val writer = spark.range(NUM_RECORDS) + .selectExpr((fields ++ partitionCols).map("id as " + _): _*) + .write + .partitionBy(partitionCols: _*) + .mode("overwrite") + fileType match { + case ORC_FILE_TYPE => + writer.orc(dir.getAbsolutePath) + case PARQUET_FILE_TYPE => + writer.parquet(dir.getAbsolutePath) + } + + // Create Hive external table with lowercased schema + val serde = HiveSerDe.serdeMap(fileType) + client.createTable( + CatalogTable( + identifier = TableIdentifier(table = TEST_TABLE_NAME, database = Option(DATABASE)), + tableType = CatalogTableType.EXTERNAL, + storage = CatalogStorageFormat( + locationUri = Option(new java.net.URI(dir.getAbsolutePath)), + inputFormat = serde.inputFormat, + outputFormat = serde.outputFormat, + serde = serde.serde, + compressed = false, + properties = Map("serialization.format" -> "1")), + schema = schema, + provider = Option("hive"), + partitionColumnNames = partitionCols.map(_.toLowerCase), + properties = Map.empty), + true) + + // Add partition records (if specified) + if (!partitionCols.isEmpty) { + spark.catalog.recoverPartitions(TEST_TABLE_NAME) + } + + // Check that the table returned by HiveExternalCatalog has schemaPreservesCase set to false + // and that the raw table returned by the Hive client doesn't have any Spark SQL properties + // set (table needs to be obtained from client since HiveExternalCatalog filters these + // properties out). 
+ assert(!externalCatalog.getTable(DATABASE, TEST_TABLE_NAME).schemaPreservesCase) + val rawTable = client.getTable(DATABASE, TEST_TABLE_NAME) + assert(rawTable.properties.filterKeys(_.startsWith(DATASOURCE_SCHEMA_PREFIX)) == Map.empty) + schema + } + + private def withTestTables( + fileType: String)(f: (Seq[String], Seq[String], StructType) => Unit): Unit = { + // Test both a partitioned and unpartitioned Hive table + val tableFields = Seq( + (Seq("fieldOne"), Seq("partCol1", "partCol2")), + (Seq("fieldOne", "fieldTwo"), Seq.empty[String])) + + tableFields.foreach { case (fields, partCols) => + withTempDir { dir => + val schema = setupExternalTable(fileType, fields, partCols, dir) + withTable(TEST_TABLE_NAME) { f(fields, partCols, schema) } + } + } + } + + private def withFileTypes(f: (String) => Unit): Unit + = Seq(ORC_FILE_TYPE, PARQUET_FILE_TYPE).foreach(f) + + private def withInferenceMode(mode: InferenceMode)(f: => Unit): Unit = { + withSQLConf( + HiveUtils.CONVERT_METASTORE_ORC.key -> "true", + SQLConf.HIVE_CASE_SENSITIVE_INFERENCE.key -> mode.toString)(f) + } + + private val inferenceKey = SQLConf.HIVE_CASE_SENSITIVE_INFERENCE.key + + private def testFieldQuery(fields: Seq[String]): Unit = { + if (!fields.isEmpty) { + val query = s"SELECT * FROM ${TEST_TABLE_NAME} WHERE ${Random.shuffle(fields).head} >= 0" + assert(spark.sql(query).count == NUM_RECORDS) + } + } + + private def testTableSchema(expectedSchema: StructType): Unit + = assert(spark.table(TEST_TABLE_NAME).schema == expectedSchema) + + withFileTypes { fileType => + test(s"$fileType: schema should be inferred and saved when INFER_AND_SAVE is specified") { + withInferenceMode(INFER_AND_SAVE) { + withTestTables(fileType) { (fields, partCols, schema) => + testFieldQuery(fields) + testFieldQuery(partCols) + testTableSchema(schema) + + // Verify the catalog table now contains the updated schema and properties + val catalogTable = externalCatalog.getTable(DATABASE, TEST_TABLE_NAME) + assert(catalogTable.schemaPreservesCase) + assert(catalogTable.schema == schema) + assert(catalogTable.partitionColumnNames == partCols.map(_.toLowerCase)) + } + } + } + } + + withFileTypes { fileType => + test(s"$fileType: schema should be inferred but not stored when INFER_ONLY is specified") { + withInferenceMode(INFER_ONLY) { + withTestTables(fileType) { (fields, partCols, schema) => + val originalTable = externalCatalog.getTable(DATABASE, TEST_TABLE_NAME) + testFieldQuery(fields) + testFieldQuery(partCols) + testTableSchema(schema) + // Catalog table shouldn't be altered + assert(externalCatalog.getTable(DATABASE, TEST_TABLE_NAME) == originalTable) + } + } + } + } + + withFileTypes { fileType => + test(s"$fileType: schema should not be inferred when NEVER_INFER is specified") { + withInferenceMode(NEVER_INFER) { + withTestTables(fileType) { (fields, partCols, schema) => + val originalTable = externalCatalog.getTable(DATABASE, TEST_TABLE_NAME) + // Only check the table schema as the test queries will break + testTableSchema(lowerCaseSchema(schema)) + assert(externalCatalog.getTable(DATABASE, TEST_TABLE_NAME) == originalTable) + } + } + } + } + + test("mergeWithMetastoreSchema() should return expected results") { + // Field type conflict resolution + assertResult( + StructType(Seq( + StructField("lowerCase", StringType), + StructField("UPPERCase", DoubleType, nullable = false)))) { + + HiveMetastoreCatalog.mergeWithMetastoreSchema( + StructType(Seq( + StructField("lowercase", StringType), + StructField("uppercase", DoubleType, nullable = false))), 
+ + StructType(Seq( + StructField("lowerCase", BinaryType), + StructField("UPPERCase", IntegerType, nullable = true)))) + } + + // MetaStore schema is subset of parquet schema + assertResult( + StructType(Seq( + StructField("UPPERCase", DoubleType, nullable = false)))) { + + HiveMetastoreCatalog.mergeWithMetastoreSchema( + StructType(Seq( + StructField("uppercase", DoubleType, nullable = false))), + + StructType(Seq( + StructField("lowerCase", BinaryType), + StructField("UPPERCase", IntegerType, nullable = true)))) + } + + // Metastore schema contains additional non-nullable fields. + assert(intercept[Throwable] { + HiveMetastoreCatalog.mergeWithMetastoreSchema( + StructType(Seq( + StructField("uppercase", DoubleType, nullable = false), + StructField("lowerCase", BinaryType, nullable = false))), + + StructType(Seq( + StructField("UPPERCase", IntegerType, nullable = true)))) + }.getMessage.contains("Detected conflicting schemas")) + + // Conflicting non-nullable field names + intercept[Throwable] { + HiveMetastoreCatalog.mergeWithMetastoreSchema( + StructType(Seq(StructField("lower", StringType, nullable = false))), + StructType(Seq(StructField("lowerCase", BinaryType)))) + } + + // Check that merging missing nullable fields works as expected. + assertResult( + StructType(Seq( + StructField("firstField", StringType, nullable = true), + StructField("secondField", StringType, nullable = true), + StructField("thirdfield", StringType, nullable = true)))) { + HiveMetastoreCatalog.mergeWithMetastoreSchema( + StructType(Seq( + StructField("firstfield", StringType, nullable = true), + StructField("secondfield", StringType, nullable = true), + StructField("thirdfield", StringType, nullable = true))), + StructType(Seq( + StructField("firstField", StringType, nullable = true), + StructField("secondField", StringType, nullable = true)))) + } + + // Merge should fail if the Metastore contains any additional fields that are not + // nullable. + assert(intercept[Throwable] { + HiveMetastoreCatalog.mergeWithMetastoreSchema( + StructType(Seq( + StructField("firstfield", StringType, nullable = true), + StructField("secondfield", StringType, nullable = true), + StructField("thirdfield", StringType, nullable = false))), + StructType(Seq( + StructField("firstField", StringType, nullable = true), + StructField("secondField", StringType, nullable = true)))) + }.getMessage.contains("Detected conflicting schemas")) + } +} + +object HiveSchemaInferenceSuite { + private val NUM_RECORDS = 10 + private val DATABASE = "default" + private val TEST_TABLE_NAME = "test_table" + private val ORC_FILE_TYPE = "orc" + private val PARQUET_FILE_TYPE = "parquet" +}
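
For readers who want to try the new behavior, here is a minimal usage sketch; it is not part of the patch. The configuration key and the INFER_AND_SAVE / INFER_ONLY / NEVER_INFER values come from the SQLConf entry added above, while the table name `mixed_case_table` and the field `someField` are hypothetical placeholders for a Hive table whose Parquet or ORC files carry mixed-case field names.

```scala
import org.apache.spark.sql.SparkSession

object CaseSensitiveInferenceExample {
  def main(args: Array[String]): Unit = {
    // Query-time schema inference only applies to Hive metastore tables,
    // so Hive support must be enabled on the session.
    val spark = SparkSession.builder()
      .appName("SPARK-19611 usage sketch")
      .enableHiveSupport()
      .getOrCreate()

    // INFER_AND_SAVE (the default) infers the case-sensitive schema from the data
    // files and writes it back to the table properties; INFER_ONLY infers it for
    // the current relation but never alters the catalog; NEVER_INFER falls back
    // to the case-insensitive metastore schema.
    spark.conf.set("spark.sql.hive.caseSensitiveInferenceMode", "INFER_ONLY")

    // The first query against a table whose properties lack a case-sensitive
    // schema triggers the inference path in HiveMetastoreCatalog.
    spark.sql("SELECT * FROM mixed_case_table WHERE someField >= 0").show()

    spark.stop()
  }
}
```

With INFER_AND_SAVE the inferred schema is persisted through the new ExternalCatalog.alterTableSchema() call, so later sessions can read it straight from the table properties; with INFER_ONLY inference is repeated whenever the relation is not served from the table relation cache.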