From f79371ad86d94da14bd1ddb53e99a388017b6892 Mon Sep 17 00:00:00 2001
From: Budde
Date: Thu, 9 Mar 2017 12:55:33 -0800
Subject: [SPARK-19611][SQL] Introduce configurable table schema inference

## Summary of changes

Add a new configuration option that allows Spark SQL to infer a case-sensitive schema from a Hive Metastore table's data files when a case-sensitive schema can't be read from the table properties.

- Add spark.sql.hive.caseSensitiveInferenceMode param to SQLConf
- Add schemaPreservesCase field to CatalogTable (set to false when the schema can't successfully be read from the Hive table properties)
- Perform schema inference in HiveMetastoreCatalog if schemaPreservesCase is false, depending on spark.sql.hive.caseSensitiveInferenceMode
- Add alterTableSchema() method to the ExternalCatalog interface
- Add HiveSchemaInferenceSuite tests
- Refactor and move ParquetFileFormat.mergeMetastoreParquetSchema() to HiveMetastoreCatalog.mergeWithMetastoreSchema()
- Move schema merging tests from ParquetSchemaSuite to HiveSchemaInferenceSuite

[JIRA for this change](https://issues.apache.org/jira/browse/SPARK-19611)

## How was this patch tested?

The tests in ```HiveSchemaInferenceSuite``` verify that schema inference works as expected. ```ExternalCatalogSuite``` has also been extended to cover the new ```alterTableSchema()``` API.

Author: Budde

Closes #16944 from budde/SPARK-19611.
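For readers who want to try the feature, here is a minimal sketch of enabling the new inference mode. Only the configuration key comes from this patch; the mode values (INFER_AND_SAVE, INFER_ONLY, NEVER_INFER) and the table name are assumptions for illustration, as the diff below covers only the sql/catalyst half of the change:

```scala
// Sketch of opting in to case-sensitive schema inference.
// Assumption: the mode values and the table name are illustrative and
// are not shown in the sql/catalyst diff below.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .config("spark.sql.hive.caseSensitiveInferenceMode", "INFER_AND_SAVE")
  .enableHiveSupport()
  .getOrCreate()

// Hive tables whose properties lack a case-sensitive schema will have one
// inferred from their data files at query time under the configured mode.
spark.sql("SELECT * FROM hypothetical_parquet_table").show()
```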
---
 .../spark/sql/catalyst/catalog/ExternalCatalog.scala       | 15 ++++++++++++++-
 .../spark/sql/catalyst/catalog/InMemoryCatalog.scala       | 10 ++++++++++
 .../org/apache/spark/sql/catalyst/catalog/interface.scala  |  8 +++++++-
 .../spark/sql/catalyst/catalog/ExternalCatalogSuite.scala  | 15 ++++++++++++++-
 .../apache/spark/sql/catalyst/trees/TreeNodeSuite.scala    |  3 ++-
 5 files changed, 47 insertions(+), 4 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
index 31eded4deb..08a01e8601 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.catalog
 
 import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException, NoSuchTableException}
 import org.apache.spark.sql.catalyst.expressions.Expression
-
+import org.apache.spark.sql.types.StructType
 
 /**
  * Interface for the system catalog (of functions, partitions, tables, and databases).
@@ -104,6 +104,19 @@ abstract class ExternalCatalog {
    */
   def alterTable(tableDefinition: CatalogTable): Unit
 
+  /**
+   * Alter the schema of a table identified by the provided database and table name. The new schema
+   * should still contain the existing bucket columns and partition columns used by the table. This
+   * method will also update any Spark SQL-related parameters stored as Hive table properties (such
+   * as the schema itself).
+   *
+   * @param db Database that the table to alter belongs to
+   * @param table Name of the table whose schema is to be altered
+   * @param schema Updated schema for the table (must contain the existing partition and
+   *               bucket columns)
+   */
+  def alterTableSchema(db: String, table: String, schema: StructType): Unit
+
   def getTable(db: String, table: String): CatalogTable
 
   def getTableOption(db: String, table: String): Option[CatalogTable]

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 80aba4af94..5cc6b0abc6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.util.StringUtils
+import org.apache.spark.sql.types.StructType
 
 /**
  * An in-memory (ephemeral) implementation of the system catalog.
@@ -297,6 +298,15 @@ class InMemoryCatalog(
     catalog(db).tables(tableDefinition.identifier.table).table = tableDefinition
   }
 
+  override def alterTableSchema(
+      db: String,
+      table: String,
+      schema: StructType): Unit = synchronized {
+    requireTableExists(db, table)
+    val origTable = catalog(db).tables(table).table
+    catalog(db).tables(table).table = origTable.copy(schema = schema)
+  }
+
   override def getTable(db: String, table: String): CatalogTable = synchronized {
     requireTableExists(db, table)
     catalog(db).tables(table).table
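To make the new interface method concrete, a hedged usage sketch follows. The call matches the signature added above, but `externalCatalog` is an assumed in-scope ExternalCatalog implementation (for instance the InMemoryCatalog just shown), and the database, table, and column names are hypothetical:

```scala
// Usage sketch for ExternalCatalog.alterTableSchema(); the catalog instance
// and all names here are hypothetical. Note the replacement schema keeps the
// table's existing partition/bucket columns, per the documented contract.
import org.apache.spark.sql.types._

val updatedSchema = StructType(Seq(
  StructField("eventId", LongType),     // case-sensitive names, e.g. inferred from data files
  StructField("eventTime", StringType),
  StructField("part", StringType)))     // existing partition column, retained

externalCatalog.alterTableSchema("db1", "events", updatedSchema)
assert(externalCatalog.getTable("db1", "events").schema == updatedSchema)
```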
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 4452c47987..e3631b0c07 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -163,6 +163,11 @@ case class BucketSpec(
  * @param tracksPartitionsInCatalog whether this table's partition metadata is stored in the
  *                                  catalog. If false, it is inferred automatically based on file
  *                                  structure.
+ * @param schemaPreservesCase Whether or not the schema resolved for this table is case-sensitive.
+ *                            When using a Hive Metastore, this flag is set to false if a case-
+ *                            sensitive schema could not be read from the table properties. Used to
+ *                            trigger case-sensitive schema inference at query time, when
+ *                            configured.
  */
 case class CatalogTable(
     identifier: TableIdentifier,
@@ -180,7 +185,8 @@ case class CatalogTable(
     viewText: Option[String] = None,
     comment: Option[String] = None,
     unsupportedFeatures: Seq[String] = Seq.empty,
-    tracksPartitionsInCatalog: Boolean = false) {
+    tracksPartitionsInCatalog: Boolean = false,
+    schemaPreservesCase: Boolean = true) {
 
   import CatalogTable._

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index 07ccd68698..7820f39d96 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException}
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
@@ -240,6 +240,19 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     }
   }
 
+  test("alter table schema") {
+    val catalog = newBasicCatalog()
+    val tbl1 = catalog.getTable("db2", "tbl1")
+    val newSchema = StructType(Seq(
+      StructField("new_field_1", IntegerType),
+      StructField("new_field_2", StringType),
+      StructField("a", IntegerType),
+      StructField("b", StringType)))
+    catalog.alterTableSchema("db2", "tbl1", newSchema)
+    val newTbl1 = catalog.getTable("db2", "tbl1")
+    assert(newTbl1.schema == newSchema)
+  }
+
   test("get table") {
     assert(newBasicCatalog().getTable("db2", "tbl1").identifier.table == "tbl1")
   }

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
index af1eaa1f23..37e3dfabd0 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
@@ -491,7 +491,8 @@ class TreeNodeSuite extends SparkFunSuite {
         "lastAccessTime" -> -1,
         "tracksPartitionsInCatalog" -> false,
         "properties" -> JNull,
-        "unsupportedFeatures" -> List.empty[String]))
+        "unsupportedFeatures" -> List.empty[String],
+        "schemaPreservesCase" -> JBool(true)))
 
     // For unknown case class, returns JNull.
     val bigValue = new Array[Int](10000)
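The Hive-side pieces of this change (HiveMetastoreCatalog.mergeWithMetastoreSchema() and the SQLConf handling) fall outside the sql/catalyst diff above. As a rough sketch of the merging idea only, not the actual implementation: the inferred schema supplies case-sensitive field names, while unmatched metastore fields are kept as-is. The real method also reconciles metadata and fails on fields it cannot resolve.

```scala
// Rough sketch only, not HiveMetastoreCatalog.mergeWithMetastoreSchema():
// prefer the case-sensitive inferred field when it matches a metastore
// field case-insensitively; otherwise fall back to the metastore field.
import org.apache.spark.sql.types.{StructField, StructType}

def mergeWithMetastoreSchema(metastore: StructType, inferred: StructType): StructType = {
  // Index inferred fields by lowercased name, since the metastore
  // lowercases all column names.
  val inferredByLowerName: Map[String, StructField] =
    inferred.map(f => f.name.toLowerCase -> f).toMap
  StructType(metastore.map(f => inferredByLowerName.getOrElse(f.name.toLowerCase, f)))
}
```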