author     Eric Liang <ekl@databricks.com>    2016-10-27 14:22:30 -0700
committer  Yin Huai <yhuai@databricks.com>    2016-10-27 14:22:30 -0700
commit     ccb11543048dccd4cc590a8db1df1d9d5847d112 (patch)
tree       d015232ebf5b1232e8f5df4aebe4854c37cae2b2
parent     79fd0cc0584e48fb021c4237877b15abbffb319a (diff)
[SPARK-17970][SQL] store partition spec in metastore for data source table
## What changes were proposed in this pull request?

We should follow Hive tables and also store the partition spec in the metastore for data source tables. This brings two benefits:

1. It is more flexible to manage a table's data files, as users can use `ADD PARTITION`, `DROP PARTITION` and `RENAME PARTITION`.
2. We no longer need to cache all file statuses for data source tables.

## How was this patch tested?

Existing tests.

Author: Eric Liang <ekl@databricks.com>
Author: Michael Allman <michael@videoamp.com>
Author: Eric Liang <ekhliang@gmail.com>
Author: Wenchen Fan <wenchen@databricks.com>

Closes #15515 from cloud-fan/partition.
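A minimal sketch of the workflow this patch enables, as run from a Hive-enabled spark-shell (the `events` table name and sample data are made up; `spark.sql.hive.manageFilesourcePartitions` is left at its default of `true`):

```scala
import spark.implicits._

// Create a partitioned data source (Parquet) table; its partitions are now
// registered in the Hive metastore instead of being discovered by listing files.
Seq((1, "2016-10-01"), (2, "2016-10-02"))
  .toDF("id", "day")
  .write
  .partitionBy("day")
  .format("parquet")
  .saveAsTable("events")

// Partition DDL, previously rejected for data source tables, now works.
spark.sql("SHOW PARTITIONS events").show()
spark.sql("ALTER TABLE events ADD IF NOT EXISTS PARTITION (day='2016-10-03')")
spark.sql("ALTER TABLE events DROP IF EXISTS PARTITION (day='2016-10-01')")

// Tables written by earlier Spark releases can be imported into the metastore with:
spark.sql("MSCK REPAIR TABLE events")
```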
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala | 12
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala | 1
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala | 13
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala | 3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala | 3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala | 17
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala | 90
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala | 39
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala | 20
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala | 15
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCatalog.scala | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala | 12
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 16
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala | 2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala | 200
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala | 129
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 9
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala | 5
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala | 2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala | 137
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala (renamed from sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveTablePerfStatsSuite.scala) | 112
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala | 65
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala | 5
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala | 8
26 files changed, 596 insertions, 329 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index a97ed701c4..7c3bec8979 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -89,9 +89,10 @@ case class CatalogTablePartition(
parameters: Map[String, String] = Map.empty) {
override def toString: String = {
+ val specString = spec.map { case (k, v) => s"$k=$v" }.mkString(", ")
val output =
Seq(
- s"Partition Values: [${spec.values.mkString(", ")}]",
+ s"Partition Values: [$specString]",
s"$storage",
s"Partition Parameters:{${parameters.map(p => p._1 + "=" + p._2).mkString(", ")}}")
@@ -137,6 +138,8 @@ case class BucketSpec(
* Can be None if this table is a View, should be "hive" for hive serde tables.
* @param unsupportedFeatures is a list of string descriptions of features that are used by the
* underlying table but not supported by Spark SQL yet.
+ * @param partitionProviderIsHive whether this table's partition metadata is stored in the Hive
+ * metastore.
*/
case class CatalogTable(
identifier: TableIdentifier,
@@ -154,7 +157,8 @@ case class CatalogTable(
viewOriginalText: Option[String] = None,
viewText: Option[String] = None,
comment: Option[String] = None,
- unsupportedFeatures: Seq[String] = Seq.empty) {
+ unsupportedFeatures: Seq[String] = Seq.empty,
+ partitionProviderIsHive: Boolean = false) {
/** schema of this table's partition columns */
def partitionSchema: StructType = StructType(schema.filter {
@@ -212,11 +216,11 @@ case class CatalogTable(
comment.map("Comment: " + _).getOrElse(""),
if (properties.nonEmpty) s"Properties: $tableProperties" else "",
if (stats.isDefined) s"Statistics: ${stats.get.simpleString}" else "",
- s"$storage")
+ s"$storage",
+ if (partitionProviderIsHive) "Partition Provider: Hive" else "")
output.filter(_.nonEmpty).mkString("CatalogTable(\n\t", "\n\t", ")")
}
-
}
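The new `partitionProviderIsHive` field in context, as a hedged sketch (all values below are invented; only the four leading constructor arguments are required, the rest fall back to the defaults shown above):

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

// A data source table whose partition metadata is tracked by the Hive metastore.
val table = CatalogTable(
  identifier = TableIdentifier("events", Some("default")),
  tableType = CatalogTableType.MANAGED,
  storage = CatalogStorageFormat.empty.copy(properties = Map("path" -> "/tmp/events")),
  schema = new StructType().add("id", IntegerType).add("day", StringType),
  provider = Some("parquet"),
  partitionColumnNames = Seq("day"),
  partitionProviderIsHive = true)  // the flag introduced by this patch

println(table.partitionSchema)  // only the `day` column, derived from schema + partitionColumnNames
println(table)                  // toString now includes "Partition Provider: Hive"
```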
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
index cb0426c7a9..3eff12f9ee 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
@@ -489,6 +489,7 @@ class TreeNodeSuite extends SparkFunSuite {
"owner" -> "",
"createTime" -> 0,
"lastAccessTime" -> -1,
+ "partitionProviderIsHive" -> false,
"properties" -> JNull,
"unsupportedFeatures" -> List.empty[String]))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 4b5f0246b9..7ff3522f54 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -25,7 +25,8 @@ import org.apache.spark.annotation.InterfaceStability
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType}
-import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Union}
+import org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand
import org.apache.spark.sql.execution.datasources.{CaseInsensitiveMap, CreateTable, DataSource, HadoopFsRelation}
import org.apache.spark.sql.types.StructType
@@ -387,7 +388,15 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
partitionColumnNames = partitioningColumns.getOrElse(Nil),
bucketSpec = getBucketSpec
)
- val cmd = CreateTable(tableDesc, mode, Some(df.logicalPlan))
+ val createCmd = CreateTable(tableDesc, mode, Some(df.logicalPlan))
+ val cmd = if (tableDesc.partitionColumnNames.nonEmpty &&
+ df.sparkSession.sqlContext.conf.manageFilesourcePartitions) {
+ // Need to recover partitions into the metastore so our saved data is visible.
+ val recoverPartitionCmd = AlterTableRecoverPartitionsCommand(tableDesc.identifier)
+ Union(createCmd, recoverPartitionCmd)
+ } else {
+ createCmd
+ }
df.sparkSession.sessionState.executePlan(cmd).toRdd
}
}
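A short spark-shell sketch of the DataFrameWriter path touched above (the `logs` table name is hypothetical): when a partitioned data source table is created via `saveAsTable`, a partition-recovery command now runs alongside the create, so the written partitions are immediately visible.

```scala
import spark.implicits._

Seq(("a", 1), ("b", 2)).toDF("value", "part")
  .write
  .partitionBy("part")
  .saveAsTable("logs")

// No manual `MSCK REPAIR TABLE logs` is needed for the partitions just written.
spark.sql("SHOW PARTITIONS logs").show()
```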
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
index 488138709a..f873f34a84 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
@@ -50,7 +50,8 @@ case class AnalyzeColumnCommand(
AnalyzeTableCommand.calculateTotalSize(sessionState, catalogRel.catalogTable))
case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined =>
- updateStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes)
+ updateStats(logicalRel.catalogTable.get,
+ AnalyzeTableCommand.calculateTotalSize(sessionState, logicalRel.catalogTable.get))
case otherRelation =>
throw new AnalysisException("ANALYZE TABLE is not supported for " +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
index 7b0e49b665..52a8fc88c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
@@ -51,7 +51,8 @@ case class AnalyzeTableCommand(
// data source tables have been converted into LogicalRelations
case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined =>
- updateTableStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes)
+ updateTableStats(logicalRel.catalogTable.get,
+ AnalyzeTableCommand.calculateTotalSize(sessionState, logicalRel.catalogTable.get))
case otherRelation =>
throw new AnalysisException("ANALYZE TABLE is not supported for " +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index a8c75a7f29..2a9743130d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -94,10 +94,16 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
val newTable = table.copy(
storage = table.storage.copy(properties = optionsWithPath),
schema = dataSource.schema,
- partitionColumnNames = partitionColumnNames)
+ partitionColumnNames = partitionColumnNames,
+ // If metastore partition management for file source tables is enabled, we start off with
+ // partition provider hive, but no partitions in the metastore. The user has to call
+ // `msck repair table` to populate the table partitions.
+ partitionProviderIsHive = partitionColumnNames.nonEmpty &&
+ sparkSession.sessionState.conf.manageFilesourcePartitions)
// We will return Nil or throw exception at the beginning if the table already exists, so when
// we reach here, the table should not exist and we should set `ignoreIfExists` to false.
sessionState.catalog.createTable(newTable, ignoreIfExists = false)
+
Seq.empty[Row]
}
}
@@ -232,6 +238,15 @@ case class CreateDataSourceTableAsSelectCommand(
sessionState.catalog.createTable(newTable, ignoreIfExists = false)
}
+ result match {
+ case fs: HadoopFsRelation if table.partitionColumnNames.nonEmpty &&
+ sparkSession.sqlContext.conf.manageFilesourcePartitions =>
+ // Need to recover partitions into the metastore so our saved data is visible.
+ sparkSession.sessionState.executePlan(
+ AlterTableRecoverPartitionsCommand(table.identifier)).toRdd
+ case _ =>
+ }
+
// Refresh the cache of the table in the catalog.
sessionState.catalog.refreshTable(tableIdentWithDB)
Seq.empty[Row]
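The same recovery step now also runs at the end of `CREATE TABLE ... AS SELECT` for partitioned data source tables. A hedged SQL sketch, reusing the hypothetical `events` table and spark-shell session from the snippet after the commit message:

```scala
spark.sql("""
  CREATE TABLE events_by_day
  USING parquet
  PARTITIONED BY (day)
  AS SELECT id, day FROM events
""")

// The partitions written by the CTAS are already registered in the metastore.
spark.sql("SHOW PARTITIONS events_by_day").show()
```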
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 15656faa08..61e0550cef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -28,10 +28,11 @@ import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTablePartition, CatalogTableType, SessionCatalog}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
-import org.apache.spark.sql.execution.datasources.PartitioningUtils
+import org.apache.spark.sql.execution.datasources.{CaseInsensitiveMap, PartitioningUtils}
import org.apache.spark.sql.types._
import org.apache.spark.util.SerializableConfiguration
@@ -346,10 +347,7 @@ case class AlterTableAddPartitionCommand(
val catalog = sparkSession.sessionState.catalog
val table = catalog.getTableMetadata(tableName)
DDLUtils.verifyAlterTableType(catalog, table, isView = false)
- if (DDLUtils.isDatasourceTable(table)) {
- throw new AnalysisException(
- "ALTER TABLE ADD PARTITION is not allowed for tables defined using the datasource API")
- }
+ DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE ADD PARTITION")
val parts = partitionSpecsAndLocs.map { case (spec, location) =>
val normalizedSpec = PartitioningUtils.normalizePartitionSpec(
spec,
@@ -382,11 +380,8 @@ case class AlterTableRenamePartitionCommand(
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val table = catalog.getTableMetadata(tableName)
- if (DDLUtils.isDatasourceTable(table)) {
- throw new AnalysisException(
- "ALTER TABLE RENAME PARTITION is not allowed for tables defined using the datasource API")
- }
DDLUtils.verifyAlterTableType(catalog, table, isView = false)
+ DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE RENAME PARTITION")
val normalizedOldPartition = PartitioningUtils.normalizePartitionSpec(
oldPartition,
@@ -432,10 +427,7 @@ case class AlterTableDropPartitionCommand(
val catalog = sparkSession.sessionState.catalog
val table = catalog.getTableMetadata(tableName)
DDLUtils.verifyAlterTableType(catalog, table, isView = false)
- if (DDLUtils.isDatasourceTable(table)) {
- throw new AnalysisException(
- "ALTER TABLE DROP PARTITIONS is not allowed for tables defined using the datasource API")
- }
+ DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION")
val normalizedSpecs = specs.map { spec =>
PartitioningUtils.normalizePartitionSpec(
@@ -493,33 +485,39 @@ case class AlterTableRecoverPartitionsCommand(
}
}
+ private def getBasePath(table: CatalogTable): Option[String] = {
+ if (table.provider == Some("hive")) {
+ table.storage.locationUri
+ } else {
+ new CaseInsensitiveMap(table.storage.properties).get("path")
+ }
+ }
+
override def run(spark: SparkSession): Seq[Row] = {
val catalog = spark.sessionState.catalog
val table = catalog.getTableMetadata(tableName)
val tableIdentWithDB = table.identifier.quotedString
DDLUtils.verifyAlterTableType(catalog, table, isView = false)
- if (DDLUtils.isDatasourceTable(table)) {
- throw new AnalysisException(
- s"Operation not allowed: $cmd on datasource tables: $tableIdentWithDB")
- }
if (table.partitionColumnNames.isEmpty) {
throw new AnalysisException(
s"Operation not allowed: $cmd only works on partitioned tables: $tableIdentWithDB")
}
- if (table.storage.locationUri.isEmpty) {
+
+ val tablePath = getBasePath(table)
+ if (tablePath.isEmpty) {
throw new AnalysisException(s"Operation not allowed: $cmd only works on table with " +
s"location provided: $tableIdentWithDB")
}
- val root = new Path(table.storage.locationUri.get)
+ val root = new Path(tablePath.get)
logInfo(s"Recover all the partitions in $root")
val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", "10").toInt
val hadoopConf = spark.sparkContext.hadoopConfiguration
val pathFilter = getPathFilter(hadoopConf)
- val partitionSpecsAndLocs = scanPartitions(
- spark, fs, pathFilter, root, Map(), table.partitionColumnNames.map(_.toLowerCase), threshold)
+ val partitionSpecsAndLocs = scanPartitions(spark, fs, pathFilter, root, Map(),
+ table.partitionColumnNames, threshold, spark.sessionState.conf.resolver)
val total = partitionSpecsAndLocs.length
logInfo(s"Found $total partitions in $root")
@@ -531,6 +529,11 @@ case class AlterTableRecoverPartitionsCommand(
logInfo(s"Finished to gather the fast stats for all $total partitions.")
addPartitions(spark, table, partitionSpecsAndLocs, partitionStats)
+ // Updates the table to indicate that its partition metadata is stored in the Hive metastore.
+ // This is always the case for Hive format tables, but is not true for Datasource tables created
+ // before Spark 2.1 unless they are converted via `msck repair table`.
+ spark.sessionState.catalog.alterTable(table.copy(partitionProviderIsHive = true))
+ catalog.refreshTable(tableName)
logInfo(s"Recovered all partitions ($total).")
Seq.empty[Row]
}
@@ -544,7 +547,8 @@ case class AlterTableRecoverPartitionsCommand(
path: Path,
spec: TablePartitionSpec,
partitionNames: Seq[String],
- threshold: Int): GenSeq[(TablePartitionSpec, Path)] = {
+ threshold: Int,
+ resolver: Resolver): GenSeq[(TablePartitionSpec, Path)] = {
if (partitionNames.isEmpty) {
return Seq(spec -> path)
}
@@ -563,15 +567,15 @@ case class AlterTableRecoverPartitionsCommand(
val name = st.getPath.getName
if (st.isDirectory && name.contains("=")) {
val ps = name.split("=", 2)
- val columnName = PartitioningUtils.unescapePathName(ps(0)).toLowerCase
+ val columnName = PartitioningUtils.unescapePathName(ps(0))
// TODO: Validate the value
val value = PartitioningUtils.unescapePathName(ps(1))
- // comparing with case-insensitive, but preserve the case
- if (columnName == partitionNames.head) {
- scanPartitions(spark, fs, filter, st.getPath, spec ++ Map(columnName -> value),
- partitionNames.drop(1), threshold)
+ if (resolver(columnName, partitionNames.head)) {
+ scanPartitions(spark, fs, filter, st.getPath, spec ++ Map(partitionNames.head -> value),
+ partitionNames.drop(1), threshold, resolver)
} else {
- logWarning(s"expect partition column ${partitionNames.head}, but got ${ps(0)}, ignore it")
+ logWarning(
+ s"expected partition column ${partitionNames.head}, but got ${ps(0)}, ignoring it")
Seq()
}
} else {
@@ -676,16 +680,11 @@ case class AlterTableSetLocationCommand(
DDLUtils.verifyAlterTableType(catalog, table, isView = false)
partitionSpec match {
case Some(spec) =>
+ DDLUtils.verifyPartitionProviderIsHive(
+ sparkSession, table, "ALTER TABLE ... SET LOCATION")
// Partition spec is specified, so we set the location only for this partition
val part = catalog.getPartition(table.identifier, spec)
- val newPart =
- if (DDLUtils.isDatasourceTable(table)) {
- throw new AnalysisException(
- "ALTER TABLE SET LOCATION for partition is not allowed for tables defined " +
- "using the datasource API")
- } else {
- part.copy(storage = part.storage.copy(locationUri = Some(location)))
- }
+ val newPart = part.copy(storage = part.storage.copy(locationUri = Some(location)))
catalog.alterPartitions(table.identifier, Seq(newPart))
case None =>
// No partition spec is specified, so we set the location for the table itself
@@ -710,6 +709,25 @@ object DDLUtils {
}
/**
+ * Throws a standard error for actions that require partitionProvider = hive.
+ */
+ def verifyPartitionProviderIsHive(
+ spark: SparkSession, table: CatalogTable, action: String): Unit = {
+ val tableName = table.identifier.table
+ if (!spark.sqlContext.conf.manageFilesourcePartitions && isDatasourceTable(table)) {
+ throw new AnalysisException(
+ s"$action is not allowed on $tableName since filesource partition management is " +
+ "disabled (spark.sql.hive.manageFilesourcePartitions = false).")
+ }
+ if (!table.partitionProviderIsHive && isDatasourceTable(table)) {
+ throw new AnalysisException(
+ s"$action is not allowed on $tableName since its partition metadata is not stored in " +
+ "the Hive metastore. To import this information into the metastore, run " +
+ s"`msck repair table $tableName`")
+ }
+ }
+
+ /**
* If the command ALTER VIEW is to alter a table or ALTER TABLE is to alter a view,
* issue an exception [[AnalysisException]].
*
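A sketch of what `verifyPartitionProviderIsHive` means for users, assuming a hypothetical `legacy_tbl` created by an older Spark release (so its partition metadata is not yet in the metastore) and the same spark-shell session as before:

```scala
import org.apache.spark.sql.AnalysisException

try {
  spark.sql("ALTER TABLE legacy_tbl ADD PARTITION (day='2016-10-04')")
} catch {
  case e: AnalysisException =>
    // "... is not allowed on legacy_tbl since its partition metadata is not stored in
    //  the Hive metastore. To import this information into the metastore, run
    //  `msck repair table legacy_tbl`"
    println(e.getMessage)
}

// MSCK REPAIR TABLE runs AlterTableRecoverPartitionsCommand: it scans the table's base
// path, registers the discovered partitions, and flips partitionProviderIsHive to true.
spark.sql("MSCK REPAIR TABLE legacy_tbl")
spark.sql("ALTER TABLE legacy_tbl ADD PARTITION (day='2016-10-04')")  // now allowed
```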
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index aec25430b7..4acfffb628 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -358,19 +358,16 @@ case class TruncateTableCommand(
throw new AnalysisException(
s"Operation not allowed: TRUNCATE TABLE on views: $tableIdentwithDB")
}
- val isDatasourceTable = DDLUtils.isDatasourceTable(table)
- if (isDatasourceTable && partitionSpec.isDefined) {
- throw new AnalysisException(
- s"Operation not allowed: TRUNCATE TABLE ... PARTITION is not supported " +
- s"for tables created using the data sources API: $tableIdentwithDB")
- }
if (table.partitionColumnNames.isEmpty && partitionSpec.isDefined) {
throw new AnalysisException(
s"Operation not allowed: TRUNCATE TABLE ... PARTITION is not supported " +
s"for tables that are not partitioned: $tableIdentwithDB")
}
+ if (partitionSpec.isDefined) {
+ DDLUtils.verifyPartitionProviderIsHive(spark, table, "TRUNCATE TABLE ... PARTITION")
+ }
val locations =
- if (isDatasourceTable) {
+ if (DDLUtils.isDatasourceTable(table)) {
Seq(table.storage.properties.get("path"))
} else if (table.partitionColumnNames.isEmpty) {
Seq(table.storage.locationUri)
@@ -453,7 +450,7 @@ case class DescribeTableCommand(
describeFormattedTableInfo(metadata, result)
}
} else {
- describeDetailedPartitionInfo(catalog, metadata, result)
+ describeDetailedPartitionInfo(sparkSession, catalog, metadata, result)
}
}
@@ -492,6 +489,10 @@ case class DescribeTableCommand(
describeStorageInfo(table, buffer)
if (table.tableType == CatalogTableType.VIEW) describeViewInfo(table, buffer)
+
+ if (DDLUtils.isDatasourceTable(table) && table.partitionProviderIsHive) {
+ append(buffer, "Partition Provider:", "Hive", "")
+ }
}
private def describeStorageInfo(metadata: CatalogTable, buffer: ArrayBuffer[Row]): Unit = {
@@ -528,6 +529,7 @@ case class DescribeTableCommand(
}
private def describeDetailedPartitionInfo(
+ spark: SparkSession,
catalog: SessionCatalog,
metadata: CatalogTable,
result: ArrayBuffer[Row]): Unit = {
@@ -535,10 +537,7 @@ case class DescribeTableCommand(
throw new AnalysisException(
s"DESC PARTITION is not allowed on a view: ${table.identifier}")
}
- if (DDLUtils.isDatasourceTable(metadata)) {
- throw new AnalysisException(
- s"DESC PARTITION is not allowed on a datasource table: ${table.identifier}")
- }
+ DDLUtils.verifyPartitionProviderIsHive(spark, metadata, "DESC PARTITION")
val partition = catalog.getPartition(table, partitionSpec)
if (isExtended) {
describeExtendedDetailedPartitionInfo(table, metadata, partition, result)
@@ -743,10 +742,7 @@ case class ShowPartitionsCommand(
s"SHOW PARTITIONS is not allowed on a table that is not partitioned: $tableIdentWithDB")
}
- if (DDLUtils.isDatasourceTable(table)) {
- throw new AnalysisException(
- s"SHOW PARTITIONS is not allowed on a datasource table: $tableIdentWithDB")
- }
+ DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "SHOW PARTITIONS")
/**
* Validate the partitioning spec by making sure all the referenced columns are
@@ -894,18 +890,11 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman
private def showHiveTableProperties(metadata: CatalogTable, builder: StringBuilder): Unit = {
if (metadata.properties.nonEmpty) {
- val filteredProps = metadata.properties.filterNot {
- // Skips "EXTERNAL" property for external tables
- case (key, _) => key == "EXTERNAL" && metadata.tableType == EXTERNAL
- }
-
- val props = filteredProps.map { case (key, value) =>
+ val props = metadata.properties.map { case (key, value) =>
s"'${escapeSingleQuotedString(key)}' = '${escapeSingleQuotedString(value)}'"
}
- if (props.nonEmpty) {
- builder ++= props.mkString("TBLPROPERTIES (\n ", ",\n ", "\n)\n")
- }
+ builder ++= props.mkString("TBLPROPERTIES (\n ", ",\n ", "\n)\n")
}
}
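With the checks above, several commands that previously failed outright on data source tables are accepted once the partition provider is Hive. A usage sketch against the hypothetical `events` table from the first snippet:

```scala
spark.sql("SHOW PARTITIONS events").show()
spark.sql("DESC EXTENDED events PARTITION (day='2016-10-02')").show(truncate = false)
spark.sql("TRUNCATE TABLE events PARTITION (day='2016-10-02')")
```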
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 17da606580..5b8f05a396 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -30,7 +30,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
-import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
@@ -65,6 +65,8 @@ import org.apache.spark.util.Utils
* @param partitionColumns A list of column names that the relation is partitioned by. When this
* list is empty, the relation is unpartitioned.
* @param bucketSpec An optional specification for bucketing (hash-partitioning) of the data.
+ * @param catalogTable Optional catalog table reference that can be used to push down operations
+ * over the datasource to the catalog service.
*/
case class DataSource(
sparkSession: SparkSession,
@@ -73,7 +75,8 @@ case class DataSource(
userSpecifiedSchema: Option[StructType] = None,
partitionColumns: Seq[String] = Seq.empty,
bucketSpec: Option[BucketSpec] = None,
- options: Map[String, String] = Map.empty) extends Logging {
+ options: Map[String, String] = Map.empty,
+ catalogTable: Option[CatalogTable] = None) extends Logging {
case class SourceInfo(name: String, schema: StructType, partitionColumns: Seq[String])
@@ -412,9 +415,16 @@ case class DataSource(
})
}
- val fileCatalog =
+ val fileCatalog = if (sparkSession.sqlContext.conf.manageFilesourcePartitions &&
+ catalogTable.isDefined && catalogTable.get.partitionProviderIsHive) {
+ new TableFileCatalog(
+ sparkSession,
+ catalogTable.get,
+ catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(0L))
+ } else {
new ListingFileCatalog(
sparkSession, globbedPaths, options, partitionSchema)
+ }
val dataSchema = userSpecifiedSchema.map { schema =>
val equality = sparkSession.sessionState.conf.resolver
@@ -423,7 +433,7 @@ case class DataSource(
format.inferSchema(
sparkSession,
caseInsensitiveOptions,
- fileCatalog.allFiles())
+ fileCatalog.asInstanceOf[ListingFileCatalog].allFiles())
}.getOrElse {
throw new AnalysisException(
s"Unable to infer schema for $format at ${allPaths.take(2).mkString(",")}. " +
@@ -432,7 +442,7 @@ case class DataSource(
HadoopFsRelation(
fileCatalog,
- partitionSchema = fileCatalog.partitionSpec().partitionColumns,
+ partitionSchema = fileCatalog.partitionSchema,
dataSchema = dataSchema.asNullable,
bucketSpec = bucketSpec,
format,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 7d0abe86a4..f0bcf94ead 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -30,11 +30,11 @@ import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union}
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, UnknownPartitioning}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
-import org.apache.spark.sql.execution.command.{DDLUtils, ExecutedCommandExec}
+import org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand, DDLUtils, ExecutedCommandExec}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -179,7 +179,7 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
"Cannot overwrite a path that is also being read from.")
}
- InsertIntoHadoopFsRelationCommand(
+ val insertCmd = InsertIntoHadoopFsRelationCommand(
outputPath,
query.resolve(t.partitionSchema, t.sparkSession.sessionState.analyzer.resolver),
t.bucketSpec,
@@ -188,6 +188,15 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
t.options,
query,
mode)
+
+ if (l.catalogTable.isDefined && l.catalogTable.get.partitionColumnNames.nonEmpty &&
+ l.catalogTable.get.partitionProviderIsHive) {
+ // TODO(ekl) we should be more efficient here and only recover the newly added partitions
+ val recoverPartitionCmd = AlterTableRecoverPartitionsCommand(l.catalogTable.get.identifier)
+ Union(insertCmd, recoverPartitionCmd)
+ } else {
+ insertCmd
+ }
}
}
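The insert path gets the same treatment: after `InsertIntoHadoopFsRelationCommand`, a recover-partitions command runs so newly created partition directories are registered. A hedged sketch, again reusing the hypothetical `events` table:

```scala
// Writes a new day=2016-10-05 directory and, via the Union above, registers it.
spark.sql("INSERT INTO events SELECT 99 AS id, '2016-10-05' AS day")
spark.sql("SHOW PARTITIONS events").show()  // includes day=2016-10-05
```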
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCatalog.scala
index 2bc66ceeeb..dba64624c3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCatalog.scala
@@ -21,6 +21,7 @@ import org.apache.hadoop.fs._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types.StructType
/**
* A collection of data files from a partitioned relation, along with the partition values in the
@@ -63,4 +64,7 @@ trait FileCatalog {
/** Sum of table file sizes, in bytes */
def sizeInBytes: Long
+
+ /** Schema of the partitioning columns, or the empty schema if the table is not partitioned. */
+ def partitionSchema: StructType
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
index e0ec748a0b..7c2e6fd04d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
@@ -64,7 +64,7 @@ object FileStatusCache {
*/
def newCache(session: SparkSession): FileStatusCache = {
synchronized {
- if (session.sqlContext.conf.filesourcePartitionPruning &&
+ if (session.sqlContext.conf.manageFilesourcePartitions &&
session.sqlContext.conf.filesourcePartitionFileCacheSize > 0) {
if (sharedCache == null) {
sharedCache = new SharedInMemoryCache(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
index 9b1903c471..cc4049e925 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
@@ -38,19 +38,21 @@ import org.apache.spark.util.SerializableConfiguration
* It provides the necessary methods to parse partition data based on a set of files.
*
* @param parameters as set of options to control partition discovery
- * @param partitionSchema an optional partition schema that will be use to provide types for the
- * discovered partitions
-*/
+ * @param userPartitionSchema an optional partition schema that will be used to provide types for
+ * the discovered partitions
+ */
abstract class PartitioningAwareFileCatalog(
sparkSession: SparkSession,
parameters: Map[String, String],
- partitionSchema: Option[StructType],
+ userPartitionSchema: Option[StructType],
fileStatusCache: FileStatusCache = NoopCache) extends FileCatalog with Logging {
import PartitioningAwareFileCatalog.BASE_PATH_PARAM
/** Returns the specification of the partitions inferred from the data. */
def partitionSpec(): PartitionSpec
+ override def partitionSchema: StructType = partitionSpec().partitionColumns
+
protected val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(parameters)
protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus]
@@ -122,7 +124,7 @@ abstract class PartitioningAwareFileCatalog(
val leafDirs = leafDirToChildrenFiles.filter { case (_, files) =>
files.exists(f => isDataPath(f.getPath))
}.keys.toSeq
- partitionSchema match {
+ userPartitionSchema match {
case Some(userProvidedSchema) if userProvidedSchema.nonEmpty =>
val spec = PartitioningUtils.parsePartitions(
leafDirs,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
index 667379b222..b459df5734 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types.StructType
/**
@@ -45,6 +46,8 @@ class TableFileCatalog(
private val baseLocation = table.storage.locationUri
+ override def partitionSchema: StructType = table.partitionSchema
+
override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq
override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] = {
@@ -63,7 +66,6 @@ class TableFileCatalog(
if (table.partitionColumnNames.nonEmpty) {
val selectedPartitions = sparkSession.sessionState.catalog.listPartitionsByFilter(
table.identifier, filters)
- val partitionSchema = table.partitionSchema
val partitions = selectedPartitions.map { p =>
PartitionPath(p.toRow(partitionSchema), p.storage.locationUri.get)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index f47ec7f396..dc31f3bc32 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -272,18 +272,20 @@ object SQLConf {
.booleanConf
.createWithDefault(true)
- val HIVE_FILESOURCE_PARTITION_PRUNING =
- SQLConfigBuilder("spark.sql.hive.filesourcePartitionPruning")
- .doc("When true, enable metastore partition pruning for filesource relations as well. " +
- "This is currently implemented for converted Hive tables only.")
+ val HIVE_MANAGE_FILESOURCE_PARTITIONS =
+ SQLConfigBuilder("spark.sql.hive.manageFilesourcePartitions")
+ .doc("When true, enable metastore partition management for file source tables as well. " +
+ "This includes both datasource and converted Hive tables. When partition managment " +
+ "is enabled, datasource tables store partition in the Hive metastore, and use the " +
+ "metastore to prune partitions during query planning.")
.booleanConf
.createWithDefault(true)
val HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE =
SQLConfigBuilder("spark.sql.hive.filesourcePartitionFileCacheSize")
- .doc("When nonzero, enable caching of partition file metadata in memory. All table share " +
+ .doc("When nonzero, enable caching of partition file metadata in memory. All tables share " +
"a cache that can use up to specified num bytes for file metadata. This conf only " +
- "applies if filesource partition pruning is also enabled.")
+ "has an effect when hive filesource partition management is enabled.")
.longConf
.createWithDefault(250 * 1024 * 1024)
@@ -679,7 +681,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
def metastorePartitionPruning: Boolean = getConf(HIVE_METASTORE_PARTITION_PRUNING)
- def filesourcePartitionPruning: Boolean = getConf(HIVE_FILESOURCE_PARTITION_PRUNING)
+ def manageFilesourcePartitions: Boolean = getConf(HIVE_MANAGE_FILESOURCE_PARTITIONS)
def filesourcePartitionFileCacheSize: Long = getConf(HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE)
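A sketch of the renamed configuration keys in use (the values shown are the defaults documented above):

```scala
import org.apache.spark.sql.SparkSession

val session = SparkSession.builder()
  .appName("filesource-partition-management")
  .enableHiveSupport()
  // Replaces the former spark.sql.hive.filesourcePartitionPruning flag.
  .config("spark.sql.hive.manageFilesourcePartitions", "true")
  // File status cache; only has an effect while partition management is enabled.
  .config("spark.sql.hive.filesourcePartitionFileCacheSize", 250L * 1024 * 1024)
  .getOrCreate()
```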
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
index 6857dd3728..2d73d9f1fc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
@@ -197,7 +197,7 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext {
assertResult(expected.schema, s"Schema did not match for query #$i\n${expected.sql}") {
output.schema
}
- assertResult(expected.output, s"Result dit not match for query #$i\n${expected.sql}") {
+ assertResult(expected.output, s"Result did not match for query #$i\n${expected.sql}") {
output.output
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index b989d01ec7..9fb0f5384d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -95,7 +95,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
.add("b", "int"),
provider = Some("hive"),
partitionColumnNames = Seq("a", "b"),
- createTime = 0L)
+ createTime = 0L,
+ partitionProviderIsHive = true)
}
private def createTable(catalog: SessionCatalog, name: TableIdentifier): Unit = {
@@ -923,68 +924,11 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
test("alter table: rename partition") {
- val catalog = spark.sessionState.catalog
- val tableIdent = TableIdentifier("tab1", Some("dbx"))
- createPartitionedTable(tableIdent, isDatasourceTable = false)
-
- // basic rename partition
- sql("ALTER TABLE dbx.tab1 PARTITION (a='1', b='q') RENAME TO PARTITION (a='100', b='p')")
- sql("ALTER TABLE dbx.tab1 PARTITION (a='2', b='c') RENAME TO PARTITION (a='20', b='c')")
- assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
- Set(Map("a" -> "100", "b" -> "p"), Map("a" -> "20", "b" -> "c"), Map("a" -> "3", "b" -> "p")))
-
- // rename without explicitly specifying database
- catalog.setCurrentDatabase("dbx")
- sql("ALTER TABLE tab1 PARTITION (a='100', b='p') RENAME TO PARTITION (a='10', b='p')")
- assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
- Set(Map("a" -> "10", "b" -> "p"), Map("a" -> "20", "b" -> "c"), Map("a" -> "3", "b" -> "p")))
-
- // table to alter does not exist
- intercept[NoSuchTableException] {
- sql("ALTER TABLE does_not_exist PARTITION (c='3') RENAME TO PARTITION (c='333')")
- }
-
- // partition to rename does not exist
- intercept[NoSuchPartitionException] {
- sql("ALTER TABLE tab1 PARTITION (a='not_found', b='1') RENAME TO PARTITION (a='1', b='2')")
- }
-
- // partition spec in RENAME PARTITION should be case insensitive by default
- sql("ALTER TABLE tab1 PARTITION (A='10', B='p') RENAME TO PARTITION (A='1', B='p')")
- assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
- Set(Map("a" -> "1", "b" -> "p"), Map("a" -> "20", "b" -> "c"), Map("a" -> "3", "b" -> "p")))
+ testRenamePartitions(isDatasourceTable = false)
}
test("alter table: rename partition (datasource table)") {
- createPartitionedTable(TableIdentifier("tab1", Some("dbx")), isDatasourceTable = true)
- val e = intercept[AnalysisException] {
- sql("ALTER TABLE dbx.tab1 PARTITION (a='1', b='q') RENAME TO PARTITION (a='100', b='p')")
- }.getMessage
- assert(e.contains(
- "ALTER TABLE RENAME PARTITION is not allowed for tables defined using the datasource API"))
- // table to alter does not exist
- intercept[NoSuchTableException] {
- sql("ALTER TABLE does_not_exist PARTITION (c='3') RENAME TO PARTITION (c='333')")
- }
- }
-
- private def createPartitionedTable(
- tableIdent: TableIdentifier,
- isDatasourceTable: Boolean): Unit = {
- val catalog = spark.sessionState.catalog
- val part1 = Map("a" -> "1", "b" -> "q")
- val part2 = Map("a" -> "2", "b" -> "c")
- val part3 = Map("a" -> "3", "b" -> "p")
- createDatabase(catalog, "dbx")
- createTable(catalog, tableIdent)
- createTablePartition(catalog, part1, tableIdent)
- createTablePartition(catalog, part2, tableIdent)
- createTablePartition(catalog, part3, tableIdent)
- assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
- Set(part1, part2, part3))
- if (isDatasourceTable) {
- convertToDatasourceTable(catalog, tableIdent)
- }
+ testRenamePartitions(isDatasourceTable = true)
}
test("show tables") {
@@ -1199,7 +1143,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
if (isDatasourceTable) {
if (spec.isDefined) {
assert(storageFormat.properties.isEmpty)
- assert(storageFormat.locationUri.isEmpty)
+ assert(storageFormat.locationUri === Some(expected))
} else {
assert(storageFormat.properties.get("path") === Some(expected))
assert(storageFormat.locationUri === Some(expected))
@@ -1212,18 +1156,14 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
sql("ALTER TABLE dbx.tab1 SET LOCATION '/path/to/your/lovely/heart'")
verifyLocation("/path/to/your/lovely/heart")
// set table partition location
- maybeWrapException(isDatasourceTable) {
- sql("ALTER TABLE dbx.tab1 PARTITION (a='1', b='2') SET LOCATION '/path/to/part/ways'")
- }
+ sql("ALTER TABLE dbx.tab1 PARTITION (a='1', b='2') SET LOCATION '/path/to/part/ways'")
verifyLocation("/path/to/part/ways", Some(partSpec))
// set table location without explicitly specifying database
catalog.setCurrentDatabase("dbx")
sql("ALTER TABLE tab1 SET LOCATION '/swanky/steak/place'")
verifyLocation("/swanky/steak/place")
// set table partition location without explicitly specifying database
- maybeWrapException(isDatasourceTable) {
- sql("ALTER TABLE tab1 PARTITION (a='1', b='2') SET LOCATION 'vienna'")
- }
+ sql("ALTER TABLE tab1 PARTITION (a='1', b='2') SET LOCATION 'vienna'")
verifyLocation("vienna", Some(partSpec))
// table to alter does not exist
intercept[AnalysisException] {
@@ -1354,26 +1294,18 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
// basic add partition
- maybeWrapException(isDatasourceTable) {
- sql("ALTER TABLE dbx.tab1 ADD IF NOT EXISTS " +
- "PARTITION (a='2', b='6') LOCATION 'paris' PARTITION (a='3', b='7')")
- }
- if (!isDatasourceTable) {
- assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2, part3))
- assert(catalog.getPartition(tableIdent, part1).storage.locationUri.isEmpty)
- assert(catalog.getPartition(tableIdent, part2).storage.locationUri == Option("paris"))
- assert(catalog.getPartition(tableIdent, part3).storage.locationUri.isEmpty)
- }
+ sql("ALTER TABLE dbx.tab1 ADD IF NOT EXISTS " +
+ "PARTITION (a='2', b='6') LOCATION 'paris' PARTITION (a='3', b='7')")
+ assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2, part3))
+ assert(catalog.getPartition(tableIdent, part1).storage.locationUri.isEmpty)
+ assert(catalog.getPartition(tableIdent, part2).storage.locationUri == Option("paris"))
+ assert(catalog.getPartition(tableIdent, part3).storage.locationUri.isEmpty)
// add partitions without explicitly specifying database
catalog.setCurrentDatabase("dbx")
- maybeWrapException(isDatasourceTable) {
- sql("ALTER TABLE tab1 ADD IF NOT EXISTS PARTITION (a='4', b='8')")
- }
- if (!isDatasourceTable) {
- assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
- Set(part1, part2, part3, part4))
- }
+ sql("ALTER TABLE tab1 ADD IF NOT EXISTS PARTITION (a='4', b='8')")
+ assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+ Set(part1, part2, part3, part4))
// table to alter does not exist
intercept[AnalysisException] {
@@ -1386,22 +1318,14 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
// partition to add already exists when using IF NOT EXISTS
- maybeWrapException(isDatasourceTable) {
- sql("ALTER TABLE tab1 ADD IF NOT EXISTS PARTITION (a='4', b='8')")
- }
- if (!isDatasourceTable) {
- assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
- Set(part1, part2, part3, part4))
- }
+ sql("ALTER TABLE tab1 ADD IF NOT EXISTS PARTITION (a='4', b='8')")
+ assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+ Set(part1, part2, part3, part4))
// partition spec in ADD PARTITION should be case insensitive by default
- maybeWrapException(isDatasourceTable) {
- sql("ALTER TABLE tab1 ADD PARTITION (A='9', B='9')")
- }
- if (!isDatasourceTable) {
- assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
- Set(part1, part2, part3, part4, part5))
- }
+ sql("ALTER TABLE tab1 ADD PARTITION (A='9', B='9')")
+ assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+ Set(part1, part2, part3, part4, part5))
}
private def testDropPartitions(isDatasourceTable: Boolean): Unit = {
@@ -1424,21 +1348,13 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
// basic drop partition
- maybeWrapException(isDatasourceTable) {
- sql("ALTER TABLE dbx.tab1 DROP IF EXISTS PARTITION (a='4', b='8'), PARTITION (a='3', b='7')")
- }
- if (!isDatasourceTable) {
- assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2))
- }
+ sql("ALTER TABLE dbx.tab1 DROP IF EXISTS PARTITION (a='4', b='8'), PARTITION (a='3', b='7')")
+ assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2))
// drop partitions without explicitly specifying database
catalog.setCurrentDatabase("dbx")
- maybeWrapException(isDatasourceTable) {
- sql("ALTER TABLE tab1 DROP IF EXISTS PARTITION (a='2', b ='6')")
- }
- if (!isDatasourceTable) {
- assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
- }
+ sql("ALTER TABLE tab1 DROP IF EXISTS PARTITION (a='2', b ='6')")
+ assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
// table to alter does not exist
intercept[AnalysisException] {
@@ -1451,20 +1367,56 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
// partition to drop does not exist when using IF EXISTS
- maybeWrapException(isDatasourceTable) {
- sql("ALTER TABLE tab1 DROP IF EXISTS PARTITION (a='300')")
- }
- if (!isDatasourceTable) {
- assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
- }
+ sql("ALTER TABLE tab1 DROP IF EXISTS PARTITION (a='300')")
+ assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
// partition spec in DROP PARTITION should be case insensitive by default
- maybeWrapException(isDatasourceTable) {
- sql("ALTER TABLE tab1 DROP PARTITION (A='1', B='5')")
+ sql("ALTER TABLE tab1 DROP PARTITION (A='1', B='5')")
+ assert(catalog.listPartitions(tableIdent).isEmpty)
+ }
+
+ private def testRenamePartitions(isDatasourceTable: Boolean): Unit = {
+ val catalog = spark.sessionState.catalog
+ val tableIdent = TableIdentifier("tab1", Some("dbx"))
+ val part1 = Map("a" -> "1", "b" -> "q")
+ val part2 = Map("a" -> "2", "b" -> "c")
+ val part3 = Map("a" -> "3", "b" -> "p")
+ createDatabase(catalog, "dbx")
+ createTable(catalog, tableIdent)
+ createTablePartition(catalog, part1, tableIdent)
+ createTablePartition(catalog, part2, tableIdent)
+ createTablePartition(catalog, part3, tableIdent)
+ assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2, part3))
+ if (isDatasourceTable) {
+ convertToDatasourceTable(catalog, tableIdent)
+ }
+
+ // basic rename partition
+ sql("ALTER TABLE dbx.tab1 PARTITION (a='1', b='q') RENAME TO PARTITION (a='100', b='p')")
+ sql("ALTER TABLE dbx.tab1 PARTITION (a='2', b='c') RENAME TO PARTITION (a='20', b='c')")
+ assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+ Set(Map("a" -> "100", "b" -> "p"), Map("a" -> "20", "b" -> "c"), Map("a" -> "3", "b" -> "p")))
+
+ // rename without explicitly specifying database
+ catalog.setCurrentDatabase("dbx")
+ sql("ALTER TABLE tab1 PARTITION (a='100', b='p') RENAME TO PARTITION (a='10', b='p')")
+ assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+ Set(Map("a" -> "10", "b" -> "p"), Map("a" -> "20", "b" -> "c"), Map("a" -> "3", "b" -> "p")))
+
+ // table to alter does not exist
+ intercept[NoSuchTableException] {
+ sql("ALTER TABLE does_not_exist PARTITION (c='3') RENAME TO PARTITION (c='333')")
}
- if (!isDatasourceTable) {
- assert(catalog.listPartitions(tableIdent).isEmpty)
+
+ // partition to rename does not exist
+ intercept[NoSuchPartitionException] {
+ sql("ALTER TABLE tab1 PARTITION (a='not_found', b='1') RENAME TO PARTITION (a='1', b='2')")
}
+
+ // partition spec in RENAME PARTITION should be case insensitive by default
+ sql("ALTER TABLE tab1 PARTITION (A='10', B='p') RENAME TO PARTITION (A='1', B='p')")
+ assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+ Set(Map("a" -> "1", "b" -> "p"), Map("a" -> "20", "b" -> "c"), Map("a" -> "3", "b" -> "p")))
}
test("drop build-in function") {
@@ -1683,12 +1635,16 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
}
- // truncating partitioned data source tables is not supported
withTable("rectangles", "rectangles2") {
data.write.saveAsTable("rectangles")
data.write.partitionBy("length").saveAsTable("rectangles2")
+
+ // not supported since the table is not partitioned
assertUnsupported("TRUNCATE TABLE rectangles PARTITION (width=1)")
- assertUnsupported("TRUNCATE TABLE rectangles2 PARTITION (width=1)")
+
+ // supported since partitions are stored in the metastore
+ sql("TRUNCATE TABLE rectangles2 PARTITION (width=1)")
+ assert(spark.table("rectangles2").collect().isEmpty)
}
}
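Beyond the suite above, the partition-level DDL exercised by the refactored tests can be tried directly. A usage sketch against the hypothetical `events` table (paths are invented):

```scala
// RENAME PARTITION and per-partition SET LOCATION now pass the partition-provider
// check for data source tables.
spark.sql("ALTER TABLE events PARTITION (day='2016-10-03') RENAME TO PARTITION (day='2016-10-30')")
spark.sql("ALTER TABLE events PARTITION (day='2016-10-30') SET LOCATION '/tmp/events/archive/day=2016-10-30'")
```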
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 2003ff42d4..409c316c68 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -38,6 +38,7 @@ import org.apache.spark.sql.execution.command.{ColumnStatStruct, DDLUtils}
import org.apache.spark.sql.execution.datasources.CaseInsensitiveMap
import org.apache.spark.sql.hive.client.HiveClient
import org.apache.spark.sql.internal.HiveSerDe
+import org.apache.spark.sql.internal.SQLConf._
import org.apache.spark.sql.internal.StaticSQLConf._
import org.apache.spark.sql.types.{DataType, StructField, StructType}
@@ -105,13 +106,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
* metastore.
*/
private def verifyTableProperties(table: CatalogTable): Unit = {
- val invalidKeys = table.properties.keys.filter { key =>
- key.startsWith(DATASOURCE_PREFIX) || key.startsWith(STATISTICS_PREFIX)
- }
+ val invalidKeys = table.properties.keys.filter(_.startsWith(SPARK_SQL_PREFIX))
if (invalidKeys.nonEmpty) {
throw new AnalysisException(s"Cannot persistent ${table.qualifiedName} into hive metastore " +
- s"as table property keys may not start with '$DATASOURCE_PREFIX' or '$STATISTICS_PREFIX':" +
- s" ${invalidKeys.mkString("[", ", ", "]")}")
+ s"as table property keys may not start with '$SPARK_SQL_PREFIX': " +
+ invalidKeys.mkString("[", ", ", "]"))
}
// External users are not allowed to set/switch the table type. In Hive metastore, the table
// type can be switched by changing the value of a case-sensitive table property `EXTERNAL`.
@@ -190,11 +189,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
throw new TableAlreadyExistsException(db = db, table = table)
}
// Before saving data source table metadata into Hive metastore, we should:
- // 1. Put table schema, partition column names and bucket specification in table properties.
+ // 1. Put table provider, schema, partition column names, bucket specification and partition
+ // provider in table properties.
// 2. Check if this table is hive compatible
// 2.1 If it's not hive compatible, set schema, partition columns and bucket spec to empty
// and save table metadata to Hive.
- // 2.1 If it's hive compatible, set serde information in table metadata and try to save
+ // 2.2 If it's hive compatible, set serde information in table metadata and try to save
// it to Hive. If it fails, treat it as not hive compatible and go back to 2.1
if (DDLUtils.isDatasourceTable(tableDefinition)) {
// data source table always have a provider, it's guaranteed by `DDLUtils.isDatasourceTable`.
@@ -204,6 +204,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
val tableProperties = new scala.collection.mutable.HashMap[String, String]
tableProperties.put(DATASOURCE_PROVIDER, provider)
+ if (tableDefinition.partitionProviderIsHive) {
+ tableProperties.put(TABLE_PARTITION_PROVIDER, "hive")
+ }
// Serialized JSON schema string may be too long to be stored into a single metastore table
// property. In this case, we split the JSON string and store each part as a separate table
@@ -241,12 +244,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
}
}
- // converts the table metadata to Spark SQL specific format, i.e. set schema, partition column
- // names and bucket specification to empty.
+ // converts the table metadata to Spark SQL specific format, i.e. set data schema, names and
+ // bucket specification to empty. Note that partition columns are retained, so that we can
+ // call partition-related Hive API later.
def newSparkSQLSpecificMetastoreTable(): CatalogTable = {
tableDefinition.copy(
- schema = new StructType,
- partitionColumnNames = Nil,
+ schema = tableDefinition.partitionSchema,
bucketSpec = None,
properties = tableDefinition.properties ++ tableProperties)
}
@@ -419,12 +422,17 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
// Sets the `schema`, `partitionColumnNames` and `bucketSpec` from the old table definition,
// to retain the spark specific format if it is. Also add old data source properties to table
// properties, to retain the data source table format.
- val oldDataSourceProps = oldDef.properties.filter(_._1.startsWith(DATASOURCE_PREFIX))
+ val oldDataSourceProps = oldDef.properties.filter(_._1.startsWith(SPARK_SQL_PREFIX))
+ val partitionProviderProp = if (tableDefinition.partitionProviderIsHive) {
+ TABLE_PARTITION_PROVIDER -> "hive"
+ } else {
+ TABLE_PARTITION_PROVIDER -> "builtin"
+ }
val newDef = withStatsProps.copy(
schema = oldDef.schema,
partitionColumnNames = oldDef.partitionColumnNames,
bucketSpec = oldDef.bucketSpec,
- properties = oldDataSourceProps ++ withStatsProps.properties)
+ properties = oldDataSourceProps ++ withStatsProps.properties + partitionProviderProp)
client.alterTable(newDef)
} else {
@@ -448,7 +456,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
* properties, and filter out these special entries from table properties.
*/
private def restoreTableMetadata(table: CatalogTable): CatalogTable = {
- val catalogTable = if (table.tableType == VIEW || conf.get(DEBUG_MODE)) {
+ if (conf.get(DEBUG_MODE)) {
+ return table
+ }
+
+ val tableWithSchema = if (table.tableType == VIEW) {
table
} else {
getProviderFromTableProperties(table).map { provider =>
@@ -473,30 +485,32 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
provider = Some(provider),
partitionColumnNames = getPartitionColumnsFromTableProperties(table),
bucketSpec = getBucketSpecFromTableProperties(table),
- properties = getOriginalTableProperties(table))
+ partitionProviderIsHive = table.properties.get(TABLE_PARTITION_PROVIDER) == Some("hive"))
} getOrElse {
- table.copy(provider = Some("hive"))
+ table.copy(provider = Some("hive"), partitionProviderIsHive = true)
}
}
+
// construct Spark's statistics from information in Hive metastore
- val statsProps = catalogTable.properties.filterKeys(_.startsWith(STATISTICS_PREFIX))
- if (statsProps.nonEmpty) {
+ val statsProps = tableWithSchema.properties.filterKeys(_.startsWith(STATISTICS_PREFIX))
+ val tableWithStats = if (statsProps.nonEmpty) {
val colStatsProps = statsProps.filterKeys(_.startsWith(STATISTICS_COL_STATS_PREFIX))
.map { case (k, v) => (k.drop(STATISTICS_COL_STATS_PREFIX.length), v) }
- val colStats: Map[String, ColumnStat] = catalogTable.schema.collect {
+ val colStats: Map[String, ColumnStat] = tableWithSchema.schema.collect {
case f if colStatsProps.contains(f.name) =>
val numFields = ColumnStatStruct.numStatFields(f.dataType)
(f.name, ColumnStat(numFields, colStatsProps(f.name)))
}.toMap
- catalogTable.copy(
- properties = removeStatsProperties(catalogTable),
+ tableWithSchema.copy(
stats = Some(Statistics(
- sizeInBytes = BigInt(catalogTable.properties(STATISTICS_TOTAL_SIZE)),
- rowCount = catalogTable.properties.get(STATISTICS_NUM_ROWS).map(BigInt(_)),
+ sizeInBytes = BigInt(tableWithSchema.properties(STATISTICS_TOTAL_SIZE)),
+ rowCount = tableWithSchema.properties.get(STATISTICS_NUM_ROWS).map(BigInt(_)),
colStats = colStats)))
} else {
- catalogTable
+ tableWithSchema
}
+
+ tableWithStats.copy(properties = getOriginalTableProperties(table))
}
override def tableExists(db: String, table: String): Boolean = withClient {
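To make the statistics round trip concrete, here is a standalone sketch of the spark.sql.statistics.* keys (the constants are defined at the bottom of this file) carrying stats through the metastore and then being hidden from the user-visible properties; the values are made up:

    // Illustrative only: statistics as stored in metastore table properties.
    val props = Map(
      "spark.sql.statistics.totalSize" -> "1024",
      "spark.sql.statistics.numRows"   -> "100")

    // restoreTableMetadata() turns these into a Statistics object...
    val sizeInBytes = BigInt(props("spark.sql.statistics.totalSize"))
    val rowCount    = props.get("spark.sql.statistics.numRows").map(BigInt(_))

    // ...and then strips every spark.sql.* key from the user-visible properties.
    val userProps = props.filterNot { case (k, _) => k.startsWith("spark.sql.") }
    assert(sizeInBytes == BigInt(1024) && rowCount.contains(BigInt(100)) && userProps.isEmpty)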
@@ -581,13 +595,30 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
// Partitions
// --------------------------------------------------------------------------
+ // The Hive metastore is not case preserving, so partition column names are always lower cased.
+ // We need to lower case the column names in the partition specification before calling
+ // partition-related Hive APIs, to match this behaviour.
+ private def lowerCasePartitionSpec(spec: TablePartitionSpec): TablePartitionSpec = {
+ spec.map { case (k, v) => k.toLowerCase -> v }
+ }
+
+ // The Hive metastore is not case preserving, so the column names of the partition specification
+ // we get back from the metastore are always lower cased. We restore them to match the actual
+ // table partition columns.
+ private def restorePartitionSpec(
+ spec: TablePartitionSpec,
+ partCols: Seq[String]): TablePartitionSpec = {
+ spec.map { case (k, v) => partCols.find(_.equalsIgnoreCase(k)).get -> v }
+ }
+
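A self-contained sketch of the round trip these two helpers perform (the column name is illustrative):

    // Illustrative only: the metastore stores partition column names lower cased,
    // so specs are lower cased on the way in and restored on the way out.
    type TablePartitionSpec = Map[String, String]

    def lowerCasePartitionSpec(spec: TablePartitionSpec): TablePartitionSpec =
      spec.map { case (k, v) => k.toLowerCase -> v }

    def restorePartitionSpec(spec: TablePartitionSpec, partCols: Seq[String]): TablePartitionSpec =
      spec.map { case (k, v) => partCols.find(_.equalsIgnoreCase(k)).get -> v }

    val partCols = Seq("partCol")
    val spec     = Map("partCol" -> "1")
    val stored   = lowerCasePartitionSpec(spec)            // Map("partcol" -> "1")
    assert(restorePartitionSpec(stored, partCols) == spec) // original casing recovered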
override def createPartitions(
db: String,
table: String,
parts: Seq[CatalogTablePartition],
ignoreIfExists: Boolean): Unit = withClient {
requireTableExists(db, table)
- client.createPartitions(db, table, parts, ignoreIfExists)
+ val lowerCasedParts = parts.map(p => p.copy(spec = lowerCasePartitionSpec(p.spec)))
+ client.createPartitions(db, table, lowerCasedParts, ignoreIfExists)
}
override def dropPartitions(
@@ -597,7 +628,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
ignoreIfNotExists: Boolean,
purge: Boolean): Unit = withClient {
requireTableExists(db, table)
- client.dropPartitions(db, table, parts, ignoreIfNotExists, purge)
+ client.dropPartitions(db, table, parts.map(lowerCasePartitionSpec), ignoreIfNotExists, purge)
}
override def renamePartitions(
@@ -605,21 +636,24 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
table: String,
specs: Seq[TablePartitionSpec],
newSpecs: Seq[TablePartitionSpec]): Unit = withClient {
- client.renamePartitions(db, table, specs, newSpecs)
+ client.renamePartitions(
+ db, table, specs.map(lowerCasePartitionSpec), newSpecs.map(lowerCasePartitionSpec))
}
override def alterPartitions(
db: String,
table: String,
newParts: Seq[CatalogTablePartition]): Unit = withClient {
- client.alterPartitions(db, table, newParts)
+ val lowerCasedParts = newParts.map(p => p.copy(spec = lowerCasePartitionSpec(p.spec)))
+ client.alterPartitions(db, table, lowerCasedParts)
}
override def getPartition(
db: String,
table: String,
spec: TablePartitionSpec): CatalogTablePartition = withClient {
- client.getPartition(db, table, spec)
+ val part = client.getPartition(db, table, lowerCasePartitionSpec(spec))
+ part.copy(spec = restorePartitionSpec(part.spec, getTable(db, table).partitionColumnNames))
}
/**
@@ -629,7 +663,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
db: String,
table: String,
spec: TablePartitionSpec): Option[CatalogTablePartition] = withClient {
- client.getPartitionOption(db, table, spec)
+ client.getPartitionOption(db, table, lowerCasePartitionSpec(spec)).map { part =>
+ part.copy(spec = restorePartitionSpec(part.spec, getTable(db, table).partitionColumnNames))
+ }
}
/**
@@ -639,14 +675,17 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
db: String,
table: String,
partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition] = withClient {
- client.getPartitions(db, table, partialSpec)
+ client.getPartitions(db, table, partialSpec.map(lowerCasePartitionSpec)).map { part =>
+ part.copy(spec = restorePartitionSpec(part.spec, getTable(db, table).partitionColumnNames))
+ }
}
override def listPartitionsByFilter(
db: String,
table: String,
predicates: Seq[Expression]): Seq[CatalogTablePartition] = withClient {
- val catalogTable = client.getTable(db, table)
+ val rawTable = client.getTable(db, table)
+ val catalogTable = restoreTableMetadata(rawTable)
val partitionColumnNames = catalogTable.partitionColumnNames.toSet
val nonPartitionPruningPredicates = predicates.filterNot {
_.references.map(_.name).toSet.subsetOf(partitionColumnNames)
@@ -660,19 +699,20 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
val partitionSchema = catalogTable.partitionSchema
if (predicates.nonEmpty) {
- val clientPrunedPartitions =
- client.getPartitionsByFilter(catalogTable, predicates)
+ val clientPrunedPartitions = client.getPartitionsByFilter(rawTable, predicates).map { part =>
+ part.copy(spec = restorePartitionSpec(part.spec, catalogTable.partitionColumnNames))
+ }
val boundPredicate =
InterpretedPredicate.create(predicates.reduce(And).transform {
case att: AttributeReference =>
val index = partitionSchema.indexWhere(_.name == att.name)
BoundReference(index, partitionSchema(index).dataType, nullable = true)
})
- clientPrunedPartitions.filter { case p: CatalogTablePartition =>
- boundPredicate(p.toRow(partitionSchema))
- }
+ clientPrunedPartitions.filter { p => boundPredicate(p.toRow(partitionSchema)) }
} else {
- client.getPartitions(catalogTable)
+ client.getPartitions(catalogTable).map { part =>
+ part.copy(spec = restorePartitionSpec(part.spec, catalogTable.partitionColumnNames))
+ }
}
}
@@ -722,7 +762,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
}
object HiveExternalCatalog {
- val DATASOURCE_PREFIX = "spark.sql.sources."
+ val SPARK_SQL_PREFIX = "spark.sql."
+
+ val DATASOURCE_PREFIX = SPARK_SQL_PREFIX + "sources."
val DATASOURCE_PROVIDER = DATASOURCE_PREFIX + "provider"
val DATASOURCE_SCHEMA = DATASOURCE_PREFIX + "schema"
val DATASOURCE_SCHEMA_PREFIX = DATASOURCE_SCHEMA + "."
@@ -736,21 +778,20 @@ object HiveExternalCatalog {
val DATASOURCE_SCHEMA_BUCKETCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "bucketCol."
val DATASOURCE_SCHEMA_SORTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "sortCol."
- val STATISTICS_PREFIX = "spark.sql.statistics."
+ val STATISTICS_PREFIX = SPARK_SQL_PREFIX + "statistics."
val STATISTICS_TOTAL_SIZE = STATISTICS_PREFIX + "totalSize"
val STATISTICS_NUM_ROWS = STATISTICS_PREFIX + "numRows"
val STATISTICS_COL_STATS_PREFIX = STATISTICS_PREFIX + "colStats."
- def removeStatsProperties(metadata: CatalogTable): Map[String, String] = {
- metadata.properties.filterNot { case (key, _) => key.startsWith(STATISTICS_PREFIX) }
- }
+ val TABLE_PARTITION_PROVIDER = SPARK_SQL_PREFIX + "partitionProvider"
+
def getProviderFromTableProperties(metadata: CatalogTable): Option[String] = {
metadata.properties.get(DATASOURCE_PROVIDER)
}
def getOriginalTableProperties(metadata: CatalogTable): Map[String, String] = {
- metadata.properties.filterNot { case (key, _) => key.startsWith(DATASOURCE_PREFIX) }
+ metadata.properties.filterNot { case (key, _) => key.startsWith(SPARK_SQL_PREFIX) }
}
// A persisted data source table always stores its schema in the catalog.
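Putting the keys above together, a partitioned data source table stored by this catalog carries table properties roughly like the following sketch; only the provider and partitionProvider keys come from the constants above, while the schema-splitting layout and the values are assumptions for illustration:

    // Illustrative only: metastore table properties for a partitioned parquet table.
    // The schema.numParts / schema.part.N layout is assumed here for illustration.
    val tableProps = Map(
      "spark.sql.sources.provider"        -> "parquet",
      "spark.sql.partitionProvider"       -> "hive",
      "spark.sql.sources.schema.numParts" -> "1",
      "spark.sql.sources.schema.part.0"   -> """{"type":"struct","fields":[]}""")

    // getOriginalTableProperties() hides all spark.sql.* bookkeeping from users.
    assert(tableProps.filterNot { case (k, _) => k.startsWith("spark.sql.") }.isEmpty)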
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 6c1585d5f5..d1de863ce3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -76,11 +76,10 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
partitionColumns = table.partitionColumnNames,
bucketSpec = table.bucketSpec,
className = table.provider.get,
- options = table.storage.properties)
+ options = table.storage.properties,
+ catalogTable = Some(table))
- LogicalRelation(
- dataSource.resolveRelation(),
- catalogTable = Some(table))
+ LogicalRelation(dataSource.resolveRelation(), catalogTable = Some(table))
}
}
@@ -194,7 +193,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
val bucketSpec = None // We don't support hive bucketed tables, only ones we write out.
- val lazyPruningEnabled = sparkSession.sqlContext.conf.filesourcePartitionPruning
+ val lazyPruningEnabled = sparkSession.sqlContext.conf.manageFilesourcePartitions
val result = if (metastoreRelation.hiveQlTable.isPartitioned) {
val partitionSchema = StructType.fromAttributes(metastoreRelation.partitionKeys)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 8835b266b2..84873bbbb8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -777,7 +777,7 @@ private[hive] class HiveClientImpl(
val (partCols, schema) = table.schema.map(toHiveColumn).partition { c =>
table.partitionColumnNames.contains(c.getName)
}
- if (table.schema.isEmpty) {
+ if (schema.isEmpty) {
// This is a hack to preserve existing behavior. Before Spark 2.0, we do not
// set a default serde here (this was done in Hive), and so if the user provides
// an empty schema Hive would automatically populate the schema with a single
@@ -831,9 +831,6 @@ private[hive] class HiveClientImpl(
new HivePartition(ht, tpart)
}
- // TODO (cloud-fan): the column names in partition specification are always lower cased because
- // Hive metastore is not case preserving. We should normalize them to the actual column names of
- // the table, once we store partition spec of data source tables.
private def fromHivePartition(hp: HivePartition): CatalogTablePartition = {
val apiPartition = hp.getTPartition
CatalogTablePartition(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
index d290fe9962..6e887d95c0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
@@ -63,7 +63,7 @@ class HiveMetadataCacheSuite extends QueryTest with SQLTestUtils with TestHiveSi
def testCaching(pruningEnabled: Boolean): Unit = {
test(s"partitioned table is cached when partition pruning is $pruningEnabled") {
- withSQLConf(SQLConf.HIVE_FILESOURCE_PARTITION_PRUNING.key -> pruningEnabled.toString) {
+ withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> pruningEnabled.toString) {
withTable("test") {
withTempDir { dir =>
spark.range(5).selectExpr("id", "id as f1", "id as f2").write
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
new file mode 100644
index 0000000000..5f16960fb1
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+
+import org.apache.spark.metrics.source.HiveCatalogMetrics
+import org.apache.spark.sql.{AnalysisException, QueryTest}
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SQLTestUtils
+
+class PartitionProviderCompatibilitySuite
+ extends QueryTest with TestHiveSingleton with SQLTestUtils {
+
+ private def setupPartitionedDatasourceTable(tableName: String, dir: File): Unit = {
+ spark.range(5).selectExpr("id as fieldOne", "id as partCol").write
+ .partitionBy("partCol")
+ .mode("overwrite")
+ .parquet(dir.getAbsolutePath)
+
+ spark.sql(s"""
+ |create table $tableName (fieldOne long, partCol int)
+ |using parquet
+ |options (path "${dir.getAbsolutePath}")
+ |partitioned by (partCol)""".stripMargin)
+ }
+
+ private def verifyIsLegacyTable(tableName: String): Unit = {
+ val unsupportedCommands = Seq(
+ s"ALTER TABLE $tableName ADD PARTITION (partCol=1) LOCATION '/foo'",
+ s"ALTER TABLE $tableName PARTITION (partCol=1) RENAME TO PARTITION (partCol=2)",
+ s"ALTER TABLE $tableName PARTITION (partCol=1) SET LOCATION '/foo'",
+ s"ALTER TABLE $tableName DROP PARTITION (partCol=1)",
+ s"DESCRIBE $tableName PARTITION (partCol=1)",
+ s"SHOW PARTITIONS $tableName")
+
+ withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true") {
+ for (cmd <- unsupportedCommands) {
+ val e = intercept[AnalysisException] {
+ spark.sql(cmd)
+ }
+ assert(e.getMessage.contains("partition metadata is not stored in the Hive metastore"), e)
+ }
+ }
+ }
+
+ test("convert partition provider to hive with repair table") {
+ withTable("test") {
+ withTempDir { dir =>
+ withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "false") {
+ setupPartitionedDatasourceTable("test", dir)
+ assert(spark.sql("select * from test").count() == 5)
+ }
+ withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true") {
+ verifyIsLegacyTable("test")
+ spark.sql("msck repair table test")
+ spark.sql("show partitions test").count() // check we are a new table
+
+ // sanity check table performance
+ HiveCatalogMetrics.reset()
+ assert(spark.sql("select * from test where partCol < 2").count() == 2)
+ assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 2)
+ assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 2)
+ }
+ }
+ }
+ }
+
+ test("when partition management is enabled, new tables have partition provider hive") {
+ withTable("test") {
+ withTempDir { dir =>
+ withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true") {
+ setupPartitionedDatasourceTable("test", dir)
+ spark.sql("show partitions test").count() // check we are a new table
+ assert(spark.sql("select * from test").count() == 0) // needs repair
+ spark.sql("msck repair table test")
+ assert(spark.sql("select * from test").count() == 5)
+ }
+ }
+ }
+ }
+
+ test("when partition management is disabled, new tables have no partition provider") {
+ withTable("test") {
+ withTempDir { dir =>
+ withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "false") {
+ setupPartitionedDatasourceTable("test", dir)
+ verifyIsLegacyTable("test")
+ assert(spark.sql("select * from test").count() == 5)
+ }
+ }
+ }
+ }
+
+ test("when partition management is disabled, we preserve the old behavior even for new tables") {
+ withTable("test") {
+ withTempDir { dir =>
+ withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true") {
+ setupPartitionedDatasourceTable("test", dir)
+ spark.sql("show partitions test").count() // check we are a new table
+ spark.sql("refresh table test")
+ assert(spark.sql("select * from test").count() == 0)
+ }
+ // disabled
+ withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "false") {
+ val e = intercept[AnalysisException] {
+ spark.sql(s"show partitions test")
+ }
+ assert(e.getMessage.contains("filesource partition management is disabled"))
+ spark.sql("refresh table test")
+ assert(spark.sql("select * from test").count() == 5)
+ }
+ // then enabled again
+ withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true") {
+ spark.sql("refresh table test")
+ assert(spark.sql("select * from test").count() == 0)
+ }
+ }
+ }
+ }
+}
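For readers skimming the suite: the upgrade path it exercises is a single repair command once partition management is enabled. A minimal sketch, assuming a SparkSession with Hive support and the table created by setupPartitionedDatasourceTable above:

    import org.apache.spark.sql.SparkSession

    // Illustrative only: upgrading a legacy partitioned data source table named "test"
    // so that its partition metadata is managed by the Hive metastore.
    val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
    spark.sql("MSCK REPAIR TABLE test")                       // register partitions in the metastore
    spark.sql("SHOW PARTITIONS test").show()                  // now served from the Hive metastore
    spark.sql("SELECT * FROM test WHERE partCol < 2").show()  // pruned using metastore partitions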
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveTablePerfStatsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
index 82ee813c6a..476383a5b3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveTablePerfStatsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SQLTestUtils
-class HiveTablePerfStatsSuite
+class PartitionedTablePerfStatsSuite
extends QueryTest with TestHiveSingleton with SQLTestUtils with BeforeAndAfterEach {
override def beforeEach(): Unit = {
@@ -41,25 +41,54 @@ class HiveTablePerfStatsSuite
FileStatusCache.resetForTesting()
}
- private def setupPartitionedTable(tableName: String, dir: File): Unit = {
- spark.range(5).selectExpr("id", "id as partCol1", "id as partCol2").write
+ private case class TestSpec(setupTable: (String, File) => Unit, isDatasourceTable: Boolean)
+
+ /**
+ * Runs a test against both converted hive and native datasource tables. The test can use the
+ * passed TestSpec object for setup and inspecting test parameters.
+ */
+ private def genericTest(testName: String)(fn: TestSpec => Unit): Unit = {
+ test("hive table: " + testName) {
+ fn(TestSpec(setupPartitionedHiveTable, false))
+ }
+ test("datasource table: " + testName) {
+ fn(TestSpec(setupPartitionedDatasourceTable, true))
+ }
+ }
+
+ private def setupPartitionedHiveTable(tableName: String, dir: File): Unit = {
+ spark.range(5).selectExpr("id as fieldOne", "id as partCol1", "id as partCol2").write
.partitionBy("partCol1", "partCol2")
.mode("overwrite")
.parquet(dir.getAbsolutePath)
spark.sql(s"""
- |create external table $tableName (id long)
+ |create external table $tableName (fieldOne long)
|partitioned by (partCol1 int, partCol2 int)
|stored as parquet
|location "${dir.getAbsolutePath}"""".stripMargin)
spark.sql(s"msck repair table $tableName")
}
- test("partitioned pruned table reports only selected files") {
+ private def setupPartitionedDatasourceTable(tableName: String, dir: File): Unit = {
+ spark.range(5).selectExpr("id as fieldOne", "id as partCol1", "id as partCol2").write
+ .partitionBy("partCol1", "partCol2")
+ .mode("overwrite")
+ .parquet(dir.getAbsolutePath)
+
+ spark.sql(s"""
+ |create table $tableName (fieldOne long, partCol1 int, partCol2 int)
+ |using parquet
+ |options (path "${dir.getAbsolutePath}")
+ |partitioned by (partCol1, partCol2)""".stripMargin)
+ spark.sql(s"msck repair table $tableName")
+ }
+
+ genericTest("partitioned pruned table reports only selected files") { spec =>
assert(spark.sqlContext.getConf(HiveUtils.CONVERT_METASTORE_PARQUET.key) == "true")
withTable("test") {
withTempDir { dir =>
- setupPartitionedTable("test", dir)
+ spec.setupTable("test", dir)
val df = spark.sql("select * from test")
assert(df.count() == 5)
assert(df.inputFiles.length == 5) // unpruned
@@ -75,17 +104,24 @@ class HiveTablePerfStatsSuite
val df4 = spark.sql("select * from test where partCol1 = 999")
assert(df4.count() == 0)
assert(df4.inputFiles.length == 0)
+
+ // TODO(ekl) enable for hive tables as well once SPARK-17983 is fixed
+ if (spec.isDatasourceTable) {
+ val df5 = spark.sql("select * from test where fieldOne = 4")
+ assert(df5.count() == 1)
+ assert(df5.inputFiles.length == 5)
+ }
}
}
}
- test("lazy partition pruning reads only necessary partition data") {
+ genericTest("lazy partition pruning reads only necessary partition data") { spec =>
withSQLConf(
- SQLConf.HIVE_FILESOURCE_PARTITION_PRUNING.key -> "true",
+ SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true",
SQLConf.HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE.key -> "0") {
withTable("test") {
withTempDir { dir =>
- setupPartitionedTable("test", dir)
+ spec.setupTable("test", dir)
HiveCatalogMetrics.reset()
spark.sql("select * from test where partCol1 = 999").count()
assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 0)
@@ -120,13 +156,13 @@ class HiveTablePerfStatsSuite
}
}
- test("lazy partition pruning with file status caching enabled") {
+ genericTest("lazy partition pruning with file status caching enabled") { spec =>
withSQLConf(
- "spark.sql.hive.filesourcePartitionPruning" -> "true",
- "spark.sql.hive.filesourcePartitionFileCacheSize" -> "9999999") {
+ SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true",
+ SQLConf.HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE.key -> "9999999") {
withTable("test") {
withTempDir { dir =>
- setupPartitionedTable("test", dir)
+ spec.setupTable("test", dir)
HiveCatalogMetrics.reset()
assert(spark.sql("select * from test where partCol1 = 999").count() == 0)
assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 0)
@@ -161,13 +197,13 @@ class HiveTablePerfStatsSuite
}
}
- test("file status caching respects refresh table and refreshByPath") {
+ genericTest("file status caching respects refresh table and refreshByPath") { spec =>
withSQLConf(
- "spark.sql.hive.filesourcePartitionPruning" -> "true",
- "spark.sql.hive.filesourcePartitionFileCacheSize" -> "9999999") {
+ SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true",
+ SQLConf.HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE.key -> "9999999") {
withTable("test") {
withTempDir { dir =>
- setupPartitionedTable("test", dir)
+ spec.setupTable("test", dir)
HiveCatalogMetrics.reset()
assert(spark.sql("select * from test").count() == 5)
assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
@@ -190,13 +226,13 @@ class HiveTablePerfStatsSuite
}
}
- test("file status cache respects size limit") {
+ genericTest("file status cache respects size limit") { spec =>
withSQLConf(
- "spark.sql.hive.filesourcePartitionPruning" -> "true",
- "spark.sql.hive.filesourcePartitionFileCacheSize" -> "1" /* 1 byte */) {
+ SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true",
+ SQLConf.HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE.key -> "1" /* 1 byte */) {
withTable("test") {
withTempDir { dir =>
- setupPartitionedTable("test", dir)
+ spec.setupTable("test", dir)
HiveCatalogMetrics.reset()
assert(spark.sql("select * from test").count() == 5)
assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
@@ -209,11 +245,11 @@ class HiveTablePerfStatsSuite
}
}
- test("all partitions read and cached when filesource partition pruning is off") {
- withSQLConf(SQLConf.HIVE_FILESOURCE_PARTITION_PRUNING.key -> "false") {
+ test("hive table: files read and cached when filesource partition management is off") {
+ withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "false") {
withTable("test") {
withTempDir { dir =>
- setupPartitionedTable("test", dir)
+ setupPartitionedHiveTable("test", dir)
// We actually query the partitions from hive each time the table is resolved in this
// mode. This is kind of terrible, but is needed to preserve the legacy behavior
@@ -237,4 +273,32 @@ class HiveTablePerfStatsSuite
}
}
}
+
+ test("datasource table: all partition data cached in memory when partition management is off") {
+ withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "false") {
+ withTable("test") {
+ withTempDir { dir =>
+ setupPartitionedDatasourceTable("test", dir)
+ HiveCatalogMetrics.reset()
+ assert(spark.sql("select * from test where partCol1 = 999").count() == 0)
+
+ // not using metastore
+ assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 0)
+
+ // reads and caches all the files initially
+ assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
+
+ HiveCatalogMetrics.reset()
+ assert(spark.sql("select * from test where partCol1 < 2").count() == 2)
+ assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 0)
+ assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
+
+ HiveCatalogMetrics.reset()
+ assert(spark.sql("select * from test").count() == 5)
+ assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 0)
+ assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
+ }
+ }
+ }
+ }
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index c351063a63..4f5ebc3d83 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -310,39 +310,50 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils
}
}
- test("test table-level statistics for data source table created in HiveExternalCatalog") {
- val parquetTable = "parquetTable"
- withTable(parquetTable) {
- sql(s"CREATE TABLE $parquetTable (key STRING, value STRING) USING PARQUET")
- val catalogTable = spark.sessionState.catalog.getTableMetadata(TableIdentifier(parquetTable))
- assert(DDLUtils.isDatasourceTable(catalogTable))
+ private def testUpdatingTableStats(tableDescription: String, createTableCmd: String): Unit = {
+ test("test table-level statistics for " + tableDescription) {
+ val parquetTable = "parquetTable"
+ withTable(parquetTable) {
+ sql(createTableCmd)
+ val catalogTable = spark.sessionState.catalog.getTableMetadata(
+ TableIdentifier(parquetTable))
+ assert(DDLUtils.isDatasourceTable(catalogTable))
+
+ sql(s"INSERT INTO TABLE $parquetTable SELECT * FROM src")
+ checkTableStats(
+ parquetTable, isDataSourceTable = true, hasSizeInBytes = false, expectedRowCounts = None)
- sql(s"INSERT INTO TABLE $parquetTable SELECT * FROM src")
- checkTableStats(
- parquetTable, isDataSourceTable = true, hasSizeInBytes = false, expectedRowCounts = None)
+ // noscan won't count the number of rows
+ sql(s"ANALYZE TABLE $parquetTable COMPUTE STATISTICS noscan")
+ val fetchedStats1 = checkTableStats(
+ parquetTable, isDataSourceTable = true, hasSizeInBytes = true, expectedRowCounts = None)
- // noscan won't count the number of rows
- sql(s"ANALYZE TABLE $parquetTable COMPUTE STATISTICS noscan")
- val fetchedStats1 = checkTableStats(
- parquetTable, isDataSourceTable = true, hasSizeInBytes = true, expectedRowCounts = None)
+ sql(s"INSERT INTO TABLE $parquetTable SELECT * FROM src")
+ sql(s"ANALYZE TABLE $parquetTable COMPUTE STATISTICS noscan")
+ val fetchedStats2 = checkTableStats(
+ parquetTable, isDataSourceTable = true, hasSizeInBytes = true, expectedRowCounts = None)
+ assert(fetchedStats2.get.sizeInBytes > fetchedStats1.get.sizeInBytes)
- sql(s"INSERT INTO TABLE $parquetTable SELECT * FROM src")
- sql(s"ANALYZE TABLE $parquetTable COMPUTE STATISTICS noscan")
- val fetchedStats2 = checkTableStats(
- parquetTable, isDataSourceTable = true, hasSizeInBytes = true, expectedRowCounts = None)
- assert(fetchedStats2.get.sizeInBytes > fetchedStats1.get.sizeInBytes)
-
- // without noscan, we count the number of rows
- sql(s"ANALYZE TABLE $parquetTable COMPUTE STATISTICS")
- val fetchedStats3 = checkTableStats(
- parquetTable,
- isDataSourceTable = true,
- hasSizeInBytes = true,
- expectedRowCounts = Some(1000))
- assert(fetchedStats3.get.sizeInBytes == fetchedStats2.get.sizeInBytes)
+ // without noscan, we count the number of rows
+ sql(s"ANALYZE TABLE $parquetTable COMPUTE STATISTICS")
+ val fetchedStats3 = checkTableStats(
+ parquetTable,
+ isDataSourceTable = true,
+ hasSizeInBytes = true,
+ expectedRowCounts = Some(1000))
+ assert(fetchedStats3.get.sizeInBytes == fetchedStats2.get.sizeInBytes)
+ }
}
}
+ testUpdatingTableStats(
+ "data source table created in HiveExternalCatalog",
+ "CREATE TABLE parquetTable (key STRING, value STRING) USING PARQUET")
+
+ testUpdatingTableStats(
+ "partitioned data source table",
+ "CREATE TABLE parquetTable (key STRING, value STRING) USING PARQUET PARTITIONED BY (key)")
+
test("statistics collection of a table with zero column") {
val table_no_cols = "table_no_cols"
withTable(table_no_cols) {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
index ad1e9b17a9..46ed18c70f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
@@ -415,10 +415,7 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
.mode(SaveMode.Overwrite)
.saveAsTable("part_datasrc")
- val message1 = intercept[AnalysisException] {
- sql("SHOW PARTITIONS part_datasrc")
- }.getMessage
- assert(message1.contains("is not allowed on a datasource table"))
+ assert(sql("SHOW PARTITIONS part_datasrc").count() == 3)
}
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 495b4f874a..01fa827220 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -358,7 +358,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
"# Partition Information",
"# col_name",
"Detailed Partition Information CatalogPartition(",
- "Partition Values: [Us, 1]",
+ "Partition Values: [c=Us, d=1]",
"Storage(Location:",
"Partition Parameters")
@@ -399,10 +399,8 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
.range(1).select('id as 'a, 'id as 'b, 'id as 'c, 'id as 'd).write
.partitionBy("d")
.saveAsTable("datasource_table")
- val m4 = intercept[AnalysisException] {
- sql("DESC datasource_table PARTITION (d=2)")
- }.getMessage()
- assert(m4.contains("DESC PARTITION is not allowed on a datasource table"))
+
+ sql("DESC datasource_table PARTITION (d=0)")
val m5 = intercept[AnalysisException] {
spark.range(10).select('id as 'a, 'id as 'b).createTempView("view1")