3 files changed, 59 insertions, 41 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 1a9943bc31..065883234a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -21,6 +21,7 @@ import java.io.IOException
 import java.net.URI
 import java.util
 
+import scala.collection.mutable
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.conf.Configuration
@@ -219,9 +220,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
         // table location for tables in default database, while we expect to use the location of
         // default database.
         storage = tableDefinition.storage.copy(locationUri = tableLocation),
-        // Here we follow data source tables and put table metadata like provider, schema, etc. in
-        // table properties, so that we can work around the Hive metastore issue about not case
-        // preserving and make Hive serde table support mixed-case column names.
+        // Here we follow data source tables and put table metadata like table schema, partition
+        // columns etc. in table properties, so that we can work around the Hive metastore issue
+        // about not case preserving and make Hive serde table support mixed-case column names.
         properties = tableDefinition.properties ++ tableMetaToTableProps(tableDefinition))
       client.createTable(tableWithDataSourceProps, ignoreIfExists)
     } else {
@@ -233,10 +234,13 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   }
 
   private def createDataSourceTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = {
+    // data source table always have a provider, it's guaranteed by `DDLUtils.isDatasourceTable`.
+    val provider = table.provider.get
+
     // To work around some hive metastore issues, e.g. not case-preserving, bad decimal type
     // support, no column nullability, etc., we should do some extra works before saving table
     // metadata into Hive metastore:
-    //  1. Put table metadata like provider, schema, etc. in table properties.
+    //  1. Put table metadata like table schema, partition columns, etc. in table properties.
     //  2. Check if this table is hive compatible.
     //    2.1  If it's not hive compatible, set location URI, schema, partition columns and bucket
     //         spec to empty and save table metadata to Hive.
@@ -244,6 +248,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     //         it to Hive. If it fails, treat it as not hive compatible and go back to 2.1
     val tableProperties = tableMetaToTableProps(table)
 
+    // put table provider and partition provider in table properties.
+    tableProperties.put(DATASOURCE_PROVIDER, provider)
+    if (table.tracksPartitionsInCatalog) {
+      tableProperties.put(TABLE_PARTITION_PROVIDER, TABLE_PARTITION_PROVIDER_CATALOG)
+    }
+
     // Ideally we should also put `locationUri` in table properties like provider, schema, etc.
     // However, in older version of Spark we already store table location in storage properties
     // with key "path". Here we keep this behaviour for backward compatibility.
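The comments in the hunks above describe the core workaround this patch builds on: the Hive metastore is not case preserving, so Spark serializes the case-sensitive schema to JSON and keeps it in table properties, restoring it on read. A minimal, self-contained sketch of that round trip (the `spark.sql.sources.schema.*` keys follow Spark's property layout; the schema and surrounding code are made up for illustration, not the actual catalog code path):

```scala
import org.apache.spark.sql.types.{DataType, StructType}

object SchemaRoundTripSketch extends App {
  val original = new StructType()
    .add("userId", "int")
    .add("eventTime", "timestamp")

  // What the Hive metastore alone would retain: lower-cased column names.
  val hiveView = original.fieldNames.map(_.toLowerCase) // userid, eventtime

  // What Spark additionally stores in table properties, so mixed-case
  // names survive a metastore round trip.
  val props = Map(
    "spark.sql.sources.schema.numParts" -> "1",
    "spark.sql.sources.schema.part.0" -> original.json)

  // On restore, reassemble the JSON parts and parse the schema back.
  val numParts = props("spark.sql.sources.schema.numParts").toInt
  val json = (0 until numParts).map(i => props(s"spark.sql.sources.schema.part.$i")).mkString
  val restored = DataType.fromJson(json).asInstanceOf[StructType]

  assert(restored.fieldNames.sameElements(original.fieldNames)) // case preserved
  println(s"hive sees: ${hiveView.mkString(", ")}; restored: ${restored.simpleString}")
}
```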
@@ -290,7 +300,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     }
 
     val qualifiedTableName = table.identifier.quotedString
-    val maybeSerde = HiveSerDe.sourceToSerDe(table.provider.get)
+    val maybeSerde = HiveSerDe.sourceToSerDe(provider)
     val skipHiveMetadata = table.storage.properties
       .getOrElse("skipHiveMetadata", "false").toBoolean
 
@@ -315,7 +325,6 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
         (Some(newHiveCompatibleMetastoreTable(serde)), message)
 
       case _ =>
-        val provider = table.provider.get
         val message =
           s"Couldn't find corresponding Hive SerDe for data source provider $provider. " +
             s"Persisting data source table $qualifiedTableName into Hive metastore in " +
@@ -349,21 +358,14 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   /**
    * Data source tables may be non Hive compatible and we need to store table metadata in table
    * properties to workaround some Hive metastore limitations.
-   * This method puts table provider, partition provider, schema, partition column names, bucket
-   * specification into a map, which can be used as table properties later.
+   * This method puts table schema, partition column names, bucket specification into a map, which
+   * can be used as table properties later.
    */
-  private def tableMetaToTableProps(table: CatalogTable): scala.collection.Map[String, String] = {
-    // data source table always have a provider, it's guaranteed by `DDLUtils.isDatasourceTable`.
-    val provider = table.provider.get
+  private def tableMetaToTableProps(table: CatalogTable): mutable.Map[String, String] = {
     val partitionColumns = table.partitionColumnNames
     val bucketSpec = table.bucketSpec
 
-    val properties = new scala.collection.mutable.HashMap[String, String]
-    properties.put(DATASOURCE_PROVIDER, provider)
-    if (table.tracksPartitionsInCatalog) {
-      properties.put(TABLE_PARTITION_PROVIDER, TABLE_PARTITION_PROVIDER_CATALOG)
-    }
-
+    val properties = new mutable.HashMap[String, String]
     // Serialized JSON schema string may be too long to be stored into a single metastore table
     // property. In this case, we split the JSON string and store each part as a separate table
     // property.
@@ -617,14 +619,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
 
     if (table.tableType != VIEW) {
       table.properties.get(DATASOURCE_PROVIDER) match {
-        // No provider in table properties, which means this table is created by Spark prior to 2.1,
-        // or is created at Hive side.
+        // No provider in table properties, which means this is a Hive serde table.
         case None =>
-          table = table.copy(
-            provider = Some(DDLUtils.HIVE_PROVIDER), tracksPartitionsInCatalog = true)
-
-        // This is a Hive serde table created by Spark 2.1 or higher versions.
-        case Some(DDLUtils.HIVE_PROVIDER) =>
           table = restoreHiveSerdeTable(table)
 
         // This is a regular data source table.
@@ -637,7 +633,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     val statsProps = table.properties.filterKeys(_.startsWith(STATISTICS_PREFIX))
 
     if (statsProps.nonEmpty) {
-      val colStats = new scala.collection.mutable.HashMap[String, ColumnStat]
+      val colStats = new mutable.HashMap[String, ColumnStat]
 
       // For each column, recover its column stats. Note that this is currently a O(n^2) operation,
       // but given the number of columns it usually not enormous, this is probably OK as a start.
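The `tableMetaToTableProps` hunk keeps the comment about splitting an over-long schema JSON string across several properties. A rough sketch of that splitting step, using a hypothetical helper name and an illustrative 4000-character threshold (Spark's actual limit is configurable):

```scala
// Hypothetical helper sketching the split described in the comment above:
// chunk the JSON schema string and store each chunk under a numbered key,
// plus a "numParts" key so the reader knows how many chunks to reassemble.
def schemaJsonToProps(schemaJson: String, threshold: Int = 4000): Map[String, String] = {
  val parts = schemaJson.grouped(threshold).toSeq
  val partProps = parts.zipWithIndex.map { case (part, i) =>
    s"spark.sql.sources.schema.part.$i" -> part
  }
  (partProps :+ ("spark.sql.sources.schema.numParts" -> parts.length.toString)).toMap
}
```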
@@ -674,21 +670,27 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       provider = Some(DDLUtils.HIVE_PROVIDER),
       tracksPartitionsInCatalog = true)
 
-    val schemaFromTableProps = getSchemaFromTableProperties(table)
-    if (DataType.equalsIgnoreCaseAndNullability(schemaFromTableProps, table.schema)) {
-      hiveTable.copy(
-        schema = schemaFromTableProps,
-        partitionColumnNames = getPartitionColumnsFromTableProperties(table),
-        bucketSpec = getBucketSpecFromTableProperties(table))
+    // If this is a Hive serde table created by Spark 2.1 or higher versions, we should restore its
+    // schema from table properties.
+    if (table.properties.contains(DATASOURCE_SCHEMA_NUMPARTS)) {
+      val schemaFromTableProps = getSchemaFromTableProperties(table)
+      if (DataType.equalsIgnoreCaseAndNullability(schemaFromTableProps, table.schema)) {
+        hiveTable.copy(
+          schema = schemaFromTableProps,
+          partitionColumnNames = getPartitionColumnsFromTableProperties(table),
+          bucketSpec = getBucketSpecFromTableProperties(table))
+      } else {
+        // Hive metastore may change the table schema, e.g. schema inference. If the table
+        // schema we read back is different(ignore case and nullability) from the one in table
+        // properties which was written when creating table, we should respect the table schema
+        // from hive.
+        logWarning(s"The table schema given by Hive metastore(${table.schema.simpleString}) is " +
+          "different from the schema when this table was created by Spark SQL" +
+          s"(${schemaFromTableProps.simpleString}). We have to fall back to the table schema " +
+          "from Hive metastore which is not case preserving.")
+        hiveTable
+      }
     } else {
-      // Hive metastore may change the table schema, e.g. schema inference. If the table
-      // schema we read back is different(ignore case and nullability) from the one in table
-      // properties which was written when creating table, we should respect the table schema
-      // from hive.
-      logWarning(s"The table schema given by Hive metastore(${table.schema.simpleString}) is " +
-        "different from the schema when this table was created by Spark SQL" +
-        s"(${schemaFromTableProps.simpleString}). We have to fall back to the table schema from " +
-        "Hive metastore which is not case preserving.")
       hiveTable
     }
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
index efa0beb850..6fee45824e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
@@ -20,8 +20,11 @@ package org.apache.spark.sql.hive
 import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.SparkConf
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.execution.command.DDLUtils
+import org.apache.spark.sql.types.StructType
 
 /**
  * Test suite for the [[HiveExternalCatalog]].
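In the `restoreHiveSerdeTable` hunk above, the schema from table properties is only trusted when it matches the metastore schema ignoring case and nullability (`DataType.equalsIgnoreCaseAndNullability`); otherwise Spark logs a warning and falls back to Hive's lower-cased schema. A simplified, top-level-only stand-in for that predicate, just to make the comparison semantics concrete (`looselyEqual` is a made-up name):

```scala
import org.apache.spark.sql.types.StructType

// Made-up approximation of the comparison: same number of fields, names
// equal ignoring case, and types rendered the same. The real predicate
// also recurses into nested structs, arrays and maps.
def looselyEqual(a: StructType, b: StructType): Boolean =
  a.length == b.length && a.fields.zip(b.fields).forall { case (fa, fb) =>
    fa.name.equalsIgnoreCase(fb.name) &&
      fa.dataType.simpleString == fb.dataType.simpleString
  }
```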
@@ -52,4 +55,19 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {
     assert(selectedPartitions.length == 1)
     assert(selectedPartitions.head.spec == part1.spec)
   }
+
+  test("SPARK-18647: do not put provider in table properties for Hive serde table") {
+    val catalog = newBasicCatalog()
+    val hiveTable = CatalogTable(
+      identifier = TableIdentifier("hive_tbl", Some("db1")),
+      tableType = CatalogTableType.MANAGED,
+      storage = storageFormat,
+      schema = new StructType().add("col1", "int").add("col2", "string"),
+      provider = Some("hive"))
+    catalog.createTable(hiveTable, ignoreIfExists = false)
+
+    val rawTable = externalCatalog.client.getTable("db1", "hive_tbl")
+    assert(!rawTable.properties.contains(HiveExternalCatalog.DATASOURCE_PROVIDER))
+    assert(externalCatalog.getTable("db1", "hive_tbl").provider == Some(DDLUtils.HIVE_PROVIDER))
+  }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
index 7abc4d9623..0a280b4952 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.hive
 
-import java.io.File
-
 import org.apache.spark.sql.{QueryTest, Row, SaveMode}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTableType
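The new test pins down the raw metastore state (no `spark.sql.sources.provider` key for Hive serde tables, while `getTable` still reports the `hive` provider). End to end, the behaviour it protects can be exercised from spark-shell roughly like this (table and column names invented):

```scala
// In spark-shell with Hive support enabled; names are illustrative only.
spark.sql("CREATE TABLE mixed_case_tbl (colA INT, colB STRING) STORED AS PARQUET")
// Without the schema-in-properties workaround Hive would report "cola"/"colb";
// with it, Spark restores the original mixed-case names.
spark.table("mixed_case_tbl").schema.fieldNames.foreach(println) // colA, colB
```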