-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala | 80
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala | 18
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala | 2
3 files changed, 59 insertions, 41 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 1a9943bc31..065883234a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -21,6 +21,7 @@ import java.io.IOException
import java.net.URI
import java.util
+import scala.collection.mutable
import scala.util.control.NonFatal
import org.apache.hadoop.conf.Configuration
@@ -219,9 +220,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
// table location for tables in default database, while we expect to use the location of
// default database.
storage = tableDefinition.storage.copy(locationUri = tableLocation),
- // Here we follow data source tables and put table metadata like provider, schema, etc. in
- // table properties, so that we can work around the Hive metastore issue about not case
- // preserving and make Hive serde table support mixed-case column names.
+ // Here we follow data source tables and put table metadata like table schema, partition
+ // columns, etc. in table properties, so that we can work around the Hive metastore issue
+ // of not preserving case and make Hive serde tables support mixed-case column names.
properties = tableDefinition.properties ++ tableMetaToTableProps(tableDefinition))
client.createTable(tableWithDataSourceProps, ignoreIfExists)
} else {
@@ -233,10 +234,13 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
}
private def createDataSourceTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = {
+ // Data source tables always have a provider; this is guaranteed by `DDLUtils.isDatasourceTable`.
+ val provider = table.provider.get
+
// To work around some hive metastore issues, e.g. not case-preserving, bad decimal type
// support, no column nullability, etc., we should do some extra work before saving table
// metadata into Hive metastore:
- // 1. Put table metadata like provider, schema, etc. in table properties.
+ // 1. Put table metadata like table schema, partition columns, etc. in table properties.
// 2. Check if this table is hive compatible.
// 2.1 If it's not hive compatible, set location URI, schema, partition columns and bucket
// spec to empty and save table metadata to Hive.
@@ -244,6 +248,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
// it to Hive. If it fails, treat it as not hive compatible and go back to 2.1
val tableProperties = tableMetaToTableProps(table)
+ // put table provider and partition provider in table properties.
+ tableProperties.put(DATASOURCE_PROVIDER, provider)
+ if (table.tracksPartitionsInCatalog) {
+ tableProperties.put(TABLE_PARTITION_PROVIDER, TABLE_PARTITION_PROVIDER_CATALOG)
+ }
+
// Ideally we should also put `locationUri` in table properties like provider, schema, etc.
// However, in older versions of Spark we already store table location in storage properties
// with key "path". Here we keep this behaviour for backward compatibility.
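The hunk above moves the provider keys into createDataSourceTable; together with their removal from tableMetaToTableProps further down, it means Hive serde tables never carry DATASOURCE_PROVIDER in their properties. A minimal sketch of that split, using illustrative key and helper names rather than the actual constants in HiveExternalCatalog:

import scala.collection.mutable

object TablePropsSketch {
  // Assumed key names, for illustration only.
  val ProviderKey = "spark.sql.sources.provider"
  val PartitionProviderKey = "spark.sql.partitionProvider"

  // Shared helper: schema, partition columns, etc. -- but no provider keys.
  def commonProps(schemaJson: String, partitionCols: Seq[String]): mutable.Map[String, String] = {
    val props = mutable.HashMap[String, String]()
    props.put("schemaJson", schemaJson)                         // stands in for the chunked schema parts
    props.put("partitionColumns", partitionCols.mkString(","))  // stands in for the per-column keys
    props
  }

  // Data-source-table path: adds the provider keys on top of the common metadata.
  def dataSourceProps(
      schemaJson: String,
      partitionCols: Seq[String],
      provider: String,
      tracksPartitionsInCatalog: Boolean): mutable.Map[String, String] = {
    val props = commonProps(schemaJson, partitionCols)
    props.put(ProviderKey, provider)
    if (tracksPartitionsInCatalog) props.put(PartitionProviderKey, "catalog")
    props
  }
}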
@@ -290,7 +300,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
}
val qualifiedTableName = table.identifier.quotedString
- val maybeSerde = HiveSerDe.sourceToSerDe(table.provider.get)
+ val maybeSerde = HiveSerDe.sourceToSerDe(provider)
val skipHiveMetadata = table.storage.properties
.getOrElse("skipHiveMetadata", "false").toBoolean
@@ -315,7 +325,6 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
(Some(newHiveCompatibleMetastoreTable(serde)), message)
case _ =>
- val provider = table.provider.get
val message =
s"Couldn't find corresponding Hive SerDe for data source provider $provider. " +
s"Persisting data source table $qualifiedTableName into Hive metastore in " +
@@ -349,21 +358,14 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
/**
* Data source tables may not be Hive compatible, and we need to store table metadata in table
* properties to work around some Hive metastore limitations.
- * This method puts table provider, partition provider, schema, partition column names, bucket
- * specification into a map, which can be used as table properties later.
+ * This method puts the table schema, partition column names, and bucket specification into a
+ * map, which can be used as table properties later.
*/
- private def tableMetaToTableProps(table: CatalogTable): scala.collection.Map[String, String] = {
- // data source table always have a provider, it's guaranteed by `DDLUtils.isDatasourceTable`.
- val provider = table.provider.get
+ private def tableMetaToTableProps(table: CatalogTable): mutable.Map[String, String] = {
val partitionColumns = table.partitionColumnNames
val bucketSpec = table.bucketSpec
- val properties = new scala.collection.mutable.HashMap[String, String]
- properties.put(DATASOURCE_PROVIDER, provider)
- if (table.tracksPartitionsInCatalog) {
- properties.put(TABLE_PARTITION_PROVIDER, TABLE_PARTITION_PROVIDER_CATALOG)
- }
-
+ val properties = new mutable.HashMap[String, String]
// Serialized JSON schema string may be too long to be stored into a single metastore table
// property. In this case, we split the JSON string and store each part as a separate table
// property.
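The comment above describes the part of tableMetaToTableProps that restoreHiveSerdeTable later keys off via DATASOURCE_SCHEMA_NUMPARTS: a schema JSON string that exceeds the metastore's per-property limit is split into numbered parts. A self-contained sketch of the chunking idea, with assumed key names and an arbitrary size threshold:

import scala.collection.mutable

object SchemaChunkingSketch {
  // Assumed key names and threshold, for illustration only.
  val NumPartsKey = "spark.sql.sources.schema.numParts"
  val PartKeyPrefix = "spark.sql.sources.schema.part."
  val Threshold = 4000

  // Split the JSON schema string into fixed-size parts so each fits in one property.
  def putSchemaJson(props: mutable.Map[String, String], schemaJson: String): Unit = {
    val parts = schemaJson.grouped(Threshold).toSeq
    props.put(NumPartsKey, parts.size.toString)
    parts.zipWithIndex.foreach { case (part, i) => props.put(s"$PartKeyPrefix$i", part) }
  }

  // Reassemble the JSON string; returns None if the table has no schema parts.
  def getSchemaJson(props: Map[String, String]): Option[String] = {
    props.get(NumPartsKey).map { n =>
      (0 until n.toInt).map(i => props(s"$PartKeyPrefix$i")).mkString
    }
  }
}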
@@ -617,14 +619,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
if (table.tableType != VIEW) {
table.properties.get(DATASOURCE_PROVIDER) match {
- // No provider in table properties, which means this table is created by Spark prior to 2.1,
- // or is created at Hive side.
+ // No provider in table properties, which means this is a Hive serde table.
case None =>
- table = table.copy(
- provider = Some(DDLUtils.HIVE_PROVIDER), tracksPartitionsInCatalog = true)
-
- // This is a Hive serde table created by Spark 2.1 or higher versions.
- case Some(DDLUtils.HIVE_PROVIDER) =>
table = restoreHiveSerdeTable(table)
// This is a regular data source table.
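With the provider key no longer written for Hive serde tables, the dispatch above reduces to: no DATASOURCE_PROVIDER property means a Hive serde table, a present provider means a regular data source table. A simplified sketch of that classification, with a hypothetical key name and stand-in result types:

object RestoreDispatchSketch {
  val ProviderKey = "spark.sql.sources.provider" // assumed key name

  sealed trait RestoredKind
  case object HiveSerdeTable extends RestoredKind
  case class DataSourceTable(provider: String) extends RestoredKind

  // No provider in table properties => treat as a Hive serde table;
  // otherwise it is a regular data source table.
  def classify(tableProperties: Map[String, String]): RestoredKind =
    tableProperties.get(ProviderKey) match {
      case None => HiveSerdeTable
      case Some(provider) => DataSourceTable(provider)
    }
}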
@@ -637,7 +633,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
val statsProps = table.properties.filterKeys(_.startsWith(STATISTICS_PREFIX))
if (statsProps.nonEmpty) {
- val colStats = new scala.collection.mutable.HashMap[String, ColumnStat]
+ val colStats = new mutable.HashMap[String, ColumnStat]
// For each column, recover its column stats. Note that this is currently an O(n^2) operation,
// but given that the number of columns is usually not enormous, this is probably OK as a start.
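The comment above notes that column statistics are recovered by scanning the statistics-prefixed properties once per column, which is the O(n^2) behaviour it mentions. A hypothetical sketch of that loop; the key layout and SimpleColumnStat type are illustrative stand-ins, not Spark's ColumnStat or its actual key scheme:

object ColumnStatsSketch {
  val StatsPrefix = "spark.sql.statistics.colStats." // assumed key layout

  case class SimpleColumnStat(distinctCount: Option[Long], nullCount: Option[Long])

  // For each column, scan the stats properties that belong to it; scanning all
  // properties once per column is what makes this quadratic.
  def recover(
      columnNames: Seq[String],
      props: Map[String, String]): Map[String, SimpleColumnStat] = {
    columnNames.flatMap { col =>
      val forCol = props.collect {
        case (k, v) if k.startsWith(s"$StatsPrefix$col.") =>
          k.stripPrefix(s"$StatsPrefix$col.") -> v
      }
      if (forCol.isEmpty) {
        None
      } else {
        Some(col -> SimpleColumnStat(
          forCol.get("distinctCount").map(_.toLong),
          forCol.get("nullCount").map(_.toLong)))
      }
    }.toMap
  }
}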
@@ -674,21 +670,27 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
provider = Some(DDLUtils.HIVE_PROVIDER),
tracksPartitionsInCatalog = true)
- val schemaFromTableProps = getSchemaFromTableProperties(table)
- if (DataType.equalsIgnoreCaseAndNullability(schemaFromTableProps, table.schema)) {
- hiveTable.copy(
- schema = schemaFromTableProps,
- partitionColumnNames = getPartitionColumnsFromTableProperties(table),
- bucketSpec = getBucketSpecFromTableProperties(table))
+ // If this is a Hive serde table created by Spark 2.1 or higher versions, we should restore its
+ // schema from table properties.
+ if (table.properties.contains(DATASOURCE_SCHEMA_NUMPARTS)) {
+ val schemaFromTableProps = getSchemaFromTableProperties(table)
+ if (DataType.equalsIgnoreCaseAndNullability(schemaFromTableProps, table.schema)) {
+ hiveTable.copy(
+ schema = schemaFromTableProps,
+ partitionColumnNames = getPartitionColumnsFromTableProperties(table),
+ bucketSpec = getBucketSpecFromTableProperties(table))
+ } else {
+ // Hive metastore may change the table schema, e.g. via schema inference. If the table
+ // schema we read back is different (ignoring case and nullability) from the one in table
+ // properties, which was written when the table was created, we should respect the table
+ // schema from Hive.
+ logWarning(s"The table schema given by Hive metastore (${table.schema.simpleString}) is " +
+ "different from the schema when this table was created by Spark SQL " +
+ s"(${schemaFromTableProps.simpleString}). We have to fall back to the table schema " +
+ "from Hive metastore, which is not case preserving.")
+ hiveTable
+ }
} else {
- // Hive metastore may change the table schema, e.g. schema inference. If the table
- // schema we read back is different(ignore case and nullability) from the one in table
- // properties which was written when creating table, we should respect the table schema
- // from hive.
- logWarning(s"The table schema given by Hive metastore(${table.schema.simpleString}) is " +
- "different from the schema when this table was created by Spark SQL" +
- s"(${schemaFromTableProps.simpleString}). We have to fall back to the table schema from " +
- "Hive metastore which is not case preserving.")
hiveTable
}
}
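The new branch above prefers the case-preserving schema stored in table properties only when it agrees with what the metastore reports, up to case and nullability; otherwise the metastore schema wins, because Hive may have legitimately changed it. A simplified sketch of that reconciliation; unlike the real DataType.equalsIgnoreCaseAndNullability, it only normalizes top-level field names:

import org.apache.spark.sql.types.StructType

object SchemaReconcileSketch {
  // Simplified comparison: lower-case top-level field names and ignore nullability.
  // The real helper also recurses into nested struct/array/map types.
  private def normalize(schema: StructType): Seq[(String, String)] =
    schema.fields.toSeq.map(f => (f.name.toLowerCase, f.dataType.simpleString))

  // Prefer the case-preserving schema from table properties when it matches the metastore
  // schema up to case and nullability; otherwise keep the metastore schema.
  def reconcile(fromProps: StructType, fromMetastore: StructType): StructType =
    if (normalize(fromProps) == normalize(fromMetastore)) fromProps else fromMetastore
}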
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
index efa0beb850..6fee45824e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
@@ -20,8 +20,11 @@ package org.apache.spark.sql.hive
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.execution.command.DDLUtils
+import org.apache.spark.sql.types.StructType
/**
* Test suite for the [[HiveExternalCatalog]].
@@ -52,4 +55,19 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {
assert(selectedPartitions.length == 1)
assert(selectedPartitions.head.spec == part1.spec)
}
+
+ test("SPARK-18647: do not put provider in table properties for Hive serde table") {
+ val catalog = newBasicCatalog()
+ val hiveTable = CatalogTable(
+ identifier = TableIdentifier("hive_tbl", Some("db1")),
+ tableType = CatalogTableType.MANAGED,
+ storage = storageFormat,
+ schema = new StructType().add("col1", "int").add("col2", "string"),
+ provider = Some("hive"))
+ catalog.createTable(hiveTable, ignoreIfExists = false)
+
+ val rawTable = externalCatalog.client.getTable("db1", "hive_tbl")
+ assert(!rawTable.properties.contains(HiveExternalCatalog.DATASOURCE_PROVIDER))
+ assert(externalCatalog.getTable("db1", "hive_tbl").provider == Some(DDLUtils.HIVE_PROVIDER))
+ }
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
index 7abc4d9623..0a280b4952 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
@@ -17,8 +17,6 @@
package org.apache.spark.sql.hive
-import java.io.File
-
import org.apache.spark.sql.{QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTableType