aboutsummaryrefslogtreecommitdiff
path: root/sql/hive/src/main
diff options
context:
space:
mode:
authorEric Liang <ekl@databricks.com>2016-10-27 14:22:30 -0700
committerYin Huai <yhuai@databricks.com>2016-10-27 14:22:30 -0700
commitccb11543048dccd4cc590a8db1df1d9d5847d112 (patch)
treed015232ebf5b1232e8f5df4aebe4854c37cae2b2 /sql/hive/src/main
parent79fd0cc0584e48fb021c4237877b15abbffb319a (diff)
downloadspark-ccb11543048dccd4cc590a8db1df1d9d5847d112.tar.gz
spark-ccb11543048dccd4cc590a8db1df1d9d5847d112.tar.bz2
spark-ccb11543048dccd4cc590a8db1df1d9d5847d112.zip
[SPARK-17970][SQL] store partition spec in metastore for data source table
## What changes were proposed in this pull request? We should follow hive table and also store partition spec in metastore for data source table. This brings 2 benefits: 1. It's more flexible to manage the table data files, as users can use `ADD PARTITION`, `DROP PARTITION` and `RENAME PARTITION` 2. We don't need to cache all file status for data source table anymore. ## How was this patch tested? existing tests. Author: Eric Liang <ekl@databricks.com> Author: Michael Allman <michael@videoamp.com> Author: Eric Liang <ekhliang@gmail.com> Author: Wenchen Fan <wenchen@databricks.com> Closes #15515 from cloud-fan/partition.
Diffstat (limited to 'sql/hive/src/main')
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala129
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala9
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala5
3 files changed, 90 insertions, 53 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 2003ff42d4..409c316c68 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -38,6 +38,7 @@ import org.apache.spark.sql.execution.command.{ColumnStatStruct, DDLUtils}
import org.apache.spark.sql.execution.datasources.CaseInsensitiveMap
import org.apache.spark.sql.hive.client.HiveClient
import org.apache.spark.sql.internal.HiveSerDe
+import org.apache.spark.sql.internal.SQLConf._
import org.apache.spark.sql.internal.StaticSQLConf._
import org.apache.spark.sql.types.{DataType, StructField, StructType}
@@ -105,13 +106,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
* metastore.
*/
private def verifyTableProperties(table: CatalogTable): Unit = {
- val invalidKeys = table.properties.keys.filter { key =>
- key.startsWith(DATASOURCE_PREFIX) || key.startsWith(STATISTICS_PREFIX)
- }
+ val invalidKeys = table.properties.keys.filter(_.startsWith(SPARK_SQL_PREFIX))
if (invalidKeys.nonEmpty) {
throw new AnalysisException(s"Cannot persistent ${table.qualifiedName} into hive metastore " +
- s"as table property keys may not start with '$DATASOURCE_PREFIX' or '$STATISTICS_PREFIX':" +
- s" ${invalidKeys.mkString("[", ", ", "]")}")
+ s"as table property keys may not start with '$SPARK_SQL_PREFIX': " +
+ invalidKeys.mkString("[", ", ", "]"))
}
// External users are not allowed to set/switch the table type. In Hive metastore, the table
// type can be switched by changing the value of a case-sensitive table property `EXTERNAL`.
@@ -190,11 +189,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
throw new TableAlreadyExistsException(db = db, table = table)
}
// Before saving data source table metadata into Hive metastore, we should:
- // 1. Put table schema, partition column names and bucket specification in table properties.
+ // 1. Put table provider, schema, partition column names, bucket specification and partition
+ // provider in table properties.
// 2. Check if this table is hive compatible
// 2.1 If it's not hive compatible, set schema, partition columns and bucket spec to empty
// and save table metadata to Hive.
- // 2.1 If it's hive compatible, set serde information in table metadata and try to save
+ // 2.2 If it's hive compatible, set serde information in table metadata and try to save
// it to Hive. If it fails, treat it as not hive compatible and go back to 2.1
if (DDLUtils.isDatasourceTable(tableDefinition)) {
// data source table always have a provider, it's guaranteed by `DDLUtils.isDatasourceTable`.
@@ -204,6 +204,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
val tableProperties = new scala.collection.mutable.HashMap[String, String]
tableProperties.put(DATASOURCE_PROVIDER, provider)
+ if (tableDefinition.partitionProviderIsHive) {
+ tableProperties.put(TABLE_PARTITION_PROVIDER, "hive")
+ }
// Serialized JSON schema string may be too long to be stored into a single metastore table
// property. In this case, we split the JSON string and store each part as a separate table
@@ -241,12 +244,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
}
}
- // converts the table metadata to Spark SQL specific format, i.e. set schema, partition column
- // names and bucket specification to empty.
+ // converts the table metadata to Spark SQL specific format, i.e. set data schema, names and
+ // bucket specification to empty. Note that partition columns are retained, so that we can
+ // call partition-related Hive API later.
def newSparkSQLSpecificMetastoreTable(): CatalogTable = {
tableDefinition.copy(
- schema = new StructType,
- partitionColumnNames = Nil,
+ schema = tableDefinition.partitionSchema,
bucketSpec = None,
properties = tableDefinition.properties ++ tableProperties)
}
@@ -419,12 +422,17 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
// Sets the `schema`, `partitionColumnNames` and `bucketSpec` from the old table definition,
// to retain the spark specific format if it is. Also add old data source properties to table
// properties, to retain the data source table format.
- val oldDataSourceProps = oldDef.properties.filter(_._1.startsWith(DATASOURCE_PREFIX))
+ val oldDataSourceProps = oldDef.properties.filter(_._1.startsWith(SPARK_SQL_PREFIX))
+ val partitionProviderProp = if (tableDefinition.partitionProviderIsHive) {
+ TABLE_PARTITION_PROVIDER -> "hive"
+ } else {
+ TABLE_PARTITION_PROVIDER -> "builtin"
+ }
val newDef = withStatsProps.copy(
schema = oldDef.schema,
partitionColumnNames = oldDef.partitionColumnNames,
bucketSpec = oldDef.bucketSpec,
- properties = oldDataSourceProps ++ withStatsProps.properties)
+ properties = oldDataSourceProps ++ withStatsProps.properties + partitionProviderProp)
client.alterTable(newDef)
} else {
@@ -448,7 +456,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
* properties, and filter out these special entries from table properties.
*/
private def restoreTableMetadata(table: CatalogTable): CatalogTable = {
- val catalogTable = if (table.tableType == VIEW || conf.get(DEBUG_MODE)) {
+ if (conf.get(DEBUG_MODE)) {
+ return table
+ }
+
+ val tableWithSchema = if (table.tableType == VIEW) {
table
} else {
getProviderFromTableProperties(table).map { provider =>
@@ -473,30 +485,32 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
provider = Some(provider),
partitionColumnNames = getPartitionColumnsFromTableProperties(table),
bucketSpec = getBucketSpecFromTableProperties(table),
- properties = getOriginalTableProperties(table))
+ partitionProviderIsHive = table.properties.get(TABLE_PARTITION_PROVIDER) == Some("hive"))
} getOrElse {
- table.copy(provider = Some("hive"))
+ table.copy(provider = Some("hive"), partitionProviderIsHive = true)
}
}
+
// construct Spark's statistics from information in Hive metastore
- val statsProps = catalogTable.properties.filterKeys(_.startsWith(STATISTICS_PREFIX))
- if (statsProps.nonEmpty) {
+ val statsProps = tableWithSchema.properties.filterKeys(_.startsWith(STATISTICS_PREFIX))
+ val tableWithStats = if (statsProps.nonEmpty) {
val colStatsProps = statsProps.filterKeys(_.startsWith(STATISTICS_COL_STATS_PREFIX))
.map { case (k, v) => (k.drop(STATISTICS_COL_STATS_PREFIX.length), v) }
- val colStats: Map[String, ColumnStat] = catalogTable.schema.collect {
+ val colStats: Map[String, ColumnStat] = tableWithSchema.schema.collect {
case f if colStatsProps.contains(f.name) =>
val numFields = ColumnStatStruct.numStatFields(f.dataType)
(f.name, ColumnStat(numFields, colStatsProps(f.name)))
}.toMap
- catalogTable.copy(
- properties = removeStatsProperties(catalogTable),
+ tableWithSchema.copy(
stats = Some(Statistics(
- sizeInBytes = BigInt(catalogTable.properties(STATISTICS_TOTAL_SIZE)),
- rowCount = catalogTable.properties.get(STATISTICS_NUM_ROWS).map(BigInt(_)),
+ sizeInBytes = BigInt(tableWithSchema.properties(STATISTICS_TOTAL_SIZE)),
+ rowCount = tableWithSchema.properties.get(STATISTICS_NUM_ROWS).map(BigInt(_)),
colStats = colStats)))
} else {
- catalogTable
+ tableWithSchema
}
+
+ tableWithStats.copy(properties = getOriginalTableProperties(table))
}
override def tableExists(db: String, table: String): Boolean = withClient {
@@ -581,13 +595,30 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
// Partitions
// --------------------------------------------------------------------------
+ // Hive metastore is not case preserving and the partition columns are always lower cased. We need
+ // to lower case the column names in partition specification before calling partition related Hive
+ // APIs, to match this behaviour.
+ private def lowerCasePartitionSpec(spec: TablePartitionSpec): TablePartitionSpec = {
+ spec.map { case (k, v) => k.toLowerCase -> v }
+ }
+
+ // Hive metastore is not case preserving and the column names of the partition specification we
+ // get from the metastore are always lower cased. We should restore them w.r.t. the actual table
+ // partition columns.
+ private def restorePartitionSpec(
+ spec: TablePartitionSpec,
+ partCols: Seq[String]): TablePartitionSpec = {
+ spec.map { case (k, v) => partCols.find(_.equalsIgnoreCase(k)).get -> v }
+ }
+
override def createPartitions(
db: String,
table: String,
parts: Seq[CatalogTablePartition],
ignoreIfExists: Boolean): Unit = withClient {
requireTableExists(db, table)
- client.createPartitions(db, table, parts, ignoreIfExists)
+ val lowerCasedParts = parts.map(p => p.copy(spec = lowerCasePartitionSpec(p.spec)))
+ client.createPartitions(db, table, lowerCasedParts, ignoreIfExists)
}
override def dropPartitions(
@@ -597,7 +628,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
ignoreIfNotExists: Boolean,
purge: Boolean): Unit = withClient {
requireTableExists(db, table)
- client.dropPartitions(db, table, parts, ignoreIfNotExists, purge)
+ client.dropPartitions(db, table, parts.map(lowerCasePartitionSpec), ignoreIfNotExists, purge)
}
override def renamePartitions(
@@ -605,21 +636,24 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
table: String,
specs: Seq[TablePartitionSpec],
newSpecs: Seq[TablePartitionSpec]): Unit = withClient {
- client.renamePartitions(db, table, specs, newSpecs)
+ client.renamePartitions(
+ db, table, specs.map(lowerCasePartitionSpec), newSpecs.map(lowerCasePartitionSpec))
}
override def alterPartitions(
db: String,
table: String,
newParts: Seq[CatalogTablePartition]): Unit = withClient {
- client.alterPartitions(db, table, newParts)
+ val lowerCasedParts = newParts.map(p => p.copy(spec = lowerCasePartitionSpec(p.spec)))
+ client.alterPartitions(db, table, lowerCasedParts)
}
override def getPartition(
db: String,
table: String,
spec: TablePartitionSpec): CatalogTablePartition = withClient {
- client.getPartition(db, table, spec)
+ val part = client.getPartition(db, table, lowerCasePartitionSpec(spec))
+ part.copy(spec = restorePartitionSpec(part.spec, getTable(db, table).partitionColumnNames))
}
/**
@@ -629,7 +663,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
db: String,
table: String,
spec: TablePartitionSpec): Option[CatalogTablePartition] = withClient {
- client.getPartitionOption(db, table, spec)
+ client.getPartitionOption(db, table, lowerCasePartitionSpec(spec)).map { part =>
+ part.copy(spec = restorePartitionSpec(part.spec, getTable(db, table).partitionColumnNames))
+ }
}
/**
@@ -639,14 +675,17 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
db: String,
table: String,
partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition] = withClient {
- client.getPartitions(db, table, partialSpec)
+ client.getPartitions(db, table, partialSpec.map(lowerCasePartitionSpec)).map { part =>
+ part.copy(spec = restorePartitionSpec(part.spec, getTable(db, table).partitionColumnNames))
+ }
}
override def listPartitionsByFilter(
db: String,
table: String,
predicates: Seq[Expression]): Seq[CatalogTablePartition] = withClient {
- val catalogTable = client.getTable(db, table)
+ val rawTable = client.getTable(db, table)
+ val catalogTable = restoreTableMetadata(rawTable)
val partitionColumnNames = catalogTable.partitionColumnNames.toSet
val nonPartitionPruningPredicates = predicates.filterNot {
_.references.map(_.name).toSet.subsetOf(partitionColumnNames)
@@ -660,19 +699,20 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
val partitionSchema = catalogTable.partitionSchema
if (predicates.nonEmpty) {
- val clientPrunedPartitions =
- client.getPartitionsByFilter(catalogTable, predicates)
+ val clientPrunedPartitions = client.getPartitionsByFilter(rawTable, predicates).map { part =>
+ part.copy(spec = restorePartitionSpec(part.spec, catalogTable.partitionColumnNames))
+ }
val boundPredicate =
InterpretedPredicate.create(predicates.reduce(And).transform {
case att: AttributeReference =>
val index = partitionSchema.indexWhere(_.name == att.name)
BoundReference(index, partitionSchema(index).dataType, nullable = true)
})
- clientPrunedPartitions.filter { case p: CatalogTablePartition =>
- boundPredicate(p.toRow(partitionSchema))
- }
+ clientPrunedPartitions.filter { p => boundPredicate(p.toRow(partitionSchema)) }
} else {
- client.getPartitions(catalogTable)
+ client.getPartitions(catalogTable).map { part =>
+ part.copy(spec = restorePartitionSpec(part.spec, catalogTable.partitionColumnNames))
+ }
}
}
@@ -722,7 +762,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
}
object HiveExternalCatalog {
- val DATASOURCE_PREFIX = "spark.sql.sources."
+ val SPARK_SQL_PREFIX = "spark.sql."
+
+ val DATASOURCE_PREFIX = SPARK_SQL_PREFIX + "sources."
val DATASOURCE_PROVIDER = DATASOURCE_PREFIX + "provider"
val DATASOURCE_SCHEMA = DATASOURCE_PREFIX + "schema"
val DATASOURCE_SCHEMA_PREFIX = DATASOURCE_SCHEMA + "."
@@ -736,21 +778,20 @@ object HiveExternalCatalog {
val DATASOURCE_SCHEMA_BUCKETCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "bucketCol."
val DATASOURCE_SCHEMA_SORTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "sortCol."
- val STATISTICS_PREFIX = "spark.sql.statistics."
+ val STATISTICS_PREFIX = SPARK_SQL_PREFIX + "statistics."
val STATISTICS_TOTAL_SIZE = STATISTICS_PREFIX + "totalSize"
val STATISTICS_NUM_ROWS = STATISTICS_PREFIX + "numRows"
val STATISTICS_COL_STATS_PREFIX = STATISTICS_PREFIX + "colStats."
- def removeStatsProperties(metadata: CatalogTable): Map[String, String] = {
- metadata.properties.filterNot { case (key, _) => key.startsWith(STATISTICS_PREFIX) }
- }
+ val TABLE_PARTITION_PROVIDER = SPARK_SQL_PREFIX + "partitionProvider"
+
def getProviderFromTableProperties(metadata: CatalogTable): Option[String] = {
metadata.properties.get(DATASOURCE_PROVIDER)
}
def getOriginalTableProperties(metadata: CatalogTable): Map[String, String] = {
- metadata.properties.filterNot { case (key, _) => key.startsWith(DATASOURCE_PREFIX) }
+ metadata.properties.filterNot { case (key, _) => key.startsWith(SPARK_SQL_PREFIX) }
}
// A persisted data source table always store its schema in the catalog.
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 6c1585d5f5..d1de863ce3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -76,11 +76,10 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
partitionColumns = table.partitionColumnNames,
bucketSpec = table.bucketSpec,
className = table.provider.get,
- options = table.storage.properties)
+ options = table.storage.properties,
+ catalogTable = Some(table))
- LogicalRelation(
- dataSource.resolveRelation(),
- catalogTable = Some(table))
+ LogicalRelation(dataSource.resolveRelation(), catalogTable = Some(table))
}
}
@@ -194,7 +193,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
val bucketSpec = None // We don't support hive bucketed tables, only ones we write out.
- val lazyPruningEnabled = sparkSession.sqlContext.conf.filesourcePartitionPruning
+ val lazyPruningEnabled = sparkSession.sqlContext.conf.manageFilesourcePartitions
val result = if (metastoreRelation.hiveQlTable.isPartitioned) {
val partitionSchema = StructType.fromAttributes(metastoreRelation.partitionKeys)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 8835b266b2..84873bbbb8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -777,7 +777,7 @@ private[hive] class HiveClientImpl(
val (partCols, schema) = table.schema.map(toHiveColumn).partition { c =>
table.partitionColumnNames.contains(c.getName)
}
- if (table.schema.isEmpty) {
+ if (schema.isEmpty) {
// This is a hack to preserve existing behavior. Before Spark 2.0, we do not
// set a default serde here (this was done in Hive), and so if the user provides
// an empty schema Hive would automatically populate the schema with a single
@@ -831,9 +831,6 @@ private[hive] class HiveClientImpl(
new HivePartition(ht, tpart)
}
- // TODO (cloud-fan): the column names in partition specification are always lower cased because
- // Hive metastore is not case preserving. We should normalize them to the actual column names of
- // the table, once we store partition spec of data source tables.
private def fromHivePartition(hp: HivePartition): CatalogTablePartition = {
val apiPartition = hp.getTPartition
CatalogTablePartition(