author     Budde <budde@amazon.com>                2017-03-09 12:55:33 -0800
committer  Wenchen Fan <wenchen@databricks.com>    2017-03-09 12:55:33 -0800
commit     f79371ad86d94da14bd1ddb53e99a388017b6892 (patch)
tree       20060cf7dba8c1cbda2536b0674bf9a93103bf93 /sql/core/src/main
parent     cabe1df8606e7e5b9e6efb106045deb3f39f5f13 (diff)
[SPARK-19611][SQL] Introduce configurable table schema inference
## Summary of changes

Add a new configuration option that allows Spark SQL to infer a case-sensitive schema from a Hive Metastore table's data files when a case-sensitive schema can't be read from the table properties.

- Add spark.sql.hive.caseSensitiveInferenceMode param to SQLConf
- Add schemaPreservesCase field to CatalogTable (set to false when the schema can't successfully be read from the Hive table props)
- Perform schema inference in HiveMetastoreCatalog if schemaPreservesCase is false, depending on spark.sql.hive.caseSensitiveInferenceMode
- Add alterTableSchema() method to the ExternalCatalog interface
- Add HiveSchemaInferenceSuite tests
- Refactor and move ParquetFileFormat.mergeMetastoreParquetSchema() as HiveMetastoreCatalog.mergeWithMetastoreSchema()
- Move schema merging tests from ParquetSchemaSuite to HiveSchemaInferenceSuite

[JIRA for this change](https://issues.apache.org/jira/browse/SPARK-19611)

## How was this patch tested?

The tests in `HiveSchemaInferenceSuite` should verify that schema inference is working as expected. `ExternalCatalogSuite` has also been extended to cover the new `alterTableSchema()` API.

Author: Budde <budde@amazon.com>

Closes #16944 from budde/SPARK-19611.
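For context, here is a minimal sketch (not part of this patch) of how an application might exercise the new option, assuming a Hive-enabled SparkSession. The application name and the table `db.events` are hypothetical; the valid mode strings are the ones defined in SQLConf below.

```scala
// Illustrative sketch only; table and application names are hypothetical.
import org.apache.spark.sql.SparkSession

object CaseSensitiveInferenceExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("case-sensitive-inference-example")
      .enableHiveSupport()
      // One of INFER_AND_SAVE (default), INFER_ONLY, NEVER_INFER; the value is
      // upper-cased by the conf, so matching is effectively case-insensitive.
      .config("spark.sql.hive.caseSensitiveInferenceMode", "INFER_ONLY")
      .getOrCreate()

    // A Metastore table backed by Parquet files with mixed-case column names
    // is now resolved against the inferred, case-preserving schema.
    spark.table("db.events").printSchema()

    spark.stop()
  }
}
```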
Diffstat (limited to 'sql/core/src/main')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala  | 65
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala  | 22
2 files changed, 22 insertions, 65 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 828949eddc..5313c2f374 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -476,71 +476,6 @@ object ParquetFileFormat extends Logging {
   }
 
   /**
-   * Reconciles Hive Metastore case insensitivity issue and data type conflicts between Metastore
-   * schema and Parquet schema.
-   *
-   * Hive doesn't retain case information, while Parquet is case sensitive. On the other hand, the
-   * schema read from Parquet files may be incomplete (e.g. older versions of Parquet doesn't
-   * distinguish binary and string). This method generates a correct schema by merging Metastore
-   * schema data types and Parquet schema field names.
-   */
-  def mergeMetastoreParquetSchema(
-      metastoreSchema: StructType,
-      parquetSchema: StructType): StructType = {
-    def schemaConflictMessage: String =
-      s"""Converting Hive Metastore Parquet, but detected conflicting schemas. Metastore schema:
-         |${metastoreSchema.prettyJson}
-         |
-         |Parquet schema:
-         |${parquetSchema.prettyJson}
-       """.stripMargin
-
-    val mergedParquetSchema = mergeMissingNullableFields(metastoreSchema, parquetSchema)
-
-    assert(metastoreSchema.size <= mergedParquetSchema.size, schemaConflictMessage)
-
-    val ordinalMap = metastoreSchema.zipWithIndex.map {
-      case (field, index) => field.name.toLowerCase -> index
-    }.toMap
-
-    val reorderedParquetSchema = mergedParquetSchema.sortBy(f =>
-      ordinalMap.getOrElse(f.name.toLowerCase, metastoreSchema.size + 1))
-
-    StructType(metastoreSchema.zip(reorderedParquetSchema).map {
-      // Uses Parquet field names but retains Metastore data types.
-      case (mSchema, pSchema) if mSchema.name.toLowerCase == pSchema.name.toLowerCase =>
-        mSchema.copy(name = pSchema.name)
-      case _ =>
-        throw new SparkException(schemaConflictMessage)
-    })
-  }
-
-  /**
-   * Returns the original schema from the Parquet file with any missing nullable fields from the
-   * Hive Metastore schema merged in.
-   *
-   * When constructing a DataFrame from a collection of structured data, the resulting object has
-   * a schema corresponding to the union of the fields present in each element of the collection.
-   * Spark SQL simply assigns a null value to any field that isn't present for a particular row.
-   * In some cases, it is possible that a given table partition stored as a Parquet file doesn't
-   * contain a particular nullable field in its schema despite that field being present in the
-   * table schema obtained from the Hive Metastore. This method returns a schema representing the
-   * Parquet file schema along with any additional nullable fields from the Metastore schema
-   * merged in.
-   */
-  private[parquet] def mergeMissingNullableFields(
-      metastoreSchema: StructType,
-      parquetSchema: StructType): StructType = {
-    val fieldMap = metastoreSchema.map(f => f.name.toLowerCase -> f).toMap
-    val missingFields = metastoreSchema
-      .map(_.name.toLowerCase)
-      .diff(parquetSchema.map(_.name.toLowerCase))
-      .map(fieldMap(_))
-      .filter(_.nullable)
-    StructType(parquetSchema ++ missingFields)
-  }
-
-  /**
    * Reads Parquet footers in multi-threaded manner.
    * If the config "spark.sql.files.ignoreCorruptFiles" is set to true, we will ignore the corrupted
    * files when reading footers.
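As an aside (not part of the diff), the standalone sketch below illustrates the behavior that the removed `mergeMetastoreParquetSchema()`/`mergeMissingNullableFields()` implemented and that now lives in `HiveMetastoreCatalog.mergeWithMetastoreSchema()`: keep the Metastore data types, adopt the case-sensitive Parquet field names, and merge back nullable Metastore-only fields. The helper and example schemas are illustrative; the original's field reordering and conflict checks are omitted.

```scala
// Illustrative sketch only; simplified relative to the relocated Spark code.
import org.apache.spark.sql.types._

object MergeWithMetastoreSchemaSketch {
  def merge(metastoreSchema: StructType, parquetSchema: StructType): StructType = {
    val metastoreByLowerName = metastoreSchema.map(f => f.name.toLowerCase -> f).toMap
    // Nullable fields known only to the Metastore are appended so the merged
    // schema still covers partition files that happen to lack them.
    val missingNullable = metastoreSchema
      .filter(f => !parquetSchema.exists(_.name.toLowerCase == f.name.toLowerCase))
      .filter(_.nullable)
    StructType(parquetSchema.map { pf =>
      metastoreByLowerName.get(pf.name.toLowerCase) match {
        // Metastore data type, case-sensitive Parquet field name.
        case Some(mf) => mf.copy(name = pf.name)
        case None => pf
      }
    } ++ missingNullable)
  }

  def main(args: Array[String]): Unit = {
    val metastoreSchema = StructType(Seq(
      StructField("userid", LongType),
      StructField("eventname", StringType),
      StructField("extra", StringType, nullable = true)))
    val parquetSchema = StructType(Seq(
      StructField("userId", LongType),
      StructField("eventName", BinaryType)))  // older writers may store strings as binary
    // Expected: userId LongType, eventName StringType (Metastore type, Parquet name),
    // plus extra StringType merged back as a nullable Metastore-only field.
    println(merge(metastoreSchema, parquetSchema).treeString)
  }
}
```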
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 1244f690fd..8e3f567b7d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -296,6 +296,25 @@ object SQLConf {
       .longConf
       .createWithDefault(250 * 1024 * 1024)
 
+  object HiveCaseSensitiveInferenceMode extends Enumeration {
+    val INFER_AND_SAVE, INFER_ONLY, NEVER_INFER = Value
+  }
+
+  val HIVE_CASE_SENSITIVE_INFERENCE = buildConf("spark.sql.hive.caseSensitiveInferenceMode")
+    .doc("Sets the action to take when a case-sensitive schema cannot be read from a Hive " +
+      "table's properties. Although Spark SQL itself is not case-sensitive, Hive compatible file " +
+      "formats such as Parquet are. Spark SQL must use a case-preserving schema when querying " +
+      "any table backed by files containing case-sensitive field names or queries may not return " +
+      "accurate results. Valid options include INFER_AND_SAVE (the default mode-- infer the " +
+      "case-sensitive schema from the underlying data files and write it back to the table " +
+      "properties), INFER_ONLY (infer the schema but don't attempt to write it to the table " +
+      "properties) and NEVER_INFER (fallback to using the case-insensitive metastore schema " +
+      "instead of inferring).")
+    .stringConf
+    .transform(_.toUpperCase())
+    .checkValues(HiveCaseSensitiveInferenceMode.values.map(_.toString))
+    .createWithDefault(HiveCaseSensitiveInferenceMode.INFER_AND_SAVE.toString)
+
   val OPTIMIZER_METADATA_ONLY = buildConf("spark.sql.optimizer.metadataOnly")
     .doc("When true, enable the metadata-only query optimization that use the table's metadata " +
       "to produce the partition columns instead of table scans. It applies when all the columns " +
@@ -792,6 +811,9 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def filesourcePartitionFileCacheSize: Long = getConf(HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE)
 
+  def caseSensitiveInferenceMode: HiveCaseSensitiveInferenceMode.Value =
+    HiveCaseSensitiveInferenceMode.withName(getConf(HIVE_CASE_SENSITIVE_INFERENCE))
+
   def gatherFastStats: Boolean = getConf(GATHER_FASTSTAT)
 
   def optimizerMetadataOnly: Boolean = getConf(OPTIMIZER_METADATA_ONLY)
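To show how the new accessor is meant to be consumed (the actual consumer in this change is HiveMetastoreCatalog, which lives outside sql/core/src/main), here is a hypothetical caller-side sketch. The enclosing object and the `inferFromFiles`/`saveToTableProperties` callbacks are placeholders, not APIs from this patch, and the sketch assumes it is compiled under the org.apache.spark.sql package tree because SQLConf is private[sql].

```scala
// Hypothetical sketch; placed under org.apache.spark.sql.* because SQLConf is private[sql].
package org.apache.spark.sql.hive

import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.HiveCaseSensitiveInferenceMode._
import org.apache.spark.sql.types.StructType

object SchemaInferenceDispatchSketch {
  def resolveSchema(
      conf: SQLConf,
      metastoreSchema: StructType,
      inferFromFiles: () => StructType,
      saveToTableProperties: StructType => Unit): StructType = {
    conf.caseSensitiveInferenceMode match {
      case INFER_AND_SAVE =>
        // Infer the case-sensitive schema and persist it to the table properties.
        val inferred = inferFromFiles()
        saveToTableProperties(inferred)
        inferred
      case INFER_ONLY =>
        // Infer for this session only; do not touch the table properties.
        inferFromFiles()
      case NEVER_INFER =>
        // Fall back to the case-insensitive Metastore schema.
        metastoreSchema
    }
  }
}
```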