path: root/sql/catalyst
author     Budde <budde@amazon.com>                  2017-03-09 12:55:33 -0800
committer  Wenchen Fan <wenchen@databricks.com>      2017-03-09 12:55:33 -0800
commit     f79371ad86d94da14bd1ddb53e99a388017b6892 (patch)
tree       20060cf7dba8c1cbda2536b0674bf9a93103bf93 /sql/catalyst
parent     cabe1df8606e7e5b9e6efb106045deb3f39f5f13 (diff)
[SPARK-19611][SQL] Introduce configurable table schema inference
## Summary of changes

Add a new configuration option that allows Spark SQL to infer a case-sensitive schema from a Hive Metastore table's data files when a case-sensitive schema can't be read from the table properties.

- Add spark.sql.hive.caseSensitiveInferenceMode param to SQLConf
- Add schemaPreservesCase field to CatalogTable (set to false when the schema can't successfully be read from the Hive table props)
- Perform schema inference in HiveMetastoreCatalog if schemaPreservesCase is false, depending on spark.sql.hive.caseSensitiveInferenceMode
- Add alterTableSchema() method to the ExternalCatalog interface
- Add HiveSchemaInferenceSuite tests
- Refactor and move ParquetFileFormat.mergeMetastoreParquetSchema() to HiveMetastoreCatalog.mergeWithMetastoreSchema()
- Move schema merging tests from ParquetSchemaSuite to HiveSchemaInferenceSuite

[JIRA for this change](https://issues.apache.org/jira/browse/SPARK-19611)

## How was this patch tested?

The tests in `HiveSchemaInferenceSuite` verify that schema inference works as expected. `ExternalCatalogSuite` has also been extended to cover the new `alterTableSchema()` API.

Author: Budde <budde@amazon.com>

Closes #16944 from budde/SPARK-19611.
Diffstat (limited to 'sql/catalyst')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala      | 15
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala      | 10
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala            |  8
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala | 15
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala          |  3
5 files changed, 47 insertions, 4 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
index 31eded4deb..08a01e8601 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.catalog
import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException, NoSuchTableException}
import org.apache.spark.sql.catalyst.expressions.Expression
-
+import org.apache.spark.sql.types.StructType
/**
* Interface for the system catalog (of functions, partitions, tables, and databases).
@@ -104,6 +104,19 @@ abstract class ExternalCatalog {
*/
def alterTable(tableDefinition: CatalogTable): Unit
+ /**
+ * Alter the schema of a table identified by the provided database and table name. The new schema
+ * should still contain the existing bucket columns and partition columns used by the table. This
+ * method will also update any Spark SQL-related parameters stored as Hive table properties (such
+ * as the schema itself).
+ *
+ * @param db Name of the database the table to alter belongs to
+ * @param table Name of the table whose schema is to be altered
+ * @param schema Updated schema to be used for the table (must contain existing partition and
+ * bucket columns)
+ */
+ def alterTableSchema(db: String, table: String, schema: StructType): Unit
+
def getTable(db: String, table: String): CatalogTable
def getTableOption(db: String, table: String): Option[CatalogTable]
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 80aba4af94..5cc6b0abc6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.util.StringUtils
+import org.apache.spark.sql.types.StructType
/**
* An in-memory (ephemeral) implementation of the system catalog.
@@ -297,6 +298,15 @@ class InMemoryCatalog(
catalog(db).tables(tableDefinition.identifier.table).table = tableDefinition
}
+ override def alterTableSchema(
+ db: String,
+ table: String,
+ schema: StructType): Unit = synchronized {
+ requireTableExists(db, table)
+ val origTable = catalog(db).tables(table).table
+ catalog(db).tables(table).table = origTable.copy(schema = schema)
+ }
+
override def getTable(db: String, table: String): CatalogTable = synchronized {
requireTableExists(db, table)
catalog(db).tables(table).table
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 4452c47987..e3631b0c07 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -163,6 +163,11 @@ case class BucketSpec(
* @param tracksPartitionsInCatalog whether this table's partition metadata is stored in the
* catalog. If false, it is inferred automatically based on file
* structure.
+ * @param schemaPreservesCase Whether or not the schema resolved for this table is case-sensitive.
+ *                            When using a Hive Metastore, this flag is set to false if a case-
+ *                            sensitive schema could not be read from the table properties.
+ * Used to trigger case-sensitive schema inference at query time, when
+ * configured.
*/
case class CatalogTable(
identifier: TableIdentifier,
@@ -180,7 +185,8 @@ case class CatalogTable(
viewText: Option[String] = None,
comment: Option[String] = None,
unsupportedFeatures: Seq[String] = Seq.empty,
- tracksPartitionsInCatalog: Boolean = false) {
+ tracksPartitionsInCatalog: Boolean = false,
+ schemaPreservesCase: Boolean = true) {
import CatalogTable._
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index 07ccd68698..7820f39d96 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException}
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -240,6 +240,19 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
}
}
+ test("alter table schema") {
+ val catalog = newBasicCatalog()
+ val tbl1 = catalog.getTable("db2", "tbl1")
+ val newSchema = StructType(Seq(
+ StructField("new_field_1", IntegerType),
+ StructField("new_field_2", StringType),
+ StructField("a", IntegerType),
+ StructField("b", StringType)))
+ catalog.alterTableSchema("db2", "tbl1", newSchema)
+ val newTbl1 = catalog.getTable("db2", "tbl1")
+ assert(newTbl1.schema == newSchema)
+ }
+
test("get table") {
assert(newBasicCatalog().getTable("db2", "tbl1").identifier.table == "tbl1")
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
index af1eaa1f23..37e3dfabd0 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
@@ -491,7 +491,8 @@ class TreeNodeSuite extends SparkFunSuite {
"lastAccessTime" -> -1,
"tracksPartitionsInCatalog" -> false,
"properties" -> JNull,
- "unsupportedFeatures" -> List.empty[String]))
+ "unsupportedFeatures" -> List.empty[String],
+ "schemaPreservesCase" -> JBool(true)))
// For unknown case class, returns JNull.
val bigValue = new Array[Int](10000)