author     Budde <budde@amazon.com>  2017-03-09 12:55:33 -0800
committer  Wenchen Fan <wenchen@databricks.com>  2017-03-09 12:55:33 -0800
commit     f79371ad86d94da14bd1ddb53e99a388017b6892 (patch)
tree       20060cf7dba8c1cbda2536b0674bf9a93103bf93 /sql
parent     cabe1df8606e7e5b9e6efb106045deb3f39f5f13 (diff)
[SPARK-19611][SQL] Introduce configurable table schema inference
## Summary of changes

Add a new configuration option that allows Spark SQL to infer a case-sensitive schema from a Hive Metastore table's data files when a case-sensitive schema can't be read from the table properties.

- Add spark.sql.hive.caseSensitiveInferenceMode param to SQLConf
- Add schemaPreservesCase field to CatalogTable (set to false when the schema can't successfully be read from the Hive table props)
- Perform schema inference in HiveMetastoreCatalog if schemaPreservesCase is false, depending on spark.sql.hive.caseSensitiveInferenceMode
- Add alterTableSchema() method to the ExternalCatalog interface
- Add HiveSchemaInferenceSuite tests
- Refactor and move ParquetFileFormat.mergeMetastoreParquetSchema() to HiveMetastoreCatalog.mergeWithMetastoreSchema()
- Move schema merging tests from ParquetSchemaSuite to HiveSchemaInferenceSuite

[JIRA for this change](https://issues.apache.org/jira/browse/SPARK-19611)

## How was this patch tested?

The tests in `HiveSchemaInferenceSuite` should verify that schema inference works as expected. `ExternalCatalogSuite` has also been extended to cover the new `alterTableSchema()` API.

Author: Budde <budde@amazon.com>

Closes #16944 from budde/SPARK-19611.
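For context, a minimal usage sketch of the new option once this patch is applied (assumes a `SparkSession` named `spark`; the table and field names are illustrative, not from this patch):

```scala
// Choose how Spark SQL resolves Hive tables whose properties lack a
// case-sensitive schema: INFER_AND_SAVE (default), INFER_ONLY, or NEVER_INFER.
spark.conf.set("spark.sql.hive.caseSensitiveInferenceMode", "INFER_AND_SAVE")

// The first query against a Hive table backed by case-sensitive Parquet files
// infers the schema from the data files and, in INFER_AND_SAVE mode, writes it
// back to the table properties for future sessions.
spark.sql("SELECT * FROM some_table WHERE someField >= 0").show()
```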
Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala | 15
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala | 10
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala | 8
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala | 15
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala | 3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala | 65
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 22
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala | 82
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala | 23
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 100
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala | 305
11 files changed, 489 insertions, 159 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
index 31eded4deb..08a01e8601 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.catalog
import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException, NoSuchTableException}
import org.apache.spark.sql.catalyst.expressions.Expression
-
+import org.apache.spark.sql.types.StructType
/**
* Interface for the system catalog (of functions, partitions, tables, and databases).
@@ -104,6 +104,19 @@ abstract class ExternalCatalog {
*/
def alterTable(tableDefinition: CatalogTable): Unit
+ /**
+ * Alter the schema of a table identified by the provided database and table name. The new schema
+ * should still contain the existing bucket columns and partition columns used by the table. This
+ * method will also update any Spark SQL-related parameters stored as Hive table properties (such
+ * as the schema itself).
+ *
+ * @param db Name of the database that the table to alter belongs to
+ * @param table Name of the table whose schema is to be altered
+ * @param schema Updated schema to be used for the table (must contain existing partition and
+ * bucket columns)
+ */
+ def alterTableSchema(db: String, table: String, schema: StructType): Unit
+
def getTable(db: String, table: String): CatalogTable
def getTableOption(db: String, table: String): Option[CatalogTable]
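For reference, a hedged usage sketch of the new `alterTableSchema()` contract (illustrative only; `catalog` stands for any `ExternalCatalog` implementation, and the database, table, and field names are made up):

```scala
import org.apache.spark.sql.types._

// The replacement schema must still contain the table's existing partition
// and bucket columns; here "ds" is assumed to be the partition column.
val newSchema = StructType(Seq(
  StructField("someField", LongType),  // case-sensitive data column
  StructField("ds", StringType)))      // existing partition column, unchanged
catalog.alterTableSchema("some_db", "some_table", newSchema)
```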
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 80aba4af94..5cc6b0abc6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.util.StringUtils
+import org.apache.spark.sql.types.StructType
/**
* An in-memory (ephemeral) implementation of the system catalog.
@@ -297,6 +298,15 @@ class InMemoryCatalog(
catalog(db).tables(tableDefinition.identifier.table).table = tableDefinition
}
+ override def alterTableSchema(
+ db: String,
+ table: String,
+ schema: StructType): Unit = synchronized {
+ requireTableExists(db, table)
+ val origTable = catalog(db).tables(table).table
+ catalog(db).tables(table).table = origTable.copy(schema = schema)
+ }
+
override def getTable(db: String, table: String): CatalogTable = synchronized {
requireTableExists(db, table)
catalog(db).tables(table).table
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 4452c47987..e3631b0c07 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -163,6 +163,11 @@ case class BucketSpec(
* @param tracksPartitionsInCatalog whether this table's partition metadata is stored in the
* catalog. If false, it is inferred automatically based on file
* structure.
+ * @param schemaPreservesCase Whether the schema resolved for this table is case-sensitive.
+ * When using a Hive Metastore, this flag is set to false if a case-
+ * sensitive schema could not be read from the table properties.
+ * Used to trigger case-sensitive schema inference at query time, when
+ * configured.
*/
case class CatalogTable(
identifier: TableIdentifier,
@@ -180,7 +185,8 @@ case class CatalogTable(
viewText: Option[String] = None,
comment: Option[String] = None,
unsupportedFeatures: Seq[String] = Seq.empty,
- tracksPartitionsInCatalog: Boolean = false) {
+ tracksPartitionsInCatalog: Boolean = false,
+ schemaPreservesCase: Boolean = true) {
import CatalogTable._
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index 07ccd68698..7820f39d96 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException}
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -240,6 +240,19 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
}
}
+ test("alter table schema") {
+ val catalog = newBasicCatalog()
+ val tbl1 = catalog.getTable("db2", "tbl1")
+ val newSchema = StructType(Seq(
+ StructField("new_field_1", IntegerType),
+ StructField("new_field_2", StringType),
+ StructField("a", IntegerType),
+ StructField("b", StringType)))
+ catalog.alterTableSchema("db2", "tbl1", newSchema)
+ val newTbl1 = catalog.getTable("db2", "tbl1")
+ assert(newTbl1.schema == newSchema)
+ }
+
test("get table") {
assert(newBasicCatalog().getTable("db2", "tbl1").identifier.table == "tbl1")
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
index af1eaa1f23..37e3dfabd0 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
@@ -491,7 +491,8 @@ class TreeNodeSuite extends SparkFunSuite {
"lastAccessTime" -> -1,
"tracksPartitionsInCatalog" -> false,
"properties" -> JNull,
- "unsupportedFeatures" -> List.empty[String]))
+ "unsupportedFeatures" -> List.empty[String],
+ "schemaPreservesCase" -> JBool(true)))
// For unknown case class, returns JNull.
val bigValue = new Array[Int](10000)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 828949eddc..5313c2f374 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -476,71 +476,6 @@ object ParquetFileFormat extends Logging {
}
/**
- * Reconciles Hive Metastore case insensitivity issue and data type conflicts between Metastore
- * schema and Parquet schema.
- *
- * Hive doesn't retain case information, while Parquet is case sensitive. On the other hand, the
- * schema read from Parquet files may be incomplete (e.g. older versions of Parquet doesn't
- * distinguish binary and string). This method generates a correct schema by merging Metastore
- * schema data types and Parquet schema field names.
- */
- def mergeMetastoreParquetSchema(
- metastoreSchema: StructType,
- parquetSchema: StructType): StructType = {
- def schemaConflictMessage: String =
- s"""Converting Hive Metastore Parquet, but detected conflicting schemas. Metastore schema:
- |${metastoreSchema.prettyJson}
- |
- |Parquet schema:
- |${parquetSchema.prettyJson}
- """.stripMargin
-
- val mergedParquetSchema = mergeMissingNullableFields(metastoreSchema, parquetSchema)
-
- assert(metastoreSchema.size <= mergedParquetSchema.size, schemaConflictMessage)
-
- val ordinalMap = metastoreSchema.zipWithIndex.map {
- case (field, index) => field.name.toLowerCase -> index
- }.toMap
-
- val reorderedParquetSchema = mergedParquetSchema.sortBy(f =>
- ordinalMap.getOrElse(f.name.toLowerCase, metastoreSchema.size + 1))
-
- StructType(metastoreSchema.zip(reorderedParquetSchema).map {
- // Uses Parquet field names but retains Metastore data types.
- case (mSchema, pSchema) if mSchema.name.toLowerCase == pSchema.name.toLowerCase =>
- mSchema.copy(name = pSchema.name)
- case _ =>
- throw new SparkException(schemaConflictMessage)
- })
- }
-
- /**
- * Returns the original schema from the Parquet file with any missing nullable fields from the
- * Hive Metastore schema merged in.
- *
- * When constructing a DataFrame from a collection of structured data, the resulting object has
- * a schema corresponding to the union of the fields present in each element of the collection.
- * Spark SQL simply assigns a null value to any field that isn't present for a particular row.
- * In some cases, it is possible that a given table partition stored as a Parquet file doesn't
- * contain a particular nullable field in its schema despite that field being present in the
- * table schema obtained from the Hive Metastore. This method returns a schema representing the
- * Parquet file schema along with any additional nullable fields from the Metastore schema
- * merged in.
- */
- private[parquet] def mergeMissingNullableFields(
- metastoreSchema: StructType,
- parquetSchema: StructType): StructType = {
- val fieldMap = metastoreSchema.map(f => f.name.toLowerCase -> f).toMap
- val missingFields = metastoreSchema
- .map(_.name.toLowerCase)
- .diff(parquetSchema.map(_.name.toLowerCase))
- .map(fieldMap(_))
- .filter(_.nullable)
- StructType(parquetSchema ++ missingFields)
- }
-
- /**
* Reads Parquet footers in multi-threaded manner.
* If the config "spark.sql.files.ignoreCorruptFiles" is set to true, we will ignore the corrupted
* files when reading footers.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 1244f690fd..8e3f567b7d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -296,6 +296,25 @@ object SQLConf {
.longConf
.createWithDefault(250 * 1024 * 1024)
+ object HiveCaseSensitiveInferenceMode extends Enumeration {
+ val INFER_AND_SAVE, INFER_ONLY, NEVER_INFER = Value
+ }
+
+ val HIVE_CASE_SENSITIVE_INFERENCE = buildConf("spark.sql.hive.caseSensitiveInferenceMode")
+ .doc("Sets the action to take when a case-sensitive schema cannot be read from a Hive " +
+ "table's properties. Although Spark SQL itself is not case-sensitive, Hive compatible file " +
+ "formats such as Parquet are. Spark SQL must use a case-preserving schema when querying " +
+ "any table backed by files containing case-sensitive field names or queries may not return " +
+ "accurate results. Valid options include INFER_AND_SAVE (the default mode-- infer the " +
+ "case-sensitive schema from the underlying data files and write it back to the table " +
+ "properties), INFER_ONLY (infer the schema but don't attempt to write it to the table " +
+ "properties) and NEVER_INFER (fallback to using the case-insensitive metastore schema " +
+ "instead of inferring).")
+ .stringConf
+ .transform(_.toUpperCase())
+ .checkValues(HiveCaseSensitiveInferenceMode.values.map(_.toString))
+ .createWithDefault(HiveCaseSensitiveInferenceMode.INFER_AND_SAVE.toString)
+
val OPTIMIZER_METADATA_ONLY = buildConf("spark.sql.optimizer.metadataOnly")
.doc("When true, enable the metadata-only query optimization that use the table's metadata " +
"to produce the partition columns instead of table scans. It applies when all the columns " +
@@ -792,6 +811,9 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
def filesourcePartitionFileCacheSize: Long = getConf(HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE)
+ def caseSensitiveInferenceMode: HiveCaseSensitiveInferenceMode.Value =
+ HiveCaseSensitiveInferenceMode.withName(getConf(HIVE_CASE_SENSITIVE_INFERENCE))
+
def gatherFastStats: Boolean = getConf(GATHER_FASTSTAT)
def optimizerMetadataOnly: Boolean = getConf(OPTIMIZER_METADATA_ONLY)
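Internally (the getter above lives on `SQLConf`, which is `private[sql]`, so this sketch applies only to code inside Spark SQL), the configured string is resolved to the enumeration and branched on, mirroring what `HiveMetastoreCatalog.inferIfNeeded()` does later in this patch:

```scala
import org.apache.spark.sql.internal.SQLConf.HiveCaseSensitiveInferenceMode._

// Sketch of internal usage: INFER_AND_SAVE and INFER_ONLY both trigger
// inference; only INFER_AND_SAVE writes the result back to the metastore.
val mode = sparkSession.sessionState.conf.caseSensitiveInferenceMode
val shouldInfer = mode != NEVER_INFER
```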
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index 8a980a7eb5..6aa940afbb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -368,88 +368,6 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
}
}
- test("merge with metastore schema") {
- // Field type conflict resolution
- assertResult(
- StructType(Seq(
- StructField("lowerCase", StringType),
- StructField("UPPERCase", DoubleType, nullable = false)))) {
-
- ParquetFileFormat.mergeMetastoreParquetSchema(
- StructType(Seq(
- StructField("lowercase", StringType),
- StructField("uppercase", DoubleType, nullable = false))),
-
- StructType(Seq(
- StructField("lowerCase", BinaryType),
- StructField("UPPERCase", IntegerType, nullable = true))))
- }
-
- // MetaStore schema is subset of parquet schema
- assertResult(
- StructType(Seq(
- StructField("UPPERCase", DoubleType, nullable = false)))) {
-
- ParquetFileFormat.mergeMetastoreParquetSchema(
- StructType(Seq(
- StructField("uppercase", DoubleType, nullable = false))),
-
- StructType(Seq(
- StructField("lowerCase", BinaryType),
- StructField("UPPERCase", IntegerType, nullable = true))))
- }
-
- // Metastore schema contains additional non-nullable fields.
- assert(intercept[Throwable] {
- ParquetFileFormat.mergeMetastoreParquetSchema(
- StructType(Seq(
- StructField("uppercase", DoubleType, nullable = false),
- StructField("lowerCase", BinaryType, nullable = false))),
-
- StructType(Seq(
- StructField("UPPERCase", IntegerType, nullable = true))))
- }.getMessage.contains("detected conflicting schemas"))
-
- // Conflicting non-nullable field names
- intercept[Throwable] {
- ParquetFileFormat.mergeMetastoreParquetSchema(
- StructType(Seq(StructField("lower", StringType, nullable = false))),
- StructType(Seq(StructField("lowerCase", BinaryType))))
- }
- }
-
- test("merge missing nullable fields from Metastore schema") {
- // Standard case: Metastore schema contains additional nullable fields not present
- // in the Parquet file schema.
- assertResult(
- StructType(Seq(
- StructField("firstField", StringType, nullable = true),
- StructField("secondField", StringType, nullable = true),
- StructField("thirdfield", StringType, nullable = true)))) {
- ParquetFileFormat.mergeMetastoreParquetSchema(
- StructType(Seq(
- StructField("firstfield", StringType, nullable = true),
- StructField("secondfield", StringType, nullable = true),
- StructField("thirdfield", StringType, nullable = true))),
- StructType(Seq(
- StructField("firstField", StringType, nullable = true),
- StructField("secondField", StringType, nullable = true))))
- }
-
- // Merge should fail if the Metastore contains any additional fields that are not
- // nullable.
- assert(intercept[Throwable] {
- ParquetFileFormat.mergeMetastoreParquetSchema(
- StructType(Seq(
- StructField("firstfield", StringType, nullable = true),
- StructField("secondfield", StringType, nullable = true),
- StructField("thirdfield", StringType, nullable = false))),
- StructType(Seq(
- StructField("firstField", StringType, nullable = true),
- StructField("secondField", StringType, nullable = true))))
- }.getMessage.contains("detected conflicting schemas"))
- }
-
test("schema merging failure error message") {
import testImplicits._
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 9ab4624594..78aa2bd249 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -597,6 +597,25 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
}
}
+ override def alterTableSchema(db: String, table: String, schema: StructType): Unit = withClient {
+ requireTableExists(db, table)
+ val rawTable = getRawTable(db, table)
+ val withNewSchema = rawTable.copy(schema = schema)
+ // Add table metadata such as table schema, partition columns, etc. to table properties.
+ val updatedTable = withNewSchema.copy(
+ properties = withNewSchema.properties ++ tableMetaToTableProps(withNewSchema))
+ try {
+ client.alterTable(updatedTable)
+ } catch {
+ case NonFatal(e) =>
+ val warningMessage =
+ s"Could not alter schema of table ${rawTable.identifier.quotedString} in a Hive " +
+ "compatible way. Updating Hive metastore in Spark SQL specific format."
+ logWarning(warningMessage, e)
+ client.alterTable(updatedTable.copy(schema = updatedTable.partitionSchema))
+ }
+ }
+
override def getTable(db: String, table: String): CatalogTable = withClient {
restoreTableMetadata(getRawTable(db, table))
}
@@ -690,10 +709,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
"different from the schema when this table was created by Spark SQL" +
s"(${schemaFromTableProps.simpleString}). We have to fall back to the table schema " +
"from Hive metastore which is not case preserving.")
- hiveTable
+ hiveTable.copy(schemaPreservesCase = false)
}
} else {
- hiveTable
+ hiveTable.copy(schemaPreservesCase = false)
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index d135dfa9f4..056af49559 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -19,9 +19,12 @@ package org.apache.spark.sql.hive
import java.net.URI
+import scala.util.control.NonFatal
+
import com.google.common.util.concurrent.Striped
import org.apache.hadoop.fs.Path
+import org.apache.spark.SparkException
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
@@ -32,6 +35,7 @@ import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
import org.apache.spark.sql.hive.orc.OrcFileFormat
+import org.apache.spark.sql.internal.SQLConf.HiveCaseSensitiveInferenceMode._
import org.apache.spark.sql.types._
/**
@@ -44,6 +48,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
// these are def_s and not val/lazy val since the latter would introduce circular references
private def sessionState = sparkSession.sessionState.asInstanceOf[HiveSessionState]
private def tableRelationCache = sparkSession.sessionState.catalog.tableRelationCache
+ import HiveMetastoreCatalog._
private def getCurrentDatabase: String = sessionState.catalog.getCurrentDatabase
@@ -130,6 +135,8 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
val lazyPruningEnabled = sparkSession.sqlContext.conf.manageFilesourcePartitions
val tablePath = new Path(relation.tableMeta.location)
+ val fileFormat = fileFormatClass.newInstance()
+
val result = if (relation.isPartitioned) {
val partitionSchema = relation.tableMeta.partitionSchema
val rootPaths: Seq[Path] = if (lazyPruningEnabled) {
@@ -170,16 +177,18 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
}
}
+ val (dataSchema, updatedTable) =
+ inferIfNeeded(relation, options, fileFormat, Option(fileIndex))
+
val fsRelation = HadoopFsRelation(
location = fileIndex,
partitionSchema = partitionSchema,
- dataSchema = relation.tableMeta.dataSchema,
+ dataSchema = dataSchema,
// We don't support hive bucketed tables, only ones we write out.
bucketSpec = None,
- fileFormat = fileFormatClass.newInstance(),
+ fileFormat = fileFormat,
options = options)(sparkSession = sparkSession)
-
- val created = LogicalRelation(fsRelation, catalogTable = Some(relation.tableMeta))
+ val created = LogicalRelation(fsRelation, catalogTable = Some(updatedTable))
tableRelationCache.put(tableIdentifier, created)
created
}
@@ -196,17 +205,18 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
fileFormatClass,
None)
val logicalRelation = cached.getOrElse {
+ val (dataSchema, updatedTable) = inferIfNeeded(relation, options, fileFormat)
val created =
LogicalRelation(
DataSource(
sparkSession = sparkSession,
paths = rootPath.toString :: Nil,
- userSpecifiedSchema = Some(metastoreSchema),
+ userSpecifiedSchema = Option(dataSchema),
// We don't support hive bucketed tables, only ones we write out.
bucketSpec = None,
options = options,
className = fileType).resolveRelation(),
- catalogTable = Some(relation.tableMeta))
+ catalogTable = Some(updatedTable))
tableRelationCache.put(tableIdentifier, created)
created
@@ -218,6 +228,54 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
result.copy(expectedOutputAttributes = Some(relation.output))
}
+ private def inferIfNeeded(
+ relation: CatalogRelation,
+ options: Map[String, String],
+ fileFormat: FileFormat,
+ fileIndexOpt: Option[FileIndex] = None): (StructType, CatalogTable) = {
+ val inferenceMode = sparkSession.sessionState.conf.caseSensitiveInferenceMode
+ val shouldInfer = (inferenceMode != NEVER_INFER) && !relation.tableMeta.schemaPreservesCase
+ val tableName = relation.tableMeta.identifier.unquotedString
+ if (shouldInfer) {
+ logInfo(s"Inferring case-sensitive schema for table $tableName (inference mode: " +
+ s"$inferenceMode)")
+ val fileIndex = fileIndexOpt.getOrElse {
+ val rootPath = new Path(relation.tableMeta.location)
+ new InMemoryFileIndex(sparkSession, Seq(rootPath), options, None)
+ }
+
+ val inferredSchema = fileFormat
+ .inferSchema(
+ sparkSession,
+ options,
+ fileIndex.listFiles(Nil).flatMap(_.files))
+ .map(mergeWithMetastoreSchema(relation.tableMeta.schema, _))
+
+ inferredSchema match {
+ case Some(schema) =>
+ if (inferenceMode == INFER_AND_SAVE) {
+ updateCatalogSchema(relation.tableMeta.identifier, schema)
+ }
+ (schema, relation.tableMeta.copy(schema = schema))
+ case None =>
+ logWarning(s"Unable to infer schema for table $tableName from file format " +
+ s"$fileFormat (inference mode: $inferenceMode). Using metastore schema.")
+ (relation.tableMeta.schema, relation.tableMeta)
+ }
+ } else {
+ (relation.tableMeta.schema, relation.tableMeta)
+ }
+ }
+
+ private def updateCatalogSchema(identifier: TableIdentifier, schema: StructType): Unit = try {
+ val db = identifier.database.get
+ logInfo(s"Saving case-sensitive schema for table ${identifier.unquotedString}")
+ sparkSession.sharedState.externalCatalog.alterTableSchema(db, identifier.table, schema)
+ } catch {
+ case NonFatal(ex) =>
+ logWarning(s"Unable to save case-sensitive schema for table ${identifier.unquotedString}", ex)
+ }
+
/**
* When scanning or writing to non-partitioned Metastore Parquet tables, convert them to Parquet
* data source relations for better performance.
@@ -287,3 +345,33 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
}
}
}
+
+private[hive] object HiveMetastoreCatalog {
+ def mergeWithMetastoreSchema(
+ metastoreSchema: StructType,
+ inferredSchema: StructType): StructType = try {
+ // Find any nullable fields in the metastore schema that are missing from the inferred schema.
+ val metastoreFields = metastoreSchema.map(f => f.name.toLowerCase -> f).toMap
+ val missingNullables = metastoreFields
+ .filterKeys(!inferredSchema.map(_.name.toLowerCase).contains(_))
+ .values
+ .filter(_.nullable)
+
+ // Merge missing nullable fields to inferred schema and build a case-insensitive field map.
+ val inferredFields = StructType(inferredSchema ++ missingNullables)
+ .map(f => f.name.toLowerCase -> f).toMap
+ StructType(metastoreFields.map { case(name, field) =>
+ field.copy(name = inferredFields(name).name)
+ }.toSeq)
+ } catch {
+ case NonFatal(_) =>
+ val msg = s"""Detected conflicting schemas when merging the schema obtained from the Hive
+ | Metastore with the one inferred from the file format. Metastore schema:
+ |${metastoreSchema.prettyJson}
+ |
+ |Inferred schema:
+ |${inferredSchema.prettyJson}
+ """.stripMargin
+ throw new SparkException(msg)
+ }
+}
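To make the merge semantics concrete, a sketch based on the first case of the new test suite below (field names come from the inferred schema, data types and nullability from the metastore schema; note `HiveMetastoreCatalog` is `private[hive]`, so this only compiles inside that package):

```scala
import org.apache.spark.sql.types._

val metastoreSchema = StructType(Seq(
  StructField("lowercase", StringType),                    // names lower-cased by Hive
  StructField("uppercase", DoubleType, nullable = false)))
val inferredSchema = StructType(Seq(
  StructField("lowerCase", BinaryType),                    // case-preserving names
  StructField("UPPERCase", IntegerType)))

// Keeps the inferred (case-sensitive) names but the metastore data types:
// StructType(Seq(StructField("lowerCase", StringType),
//                StructField("UPPERCase", DoubleType, nullable = false)))
val merged = HiveMetastoreCatalog.mergeWithMetastoreSchema(metastoreSchema, inferredSchema)
```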
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala
new file mode 100644
index 0000000000..7895580381
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+import java.util.concurrent.{Executors, TimeUnit}
+
+import scala.util.Random
+
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.metrics.source.HiveCatalogMetrics
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.execution.datasources.FileStatusCache
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.hive.client.HiveClient
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
+import org.apache.spark.sql.internal.SQLConf.HiveCaseSensitiveInferenceMode.{Value => InferenceMode, _}
+import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.types._
+
+class HiveSchemaInferenceSuite
+ extends QueryTest with TestHiveSingleton with SQLTestUtils with BeforeAndAfterEach {
+
+ import HiveSchemaInferenceSuite._
+ import HiveExternalCatalog.DATASOURCE_SCHEMA_PREFIX
+
+ override def beforeEach(): Unit = {
+ super.beforeEach()
+ FileStatusCache.resetForTesting()
+ }
+
+ override def afterEach(): Unit = {
+ super.afterEach()
+ spark.sessionState.catalog.tableRelationCache.invalidateAll()
+ FileStatusCache.resetForTesting()
+ }
+
+ private val externalCatalog = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog]
+ private val client = externalCatalog.client
+
+ // Return a copy of the given schema with all field names converted to lower case.
+ private def lowerCaseSchema(schema: StructType): StructType = {
+ StructType(schema.map(f => f.copy(name = f.name.toLowerCase)))
+ }
+
+ // Create a Hive external test table containing the given field and partition column names.
+ // Returns a case-sensitive schema for the table.
+ private def setupExternalTable(
+ fileType: String,
+ fields: Seq[String],
+ partitionCols: Seq[String],
+ dir: File): StructType = {
+ // Treat all table fields as bigints...
+ val structFields = fields.map { field =>
+ StructField(
+ name = field,
+ dataType = LongType,
+ nullable = true,
+ metadata = new MetadataBuilder().putString(HIVE_TYPE_STRING, "bigint").build())
+ }
+ // and all partition columns as ints
+ val partitionStructFields = partitionCols.map { field =>
+ StructField(
+ // Partition column case isn't preserved
+ name = field.toLowerCase,
+ dataType = IntegerType,
+ nullable = true,
+ metadata = new MetadataBuilder().putString(HIVE_TYPE_STRING, "int").build())
+ }
+ val schema = StructType(structFields ++ partitionStructFields)
+
+ // Write some test data (partitioned if specified)
+ val writer = spark.range(NUM_RECORDS)
+ .selectExpr((fields ++ partitionCols).map("id as " + _): _*)
+ .write
+ .partitionBy(partitionCols: _*)
+ .mode("overwrite")
+ fileType match {
+ case ORC_FILE_TYPE =>
+ writer.orc(dir.getAbsolutePath)
+ case PARQUET_FILE_TYPE =>
+ writer.parquet(dir.getAbsolutePath)
+ }
+
+ // Create Hive external table with lowercased schema
+ val serde = HiveSerDe.serdeMap(fileType)
+ client.createTable(
+ CatalogTable(
+ identifier = TableIdentifier(table = TEST_TABLE_NAME, database = Option(DATABASE)),
+ tableType = CatalogTableType.EXTERNAL,
+ storage = CatalogStorageFormat(
+ locationUri = Option(new java.net.URI(dir.getAbsolutePath)),
+ inputFormat = serde.inputFormat,
+ outputFormat = serde.outputFormat,
+ serde = serde.serde,
+ compressed = false,
+ properties = Map("serialization.format" -> "1")),
+ schema = schema,
+ provider = Option("hive"),
+ partitionColumnNames = partitionCols.map(_.toLowerCase),
+ properties = Map.empty),
+ true)
+
+ // Add partition records (if specified)
+ if (!partitionCols.isEmpty) {
+ spark.catalog.recoverPartitions(TEST_TABLE_NAME)
+ }
+
+ // Check that the table returned by HiveExternalCatalog has schemaPreservesCase set to false
+ // and that the raw table returned by the Hive client doesn't have any Spark SQL properties
+ // set (table needs to be obtained from client since HiveExternalCatalog filters these
+ // properties out).
+ assert(!externalCatalog.getTable(DATABASE, TEST_TABLE_NAME).schemaPreservesCase)
+ val rawTable = client.getTable(DATABASE, TEST_TABLE_NAME)
+ assert(rawTable.properties.filterKeys(_.startsWith(DATASOURCE_SCHEMA_PREFIX)) == Map.empty)
+ schema
+ }
+
+ private def withTestTables(
+ fileType: String)(f: (Seq[String], Seq[String], StructType) => Unit): Unit = {
+ // Test both a partitioned and unpartitioned Hive table
+ val tableFields = Seq(
+ (Seq("fieldOne"), Seq("partCol1", "partCol2")),
+ (Seq("fieldOne", "fieldTwo"), Seq.empty[String]))
+
+ tableFields.foreach { case (fields, partCols) =>
+ withTempDir { dir =>
+ val schema = setupExternalTable(fileType, fields, partCols, dir)
+ withTable(TEST_TABLE_NAME) { f(fields, partCols, schema) }
+ }
+ }
+ }
+
+ private def withFileTypes(f: (String) => Unit): Unit
+ = Seq(ORC_FILE_TYPE, PARQUET_FILE_TYPE).foreach(f)
+
+ private def withInferenceMode(mode: InferenceMode)(f: => Unit): Unit = {
+ withSQLConf(
+ HiveUtils.CONVERT_METASTORE_ORC.key -> "true",
+ SQLConf.HIVE_CASE_SENSITIVE_INFERENCE.key -> mode.toString)(f)
+ }
+
+ private val inferenceKey = SQLConf.HIVE_CASE_SENSITIVE_INFERENCE.key
+
+ private def testFieldQuery(fields: Seq[String]): Unit = {
+ if (!fields.isEmpty) {
+ val query = s"SELECT * FROM ${TEST_TABLE_NAME} WHERE ${Random.shuffle(fields).head} >= 0"
+ assert(spark.sql(query).count == NUM_RECORDS)
+ }
+ }
+
+ private def testTableSchema(expectedSchema: StructType): Unit
+ = assert(spark.table(TEST_TABLE_NAME).schema == expectedSchema)
+
+ withFileTypes { fileType =>
+ test(s"$fileType: schema should be inferred and saved when INFER_AND_SAVE is specified") {
+ withInferenceMode(INFER_AND_SAVE) {
+ withTestTables(fileType) { (fields, partCols, schema) =>
+ testFieldQuery(fields)
+ testFieldQuery(partCols)
+ testTableSchema(schema)
+
+ // Verify the catalog table now contains the updated schema and properties
+ val catalogTable = externalCatalog.getTable(DATABASE, TEST_TABLE_NAME)
+ assert(catalogTable.schemaPreservesCase)
+ assert(catalogTable.schema == schema)
+ assert(catalogTable.partitionColumnNames == partCols.map(_.toLowerCase))
+ }
+ }
+ }
+ }
+
+ withFileTypes { fileType =>
+ test(s"$fileType: schema should be inferred but not stored when INFER_ONLY is specified") {
+ withInferenceMode(INFER_ONLY) {
+ withTestTables(fileType) { (fields, partCols, schema) =>
+ val originalTable = externalCatalog.getTable(DATABASE, TEST_TABLE_NAME)
+ testFieldQuery(fields)
+ testFieldQuery(partCols)
+ testTableSchema(schema)
+ // Catalog table shouldn't be altered
+ assert(externalCatalog.getTable(DATABASE, TEST_TABLE_NAME) == originalTable)
+ }
+ }
+ }
+ }
+
+ withFileTypes { fileType =>
+ test(s"$fileType: schema should not be inferred when NEVER_INFER is specified") {
+ withInferenceMode(NEVER_INFER) {
+ withTestTables(fileType) { (fields, partCols, schema) =>
+ val originalTable = externalCatalog.getTable(DATABASE, TEST_TABLE_NAME)
+ // Only check the table schema as the test queries will break
+ testTableSchema(lowerCaseSchema(schema))
+ assert(externalCatalog.getTable(DATABASE, TEST_TABLE_NAME) == originalTable)
+ }
+ }
+ }
+ }
+
+ test("mergeWithMetastoreSchema() should return expected results") {
+ // Field type conflict resolution
+ assertResult(
+ StructType(Seq(
+ StructField("lowerCase", StringType),
+ StructField("UPPERCase", DoubleType, nullable = false)))) {
+
+ HiveMetastoreCatalog.mergeWithMetastoreSchema(
+ StructType(Seq(
+ StructField("lowercase", StringType),
+ StructField("uppercase", DoubleType, nullable = false))),
+
+ StructType(Seq(
+ StructField("lowerCase", BinaryType),
+ StructField("UPPERCase", IntegerType, nullable = true))))
+ }
+
+ // MetaStore schema is subset of parquet schema
+ assertResult(
+ StructType(Seq(
+ StructField("UPPERCase", DoubleType, nullable = false)))) {
+
+ HiveMetastoreCatalog.mergeWithMetastoreSchema(
+ StructType(Seq(
+ StructField("uppercase", DoubleType, nullable = false))),
+
+ StructType(Seq(
+ StructField("lowerCase", BinaryType),
+ StructField("UPPERCase", IntegerType, nullable = true))))
+ }
+
+ // Metastore schema contains additional non-nullable fields.
+ assert(intercept[Throwable] {
+ HiveMetastoreCatalog.mergeWithMetastoreSchema(
+ StructType(Seq(
+ StructField("uppercase", DoubleType, nullable = false),
+ StructField("lowerCase", BinaryType, nullable = false))),
+
+ StructType(Seq(
+ StructField("UPPERCase", IntegerType, nullable = true))))
+ }.getMessage.contains("Detected conflicting schemas"))
+
+ // Conflicting non-nullable field names
+ intercept[Throwable] {
+ HiveMetastoreCatalog.mergeWithMetastoreSchema(
+ StructType(Seq(StructField("lower", StringType, nullable = false))),
+ StructType(Seq(StructField("lowerCase", BinaryType))))
+ }
+
+ // Check that merging missing nullable fields works as expected.
+ assertResult(
+ StructType(Seq(
+ StructField("firstField", StringType, nullable = true),
+ StructField("secondField", StringType, nullable = true),
+ StructField("thirdfield", StringType, nullable = true)))) {
+ HiveMetastoreCatalog.mergeWithMetastoreSchema(
+ StructType(Seq(
+ StructField("firstfield", StringType, nullable = true),
+ StructField("secondfield", StringType, nullable = true),
+ StructField("thirdfield", StringType, nullable = true))),
+ StructType(Seq(
+ StructField("firstField", StringType, nullable = true),
+ StructField("secondField", StringType, nullable = true))))
+ }
+
+ // Merge should fail if the Metastore contains any additional fields that are not
+ // nullable.
+ assert(intercept[Throwable] {
+ HiveMetastoreCatalog.mergeWithMetastoreSchema(
+ StructType(Seq(
+ StructField("firstfield", StringType, nullable = true),
+ StructField("secondfield", StringType, nullable = true),
+ StructField("thirdfield", StringType, nullable = false))),
+ StructType(Seq(
+ StructField("firstField", StringType, nullable = true),
+ StructField("secondField", StringType, nullable = true))))
+ }.getMessage.contains("Detected conflicting schemas"))
+ }
+}
+
+object HiveSchemaInferenceSuite {
+ private val NUM_RECORDS = 10
+ private val DATABASE = "default"
+ private val TEST_TABLE_NAME = "test_table"
+ private val ORC_FILE_TYPE = "orc"
+ private val PARQUET_FILE_TYPE = "parquet"
+}