author     Eric Liang <ekl@databricks.com>   2016-10-27 14:22:30 -0700
committer  Yin Huai <yhuai@databricks.com>   2016-10-27 14:22:30 -0700
commit     ccb11543048dccd4cc590a8db1df1d9d5847d112
tree       d015232ebf5b1232e8f5df4aebe4854c37cae2b2 /sql/hive/src/main
parent     79fd0cc0584e48fb021c4237877b15abbffb319a
[SPARK-17970][SQL] store partition spec in metastore for data source table
## What changes were proposed in this pull request?
We should follow Hive tables and also store the partition spec in the metastore for data source tables.
This brings two benefits:
1. It is more flexible to manage the table's data files, as users can use `ADD PARTITION`, `DROP PARTITION` and `RENAME PARTITION` (see the sketch after this list).
2. We no longer need to cache all file statuses for data source tables.
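
For illustration only, here is a minimal sketch of the partition DDL this enables on a partitioned data source table. It assumes a spark-shell session named `spark`; the `logs` table, its columns and the `/tmp` paths are made-up examples, not part of this patch:

```scala
// Sketch: a partitioned data source table whose partitions are tracked in the
// Hive metastore, so partition DDL can manage its data files directly.
spark.sql(
  """CREATE TABLE logs (event STRING, day STRING)
    |USING parquet
    |PARTITIONED BY (day)""".stripMargin)

// Register a partition at an explicit location instead of relying on a full
// file listing under the table root.
spark.sql("ALTER TABLE logs ADD PARTITION (day='2016-10-27') LOCATION '/tmp/logs/day=2016-10-27'")

// Rename and drop partitions purely through metastore operations.
spark.sql("ALTER TABLE logs PARTITION (day='2016-10-27') RENAME TO PARTITION (day='2016-10-28')")
spark.sql("ALTER TABLE logs DROP PARTITION (day='2016-10-28')")
```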
## How was this patch tested?
Existing tests.
Author: Eric Liang <ekl@databricks.com>
Author: Michael Allman <michael@videoamp.com>
Author: Eric Liang <ekhliang@gmail.com>
Author: Wenchen Fan <wenchen@databricks.com>
Closes #15515 from cloud-fan/partition.
Diffstat (limited to 'sql/hive/src/main')
3 files changed, 90 insertions, 53 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 2003ff42d4..409c316c68 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -38,6 +38,7 @@ import org.apache.spark.sql.execution.command.{ColumnStatStruct, DDLUtils}
 import org.apache.spark.sql.execution.datasources.CaseInsensitiveMap
 import org.apache.spark.sql.hive.client.HiveClient
 import org.apache.spark.sql.internal.HiveSerDe
+import org.apache.spark.sql.internal.SQLConf._
 import org.apache.spark.sql.internal.StaticSQLConf._
 import org.apache.spark.sql.types.{DataType, StructField, StructType}
 
@@ -105,13 +106,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
    * metastore.
    */
   private def verifyTableProperties(table: CatalogTable): Unit = {
-    val invalidKeys = table.properties.keys.filter { key =>
-      key.startsWith(DATASOURCE_PREFIX) || key.startsWith(STATISTICS_PREFIX)
-    }
+    val invalidKeys = table.properties.keys.filter(_.startsWith(SPARK_SQL_PREFIX))
     if (invalidKeys.nonEmpty) {
       throw new AnalysisException(s"Cannot persistent ${table.qualifiedName} into hive metastore " +
-        s"as table property keys may not start with '$DATASOURCE_PREFIX' or '$STATISTICS_PREFIX':" +
-        s" ${invalidKeys.mkString("[", ", ", "]")}")
+        s"as table property keys may not start with '$SPARK_SQL_PREFIX': " +
+        invalidKeys.mkString("[", ", ", "]"))
     }
     // External users are not allowed to set/switch the table type. In Hive metastore, the table
     // type can be switched by changing the value of a case-sensitive table property `EXTERNAL`.
@@ -190,11 +189,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       throw new TableAlreadyExistsException(db = db, table = table)
     }
     // Before saving data source table metadata into Hive metastore, we should:
-    // 1. Put table schema, partition column names and bucket specification in table properties.
+    // 1. Put table provider, schema, partition column names, bucket specification and partition
+    //    provider in table properties.
     // 2. Check if this table is hive compatible
     //   2.1 If it's not hive compatible, set schema, partition columns and bucket spec to empty
     //       and save table metadata to Hive.
-    //   2.1 If it's hive compatible, set serde information in table metadata and try to save
+    //   2.2 If it's hive compatible, set serde information in table metadata and try to save
     //       it to Hive. If it fails, treat it as not hive compatible and go back to 2.1
     if (DDLUtils.isDatasourceTable(tableDefinition)) {
       // data source table always have a provider, it's guaranteed by `DDLUtils.isDatasourceTable`.
@@ -204,6 +204,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       val tableProperties = new scala.collection.mutable.HashMap[String, String]
       tableProperties.put(DATASOURCE_PROVIDER, provider)
+      if (tableDefinition.partitionProviderIsHive) {
+        tableProperties.put(TABLE_PARTITION_PROVIDER, "hive")
+      }
 
       // Serialized JSON schema string may be too long to be stored into a single metastore table
       // property. In this case, we split the JSON string and store each part as a separate table
@@ -241,12 +244,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
         }
       }
 
-      // converts the table metadata to Spark SQL specific format, i.e. set schema, partition column
-      // names and bucket specification to empty.
+      // converts the table metadata to Spark SQL specific format, i.e. set data schema, names and
+      // bucket specification to empty. Note that partition columns are retained, so that we can
+      // call partition-related Hive API later.
       def newSparkSQLSpecificMetastoreTable(): CatalogTable = {
         tableDefinition.copy(
-          schema = new StructType,
-          partitionColumnNames = Nil,
+          schema = tableDefinition.partitionSchema,
           bucketSpec = None,
           properties = tableDefinition.properties ++ tableProperties)
       }
@@ -419,12 +422,17 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       // Sets the `schema`, `partitionColumnNames` and `bucketSpec` from the old table definition,
       // to retain the spark specific format if it is. Also add old data source properties to table
       // properties, to retain the data source table format.
-      val oldDataSourceProps = oldDef.properties.filter(_._1.startsWith(DATASOURCE_PREFIX))
+      val oldDataSourceProps = oldDef.properties.filter(_._1.startsWith(SPARK_SQL_PREFIX))
+      val partitionProviderProp = if (tableDefinition.partitionProviderIsHive) {
+        TABLE_PARTITION_PROVIDER -> "hive"
+      } else {
+        TABLE_PARTITION_PROVIDER -> "builtin"
+      }
       val newDef = withStatsProps.copy(
         schema = oldDef.schema,
         partitionColumnNames = oldDef.partitionColumnNames,
         bucketSpec = oldDef.bucketSpec,
-        properties = oldDataSourceProps ++ withStatsProps.properties)
+        properties = oldDataSourceProps ++ withStatsProps.properties + partitionProviderProp)
 
       client.alterTable(newDef)
     } else {
@@ -448,7 +456,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
    * properties, and filter out these special entries from table properties.
    */
   private def restoreTableMetadata(table: CatalogTable): CatalogTable = {
-    val catalogTable = if (table.tableType == VIEW || conf.get(DEBUG_MODE)) {
+    if (conf.get(DEBUG_MODE)) {
+      return table
+    }
+
+    val tableWithSchema = if (table.tableType == VIEW) {
       table
     } else {
       getProviderFromTableProperties(table).map { provider =>
@@ -473,30 +485,32 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
           provider = Some(provider),
           partitionColumnNames = getPartitionColumnsFromTableProperties(table),
           bucketSpec = getBucketSpecFromTableProperties(table),
-          properties = getOriginalTableProperties(table))
+          partitionProviderIsHive = table.properties.get(TABLE_PARTITION_PROVIDER) == Some("hive"))
       } getOrElse {
-        table.copy(provider = Some("hive"))
+        table.copy(provider = Some("hive"), partitionProviderIsHive = true)
       }
     }
+
     // construct Spark's statistics from information in Hive metastore
-    val statsProps = catalogTable.properties.filterKeys(_.startsWith(STATISTICS_PREFIX))
-    if (statsProps.nonEmpty) {
+    val statsProps = tableWithSchema.properties.filterKeys(_.startsWith(STATISTICS_PREFIX))
+    val tableWithStats = if (statsProps.nonEmpty) {
       val colStatsProps = statsProps.filterKeys(_.startsWith(STATISTICS_COL_STATS_PREFIX))
         .map { case (k, v) => (k.drop(STATISTICS_COL_STATS_PREFIX.length), v) }
-      val colStats: Map[String, ColumnStat] = catalogTable.schema.collect {
+      val colStats: Map[String, ColumnStat] = tableWithSchema.schema.collect {
         case f if colStatsProps.contains(f.name) =>
           val numFields = ColumnStatStruct.numStatFields(f.dataType)
           (f.name, ColumnStat(numFields, colStatsProps(f.name)))
       }.toMap
-      catalogTable.copy(
-        properties = removeStatsProperties(catalogTable),
+      tableWithSchema.copy(
         stats = Some(Statistics(
-          sizeInBytes = BigInt(catalogTable.properties(STATISTICS_TOTAL_SIZE)),
-          rowCount = catalogTable.properties.get(STATISTICS_NUM_ROWS).map(BigInt(_)),
+          sizeInBytes = BigInt(tableWithSchema.properties(STATISTICS_TOTAL_SIZE)),
+          rowCount = tableWithSchema.properties.get(STATISTICS_NUM_ROWS).map(BigInt(_)),
           colStats = colStats)))
     } else {
-      catalogTable
+      tableWithSchema
     }
+
+    tableWithStats.copy(properties = getOriginalTableProperties(table))
   }
 
   override def tableExists(db: String, table: String): Boolean = withClient {
@@ -581,13 +595,30 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   // Partitions
   // --------------------------------------------------------------------------
 
+  // Hive metastore is not case preserving and the partition columns are always lower cased. We need
+  // to lower case the column names in partition specification before calling partition related Hive
+  // APIs, to match this behaviour.
+  private def lowerCasePartitionSpec(spec: TablePartitionSpec): TablePartitionSpec = {
+    spec.map { case (k, v) => k.toLowerCase -> v }
+  }
+
+  // Hive metastore is not case preserving and the column names of the partition specification we
+  // get from the metastore are always lower cased. We should restore them w.r.t. the actual table
+  // partition columns.
+  private def restorePartitionSpec(
+      spec: TablePartitionSpec,
+      partCols: Seq[String]): TablePartitionSpec = {
+    spec.map { case (k, v) => partCols.find(_.equalsIgnoreCase(k)).get -> v }
+  }
+
   override def createPartitions(
       db: String,
       table: String,
       parts: Seq[CatalogTablePartition],
       ignoreIfExists: Boolean): Unit = withClient {
     requireTableExists(db, table)
-    client.createPartitions(db, table, parts, ignoreIfExists)
+    val lowerCasedParts = parts.map(p => p.copy(spec = lowerCasePartitionSpec(p.spec)))
+    client.createPartitions(db, table, lowerCasedParts, ignoreIfExists)
   }
 
@@ -597,7 +628,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       ignoreIfNotExists: Boolean,
       purge: Boolean): Unit = withClient {
     requireTableExists(db, table)
-    client.dropPartitions(db, table, parts, ignoreIfNotExists, purge)
+    client.dropPartitions(db, table, parts.map(lowerCasePartitionSpec), ignoreIfNotExists, purge)
   }
 
@@ -605,21 +636,24 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       table: String,
       specs: Seq[TablePartitionSpec],
       newSpecs: Seq[TablePartitionSpec]): Unit = withClient {
-    client.renamePartitions(db, table, specs, newSpecs)
+    client.renamePartitions(
+      db, table, specs.map(lowerCasePartitionSpec), newSpecs.map(lowerCasePartitionSpec))
   }
 
   override def alterPartitions(
       db: String,
       table: String,
       newParts: Seq[CatalogTablePartition]): Unit = withClient {
-    client.alterPartitions(db, table, newParts)
+    val lowerCasedParts = newParts.map(p => p.copy(spec = lowerCasePartitionSpec(p.spec)))
+    client.alterPartitions(db, table, lowerCasedParts)
   }
 
   override def getPartition(
       db: String,
       table: String,
       spec: TablePartitionSpec): CatalogTablePartition = withClient {
-    client.getPartition(db, table, spec)
+    val part = client.getPartition(db, table, lowerCasePartitionSpec(spec))
+    part.copy(spec = restorePartitionSpec(part.spec, getTable(db, table).partitionColumnNames))
   }
 
   /**
@@ -629,7 +663,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       db: String,
       table: String,
       spec: TablePartitionSpec): Option[CatalogTablePartition] = withClient {
-    client.getPartitionOption(db, table, spec)
+    client.getPartitionOption(db, table, lowerCasePartitionSpec(spec)).map { part =>
+      part.copy(spec = restorePartitionSpec(part.spec, getTable(db, table).partitionColumnNames))
+    }
   }
 
   /**
@@ -639,14 +675,17 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       db: String,
       table: String,
       partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition] = withClient {
-    client.getPartitions(db, table, partialSpec)
+    client.getPartitions(db, table, partialSpec.map(lowerCasePartitionSpec)).map { part =>
+      part.copy(spec = restorePartitionSpec(part.spec, getTable(db, table).partitionColumnNames))
+    }
   }
 
   override def listPartitionsByFilter(
       db: String,
       table: String,
       predicates: Seq[Expression]): Seq[CatalogTablePartition] = withClient {
-    val catalogTable = client.getTable(db, table)
+    val rawTable = client.getTable(db, table)
+    val catalogTable = restoreTableMetadata(rawTable)
     val partitionColumnNames = catalogTable.partitionColumnNames.toSet
     val nonPartitionPruningPredicates = predicates.filterNot {
       _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
     }
@@ -660,19 +699,20 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     val partitionSchema = catalogTable.partitionSchema
 
     if (predicates.nonEmpty) {
-      val clientPrunedPartitions =
-        client.getPartitionsByFilter(catalogTable, predicates)
+      val clientPrunedPartitions = client.getPartitionsByFilter(rawTable, predicates).map { part =>
+        part.copy(spec = restorePartitionSpec(part.spec, catalogTable.partitionColumnNames))
+      }
       val boundPredicate =
         InterpretedPredicate.create(predicates.reduce(And).transform {
           case att: AttributeReference =>
             val index = partitionSchema.indexWhere(_.name == att.name)
             BoundReference(index, partitionSchema(index).dataType, nullable = true)
         })
-      clientPrunedPartitions.filter { case p: CatalogTablePartition =>
-        boundPredicate(p.toRow(partitionSchema))
-      }
+      clientPrunedPartitions.filter { p => boundPredicate(p.toRow(partitionSchema)) }
     } else {
-      client.getPartitions(catalogTable)
+      client.getPartitions(catalogTable).map { part =>
+        part.copy(spec = restorePartitionSpec(part.spec, catalogTable.partitionColumnNames))
+      }
     }
   }
 
@@ -722,7 +762,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
 }
 
 object HiveExternalCatalog {
-  val DATASOURCE_PREFIX = "spark.sql.sources."
+  val SPARK_SQL_PREFIX = "spark.sql."
+
+  val DATASOURCE_PREFIX = SPARK_SQL_PREFIX + "sources."
   val DATASOURCE_PROVIDER = DATASOURCE_PREFIX + "provider"
   val DATASOURCE_SCHEMA = DATASOURCE_PREFIX + "schema"
   val DATASOURCE_SCHEMA_PREFIX = DATASOURCE_SCHEMA + "."
@@ -736,21 +778,20 @@ object HiveExternalCatalog {
   val DATASOURCE_SCHEMA_BUCKETCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "bucketCol."
   val DATASOURCE_SCHEMA_SORTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "sortCol."
 
-  val STATISTICS_PREFIX = "spark.sql.statistics."
+  val STATISTICS_PREFIX = SPARK_SQL_PREFIX + "statistics."
   val STATISTICS_TOTAL_SIZE = STATISTICS_PREFIX + "totalSize"
   val STATISTICS_NUM_ROWS = STATISTICS_PREFIX + "numRows"
   val STATISTICS_COL_STATS_PREFIX = STATISTICS_PREFIX + "colStats."
 
-  def removeStatsProperties(metadata: CatalogTable): Map[String, String] = {
-    metadata.properties.filterNot { case (key, _) => key.startsWith(STATISTICS_PREFIX) }
-  }
+  val TABLE_PARTITION_PROVIDER = SPARK_SQL_PREFIX + "partitionProvider"
 
   def getProviderFromTableProperties(metadata: CatalogTable): Option[String] = {
     metadata.properties.get(DATASOURCE_PROVIDER)
   }
 
   def getOriginalTableProperties(metadata: CatalogTable): Map[String, String] = {
-    metadata.properties.filterNot { case (key, _) => key.startsWith(DATASOURCE_PREFIX) }
+    metadata.properties.filterNot { case (key, _) => key.startsWith(SPARK_SQL_PREFIX) }
   }
 
   // A persisted data source table always store its schema in the catalog.
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 6c1585d5f5..d1de863ce3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -76,11 +76,10 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
           partitionColumns = table.partitionColumnNames,
           bucketSpec = table.bucketSpec,
           className = table.provider.get,
-          options = table.storage.properties)
+          options = table.storage.properties,
+          catalogTable = Some(table))
 
-      LogicalRelation(
-        dataSource.resolveRelation(),
-        catalogTable = Some(table))
+      LogicalRelation(dataSource.resolveRelation(), catalogTable = Some(table))
     }
   }
@@ -194,7 +193,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
       QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
     val bucketSpec = None  // We don't support hive bucketed tables, only ones we write out.
 
-    val lazyPruningEnabled = sparkSession.sqlContext.conf.filesourcePartitionPruning
+    val lazyPruningEnabled = sparkSession.sqlContext.conf.manageFilesourcePartitions
     val result = if (metastoreRelation.hiveQlTable.isPartitioned) {
       val partitionSchema = StructType.fromAttributes(metastoreRelation.partitionKeys)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 8835b266b2..84873bbbb8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -777,7 +777,7 @@ private[hive] class HiveClientImpl(
     val (partCols, schema) = table.schema.map(toHiveColumn).partition { c =>
       table.partitionColumnNames.contains(c.getName)
     }
-    if (table.schema.isEmpty) {
+    if (schema.isEmpty) {
       // This is a hack to preserve existing behavior. Before Spark 2.0, we do not
       // set a default serde here (this was done in Hive), and so if the user provides
       // an empty schema Hive would automatically populate the schema with a single
@@ -831,9 +831,6 @@ private[hive] class HiveClientImpl(
     new HivePartition(ht, tpart)
   }
 
-  // TODO (cloud-fan): the column names in partition specification are always lower cased because
-  // Hive metastore is not case preserving. We should normalize them to the actual column names of
-  // the table, once we store partition spec of data source tables.
   private def fromHivePartition(hp: HivePartition): CatalogTablePartition = {
     val apiPartition = hp.getTPartition
     CatalogTablePartition(
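
As a standalone sketch of the case handling added to `HiveExternalCatalog` above: the two helper bodies below mirror `lowerCasePartitionSpec` and `restorePartitionSpec` from this diff, while the object name, example spec and partition column are made up for illustration.

```scala
// Standalone Scala sketch (not part of the patch) of the partition-spec round trip:
// Hive metastore is not case preserving, so specs are lower cased before Hive calls
// and mapped back to the table's actual partition column names afterwards.
object PartitionSpecCaseDemo {
  type TablePartitionSpec = Map[String, String]

  // Lower case the spec's column names before calling partition-related Hive APIs.
  def lowerCasePartitionSpec(spec: TablePartitionSpec): TablePartitionSpec =
    spec.map { case (k, v) => k.toLowerCase -> v }

  // Restore the table's actual (case sensitive) partition column names on the way back.
  def restorePartitionSpec(spec: TablePartitionSpec, partCols: Seq[String]): TablePartitionSpec =
    spec.map { case (k, v) => partCols.find(_.equalsIgnoreCase(k)).get -> v }

  def main(args: Array[String]): Unit = {
    val spec = Map("EventDay" -> "2016-10-27")                      // spec as the user wrote it
    val forHive = lowerCasePartitionSpec(spec)                      // Map(eventday -> 2016-10-27)
    val restored = restorePartitionSpec(forHive, Seq("EventDay"))   // Map(EventDay -> 2016-10-27)
    println(forHive)
    println(restored)
  }
}
```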