aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorwangzhenhua <wangzhenhua@huawei.com>2016-09-05 17:32:31 +0200
committerHerman van Hovell <hvanhovell@databricks.com>2016-09-05 17:32:31 +0200
commit6d86403d8b252776effcddd71338b4d21a224f9b (patch)
tree2fa41086f7d6c9dacb86d3e34d9c8f6f0b4fdcab
parent3ccb23e445711ea5d9059eb6de7c490c8fc9d112 (diff)
downloadspark-6d86403d8b252776effcddd71338b4d21a224f9b.tar.gz
spark-6d86403d8b252776effcddd71338b4d21a224f9b.tar.bz2
spark-6d86403d8b252776effcddd71338b4d21a224f9b.zip
[SPARK-17072][SQL] support table-level statistics generation and storing into/loading from metastore
## What changes were proposed in this pull request? 1. Support generation table-level statistics for - hive tables in HiveExternalCatalog - data source tables in HiveExternalCatalog - data source tables in InMemoryCatalog. 2. Add a property "catalogStats" in CatalogTable to hold statistics in Spark side. 3. Put logics of statistics transformation between Spark and Hive in HiveClientImpl. 4. Extend Statistics class by adding rowCount (will add estimatedSize when we have column stats). ## How was this patch tested? add unit tests Author: wangzhenhua <wangzhenhua@huawei.com> Author: Zhenhua Wang <wangzhenhua@huawei.com> Closes #14712 from wzhfy/tableStats.
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala4
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala15
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala8
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala4
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala64
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala8
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala13
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala8
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala4
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala26
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala57
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala10
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala68
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala153
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala27
16 files changed, 363 insertions, 108 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 8408d765d4..79231ee9e3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -22,7 +22,7 @@ import java.util.Date
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
import org.apache.spark.sql.catalyst.util.quoteIdentifier
import org.apache.spark.sql.types.StructType
@@ -130,6 +130,7 @@ case class CatalogTable(
createTime: Long = System.currentTimeMillis,
lastAccessTime: Long = -1,
properties: Map[String, String] = Map.empty,
+ stats: Option[Statistics] = None,
viewOriginalText: Option[String] = None,
viewText: Option[String] = None,
comment: Option[String] = None,
@@ -190,6 +191,7 @@ case class CatalogTable(
viewText.map("View: " + _).getOrElse(""),
comment.map("Comment: " + _).getOrElse(""),
if (properties.nonEmpty) s"Properties: $tableProperties" else "",
+ if (stats.isDefined) s"Statistics: ${stats.get}" else "",
s"$storage")
output.filter(_.nonEmpty).mkString("CatalogTable(\n\t", "\n\t", ")")
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
index 6e6cc6962c..58fa537a18 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
@@ -31,6 +31,19 @@ package org.apache.spark.sql.catalyst.plans.logical
*
* @param sizeInBytes Physical size in bytes. For leaf operators this defaults to 1, otherwise it
* defaults to the product of children's `sizeInBytes`.
+ * @param rowCount Estimated number of rows.
* @param isBroadcastable If true, output is small enough to be used in a broadcast join.
*/
-case class Statistics(sizeInBytes: BigInt, isBroadcastable: Boolean = false)
+case class Statistics(
+ sizeInBytes: BigInt,
+ rowCount: Option[BigInt] = None,
+ isBroadcastable: Boolean = false) {
+ override def toString: String = {
+ val output =
+ Seq(s"sizeInBytes=$sizeInBytes",
+ if (rowCount.isDefined) s"rowCount=${rowCount.get}" else "",
+ s"isBroadcastable=$isBroadcastable"
+ )
+ output.filter(_.nonEmpty).mkString("Statistics(", ", ", ")")
+ }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala
index dde91b0a86..6f821f80cc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala
@@ -590,8 +590,12 @@ class SQLBuilder private (
object ExtractSQLTable {
def unapply(plan: LogicalPlan): Option[SQLTable] = plan match {
- case l @ LogicalRelation(_, _, Some(TableIdentifier(table, Some(database)))) =>
- Some(SQLTable(database, table, l.output.map(_.withQualifier(None))))
+ case l @ LogicalRelation(_, _, Some(catalogTable))
+ if catalogTable.identifier.database.isDefined =>
+ Some(SQLTable(
+ catalogTable.identifier.database.get,
+ catalogTable.identifier.table,
+ l.output.map(_.withQualifier(None))))
case relation: CatalogRelation =>
val m = relation.catalogTable
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index fc078da07d..7ba1a9ff22 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -99,9 +99,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
ctx.identifier.getText.toLowerCase == "noscan") {
AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier).toString)
} else {
- // Always just run the no scan analyze. We should fix this and implement full analyze
- // command in the future.
- AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier).toString)
+ AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier).toString, noscan = false)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
index a469d4da86..15687ddd72 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
@@ -21,19 +21,18 @@ import scala.util.control.NonFatal
import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
+import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession}
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable}
+import org.apache.spark.sql.catalyst.plans.logical.Statistics
+import org.apache.spark.sql.execution.datasources.LogicalRelation
/**
* Analyzes the given table in the current database to generate statistics, which will be
* used in query optimizations.
- *
- * Right now, it only supports Hive tables and it only updates the size of a Hive table
- * in the Hive metastore.
*/
-case class AnalyzeTableCommand(tableName: String) extends RunnableCommand {
+case class AnalyzeTableCommand(tableName: String, noscan: Boolean = true) extends RunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
val sessionState = sparkSession.sessionState
@@ -71,8 +70,6 @@ case class AnalyzeTableCommand(tableName: String) extends RunnableCommand {
size
}
- val tableParameters = catalogTable.properties
- val oldTotalSize = tableParameters.get("totalSize").map(_.toLong).getOrElse(0L)
val newTotalSize =
catalogTable.storage.locationUri.map { p =>
val path = new Path(p)
@@ -88,24 +85,47 @@ case class AnalyzeTableCommand(tableName: String) extends RunnableCommand {
}
}.getOrElse(0L)
- // Update the Hive metastore if the total size of the table is different than the size
- // recorded in the Hive metastore.
- // This logic is based on org.apache.hadoop.hive.ql.exec.StatsTask.aggregateStats().
- if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
- sessionState.catalog.alterTable(
- catalogTable.copy(
- properties = relation.catalogTable.properties +
- (AnalyzeTableCommand.TOTAL_SIZE_FIELD -> newTotalSize.toString)))
- }
+ updateTableStats(catalogTable, newTotalSize)
+
+ // data source tables have been converted into LogicalRelations
+ case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined =>
+ updateTableStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes)
case otherRelation =>
- throw new AnalysisException(s"ANALYZE TABLE is only supported for Hive tables, " +
- s"but '${tableIdent.unquotedString}' is a ${otherRelation.nodeName}.")
+ throw new AnalysisException(s"ANALYZE TABLE is not supported for " +
+ s"${otherRelation.nodeName}.")
}
+
+ def updateTableStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = {
+ val oldTotalSize = catalogTable.stats.map(_.sizeInBytes.toLong).getOrElse(0L)
+ val oldRowCount = catalogTable.stats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L)
+ var newStats: Option[Statistics] = None
+ if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
+ newStats = Some(Statistics(sizeInBytes = newTotalSize))
+ }
+ // We only set rowCount when noscan is false, because otherwise:
+ // 1. when total size is not changed, we don't need to alter the table;
+ // 2. when total size is changed, `oldRowCount` becomes invalid.
+ // This is to make sure that we only record the right statistics.
+ if (!noscan) {
+ val newRowCount = Dataset.ofRows(sparkSession, relation).count()
+ if (newRowCount >= 0 && newRowCount != oldRowCount) {
+ newStats = if (newStats.isDefined) {
+ newStats.map(_.copy(rowCount = Some(BigInt(newRowCount))))
+ } else {
+ Some(Statistics(sizeInBytes = oldTotalSize, rowCount = Some(BigInt(newRowCount))))
+ }
+ }
+ }
+ // Update the metastore if the above statistics of the table are different from those
+ // recorded in the metastore.
+ if (newStats.isDefined) {
+ sessionState.catalog.alterTable(catalogTable.copy(stats = newStats))
+ // Refresh the cached data source table in the catalog.
+ sessionState.catalog.refreshTable(tableIdent)
+ }
+ }
+
Seq.empty[Row]
}
}
-
-object AnalyzeTableCommand {
- val TOTAL_SIZE_FIELD = "totalSize"
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 8286467e96..c8ad5b3034 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -209,7 +209,7 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
LogicalRelation(
dataSource.resolveRelation(),
- metastoreTableIdentifier = Some(table.identifier))
+ catalogTable = Some(table))
}
override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
@@ -366,7 +366,8 @@ object DataSourceStrategy extends Strategy with Logging {
val scan = RowDataSourceScanExec(
projects.map(_.toAttribute),
scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
- relation.relation, UnknownPartitioning(0), metadata, relation.metastoreTableIdentifier)
+ relation.relation, UnknownPartitioning(0), metadata,
+ relation.catalogTable.map(_.identifier))
filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan)
} else {
// Don't request columns that are only referenced by pushed filters.
@@ -376,7 +377,8 @@ object DataSourceStrategy extends Strategy with Logging {
val scan = RowDataSourceScanExec(
requestedColumns,
scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
- relation.relation, UnknownPartitioning(0), metadata, relation.metastoreTableIdentifier)
+ relation.relation, UnknownPartitioning(0), metadata,
+ relation.catalogTable.map(_.identifier))
execution.ProjectExec(
projects, filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan))
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 8b36caf6f1..55ca4f1106 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -112,7 +112,7 @@ object FileSourceStrategy extends Strategy with Logging {
outputSchema,
partitionKeyFilters.toSeq,
pushedDownFilters,
- table)
+ table.map(_.identifier))
val afterScanFilter = afterScanFilters.toSeq.reduceOption(expressions.And)
val withFilter = afterScanFilter.map(execution.FilterExec(_, scan)).getOrElse(scan)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
index 2a8e147011..d9562fd32e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
@@ -16,8 +16,8 @@
*/
package org.apache.spark.sql.execution.datasources
-import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
import org.apache.spark.sql.sources.BaseRelation
@@ -33,7 +33,7 @@ import org.apache.spark.util.Utils
case class LogicalRelation(
relation: BaseRelation,
expectedOutputAttributes: Option[Seq[Attribute]] = None,
- metastoreTableIdentifier: Option[TableIdentifier] = None)
+ catalogTable: Option[CatalogTable] = None)
extends LeafNode with MultiInstanceRelation {
override val output: Seq[AttributeReference] = {
@@ -72,9 +72,10 @@ case class LogicalRelation(
// expId can be different but the relation is still the same.
override lazy val cleanArgs: Seq[Any] = Seq(relation)
- @transient override lazy val statistics: Statistics = Statistics(
- sizeInBytes = BigInt(relation.sizeInBytes)
- )
+ @transient override lazy val statistics: Statistics = {
+ catalogTable.flatMap(_.stats.map(_.copy(sizeInBytes = relation.sizeInBytes))).getOrElse(
+ Statistics(sizeInBytes = relation.sizeInBytes))
+ }
/** Used to lookup original attribute capitalization */
val attributeMap: AttributeMap[AttributeReference] = AttributeMap(output.map(o => (o, o)))
@@ -89,7 +90,7 @@ case class LogicalRelation(
LogicalRelation(
relation,
expectedOutputAttributes.map(_.map(_.newInstance())),
- metastoreTableIdentifier).asInstanceOf[this.type]
+ catalogTable).asInstanceOf[this.type]
}
override def refresh(): Unit = relation match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index ae77e4cb96..5b96206ba8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -252,11 +252,11 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] {
case relation: CatalogRelation =>
val metadata = relation.catalogTable
preprocess(i, metadata.identifier.quotedString, metadata.partitionColumnNames)
- case LogicalRelation(h: HadoopFsRelation, _, identifier) =>
- val tblName = identifier.map(_.quotedString).getOrElse("unknown")
+ case LogicalRelation(h: HadoopFsRelation, _, catalogTable) =>
+ val tblName = catalogTable.map(_.identifier.quotedString).getOrElse("unknown")
preprocess(i, tblName, h.partitionSchema.map(_.name))
- case LogicalRelation(_: InsertableRelation, _, identifier) =>
- val tblName = identifier.map(_.quotedString).getOrElse("unknown")
+ case LogicalRelation(_: InsertableRelation, _, catalogTable) =>
+ val tblName = catalogTable.map(_.identifier.quotedString).getOrElse("unknown")
preprocess(i, tblName, Nil)
case other => i
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index ab27381c06..8fdbd0f2c6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -192,7 +192,7 @@ private[sql] class SessionState(sparkSession: SparkSession) {
* Right now, it only supports catalog tables and it only updates the size of a catalog table
* in the external catalog.
*/
- def analyze(tableName: String): Unit = {
- AnalyzeTableCommand(tableName).run(sparkSession)
+ def analyze(tableName: String, noscan: Boolean = true): Unit = {
+ AnalyzeTableCommand(tableName, noscan).run(sparkSession)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala
index 2c81cbf15f..264a2ffbeb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql
import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, Join, LocalLimit}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
@@ -75,4 +76,29 @@ class StatisticsSuite extends QueryTest with SharedSQLContext {
}
}
+ test("test table-level statistics for data source table created in InMemoryCatalog") {
+ def checkTableStats(tableName: String, expectedRowCount: Option[BigInt]): Unit = {
+ val df = sql(s"SELECT * FROM $tableName")
+ val relations = df.queryExecution.analyzed.collect { case rel: LogicalRelation =>
+ assert(rel.catalogTable.isDefined)
+ assert(rel.catalogTable.get.stats.flatMap(_.rowCount) === expectedRowCount)
+ rel
+ }
+ assert(relations.size === 1)
+ }
+
+ val tableName = "tbl"
+ withTable(tableName) {
+ sql(s"CREATE TABLE $tableName(i INT, j STRING) USING parquet")
+ Seq(1 -> "a", 2 -> "b").toDF("i", "j").write.mode("overwrite").insertInto("tbl")
+
+ // noscan won't count the number of rows
+ sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS noscan")
+ checkTableStats(tableName, expectedRowCount = None)
+
+ // without noscan, we count the number of rows
+ sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS")
+ checkTableStats(tableName, expectedRowCount = Some(2))
+ }
+ }
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 1fe7f4d41d..2e127ef562 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.plans.logical.Statistics
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.CaseInsensitiveMap
import org.apache.spark.sql.hive.client.HiveClient
@@ -102,11 +103,13 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
* metastore.
*/
private def verifyTableProperties(table: CatalogTable): Unit = {
- val datasourceKeys = table.properties.keys.filter(_.startsWith(DATASOURCE_PREFIX))
- if (datasourceKeys.nonEmpty) {
+ val invalidKeys = table.properties.keys.filter { key =>
+ key.startsWith(DATASOURCE_PREFIX) || key.startsWith(STATISTICS_PREFIX)
+ }
+ if (invalidKeys.nonEmpty) {
throw new AnalysisException(s"Cannot persistent ${table.qualifiedName} into hive metastore " +
- s"as table property keys may not start with '$DATASOURCE_PREFIX': " +
- datasourceKeys.mkString("[", ", ", "]"))
+ s"as table property keys may not start with '$DATASOURCE_PREFIX' or '$STATISTICS_PREFIX':" +
+ s" ${invalidKeys.mkString("[", ", ", "]")}")
}
}
@@ -388,21 +391,34 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
requireTableExists(db, tableDefinition.identifier.table)
verifyTableProperties(tableDefinition)
- if (DDLUtils.isDatasourceTable(tableDefinition)) {
- val oldDef = client.getTable(db, tableDefinition.identifier.table)
+ // convert table statistics to properties so that we can persist them through hive api
+ val withStatsProps = if (tableDefinition.stats.isDefined) {
+ val stats = tableDefinition.stats.get
+ var statsProperties: Map[String, String] =
+ Map(STATISTICS_TOTAL_SIZE -> stats.sizeInBytes.toString())
+ if (stats.rowCount.isDefined) {
+ statsProperties += (STATISTICS_NUM_ROWS -> stats.rowCount.get.toString())
+ }
+ tableDefinition.copy(properties = tableDefinition.properties ++ statsProperties)
+ } else {
+ tableDefinition
+ }
+
+ if (DDLUtils.isDatasourceTable(withStatsProps)) {
+ val oldDef = client.getTable(db, withStatsProps.identifier.table)
// Sets the `schema`, `partitionColumnNames` and `bucketSpec` from the old table definition,
// to retain the spark specific format if it is. Also add old data source properties to table
// properties, to retain the data source table format.
val oldDataSourceProps = oldDef.properties.filter(_._1.startsWith(DATASOURCE_PREFIX))
- val newDef = tableDefinition.copy(
+ val newDef = withStatsProps.copy(
schema = oldDef.schema,
partitionColumnNames = oldDef.partitionColumnNames,
bucketSpec = oldDef.bucketSpec,
- properties = oldDataSourceProps ++ tableDefinition.properties)
+ properties = oldDataSourceProps ++ withStatsProps.properties)
client.alterTable(newDef)
} else {
- client.alterTable(tableDefinition)
+ client.alterTable(withStatsProps)
}
}
@@ -422,7 +438,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
* properties, and filter out these special entries from table properties.
*/
private def restoreTableMetadata(table: CatalogTable): CatalogTable = {
- if (table.tableType == VIEW) {
+ val catalogTable = if (table.tableType == VIEW) {
table
} else {
getProviderFromTableProperties(table).map { provider =>
@@ -452,6 +468,19 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
table.copy(provider = Some("hive"))
}
}
+ // construct Spark's statistics from information in Hive metastore
+ if (catalogTable.properties.contains(STATISTICS_TOTAL_SIZE)) {
+ val totalSize = BigInt(catalogTable.properties.get(STATISTICS_TOTAL_SIZE).get)
+ // TODO: we will compute "estimatedSize" when we have column stats:
+ // average size of row * number of rows
+ catalogTable.copy(
+ properties = removeStatsProperties(catalogTable),
+ stats = Some(Statistics(
+ sizeInBytes = totalSize,
+ rowCount = catalogTable.properties.get(STATISTICS_NUM_ROWS).map(BigInt(_)))))
+ } else {
+ catalogTable
+ }
}
override def tableExists(db: String, table: String): Boolean = withClient {
@@ -657,6 +686,14 @@ object HiveExternalCatalog {
val DATASOURCE_SCHEMA_BUCKETCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "bucketCol."
val DATASOURCE_SCHEMA_SORTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "sortCol."
+ val STATISTICS_PREFIX = "spark.sql.statistics."
+ val STATISTICS_TOTAL_SIZE = STATISTICS_PREFIX + "totalSize"
+ val STATISTICS_NUM_ROWS = STATISTICS_PREFIX + "numRows"
+
+ def removeStatsProperties(metadata: CatalogTable): Map[String, String] = {
+ metadata.properties.filterNot { case (key, _) => key.startsWith(STATISTICS_PREFIX) }
+ }
+
def getProviderFromTableProperties(metadata: CatalogTable): Option[String] = {
metadata.properties.get(DATASOURCE_PROVIDER)
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index ff82c7f7af..d31a8d643a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -82,7 +82,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
LogicalRelation(
dataSource.resolveRelation(checkPathExist = true),
- metastoreTableIdentifier = Some(TableIdentifier(in.name, Some(in.database))))
+ catalogTable = Some(table))
}
}
@@ -257,10 +257,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
fileFormat = defaultSource,
options = options)
- val created = LogicalRelation(
- relation,
- metastoreTableIdentifier =
- Some(TableIdentifier(tableIdentifier.name, Some(tableIdentifier.database))))
+ val created = LogicalRelation(relation, catalogTable = Some(metastoreRelation.catalogTable))
cachedDataSourceTables.put(tableIdentifier, created)
created
}
@@ -286,8 +283,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
bucketSpec = bucketSpec,
options = options,
className = fileType).resolveRelation(),
- metastoreTableIdentifier =
- Some(TableIdentifier(tableIdentifier.name, Some(tableIdentifier.database))))
+ catalogTable = Some(metastoreRelation.catalogTable))
cachedDataSourceTables.put(tableIdentifier, created)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
index a90da98811..0bfdc137fa 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
@@ -108,39 +108,41 @@ private[hive] case class MetastoreRelation(
new HiveTable(tTable)
}
- @transient override lazy val statistics: Statistics = Statistics(
- sizeInBytes = {
- val totalSize = hiveQlTable.getParameters.get(StatsSetupConst.TOTAL_SIZE)
- val rawDataSize = hiveQlTable.getParameters.get(StatsSetupConst.RAW_DATA_SIZE)
- // TODO: check if this estimate is valid for tables after partition pruning.
- // NOTE: getting `totalSize` directly from params is kind of hacky, but this should be
- // relatively cheap if parameters for the table are populated into the metastore.
- // Besides `totalSize`, there are also `numFiles`, `numRows`, `rawDataSize` keys
- // (see StatsSetupConst in Hive) that we can look at in the future.
- BigInt(
- // When table is external,`totalSize` is always zero, which will influence join strategy
- // so when `totalSize` is zero, use `rawDataSize` instead
- // if the size is still less than zero, we try to get the file size from HDFS.
- // given this is only needed for optimization, if the HDFS call fails we return the default.
- if (totalSize != null && totalSize.toLong > 0L) {
- totalSize.toLong
- } else if (rawDataSize != null && rawDataSize.toLong > 0) {
- rawDataSize.toLong
- } else if (sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled) {
- try {
- val hadoopConf = sparkSession.sessionState.newHadoopConf()
- val fs: FileSystem = hiveQlTable.getPath.getFileSystem(hadoopConf)
- fs.getContentSummary(hiveQlTable.getPath).getLength
- } catch {
- case e: IOException =>
- logWarning("Failed to get table size from hdfs.", e)
- sparkSession.sessionState.conf.defaultSizeInBytes
- }
- } else {
- sparkSession.sessionState.conf.defaultSizeInBytes
- })
- }
- )
+ @transient override lazy val statistics: Statistics = {
+ catalogTable.stats.getOrElse(Statistics(
+ sizeInBytes = {
+ val totalSize = hiveQlTable.getParameters.get(StatsSetupConst.TOTAL_SIZE)
+ val rawDataSize = hiveQlTable.getParameters.get(StatsSetupConst.RAW_DATA_SIZE)
+ // TODO: check if this estimate is valid for tables after partition pruning.
+ // NOTE: getting `totalSize` directly from params is kind of hacky, but this should be
+ // relatively cheap if parameters for the table are populated into the metastore.
+ // Besides `totalSize`, there are also `numFiles`, `numRows`, `rawDataSize` keys
+ // (see StatsSetupConst in Hive) that we can look at in the future.
+ BigInt(
+ // When table is external,`totalSize` is always zero, which will influence join strategy
+ // so when `totalSize` is zero, use `rawDataSize` instead
+ // when `rawDataSize` is also zero, use `HiveExternalCatalog.STATISTICS_TOTAL_SIZE`,
+ // which is generated by analyze command.
+ if (totalSize != null && totalSize.toLong > 0L) {
+ totalSize.toLong
+ } else if (rawDataSize != null && rawDataSize.toLong > 0) {
+ rawDataSize.toLong
+ } else if (sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled) {
+ try {
+ val hadoopConf = sparkSession.sessionState.newHadoopConf()
+ val fs: FileSystem = hiveQlTable.getPath.getFileSystem(hadoopConf)
+ fs.getContentSummary(hiveQlTable.getPath).getLength
+ } catch {
+ case e: IOException =>
+ logWarning("Failed to get table size from hdfs.", e)
+ sparkSession.sessionState.conf.defaultSizeInBytes
+ }
+ } else {
+ sparkSession.sessionState.conf.defaultSizeInBytes
+ })
+ }
+ ))
+ }
// When metastore partition pruning is turned off, we cache the list of all partitions to
// mimic the behavior of Spark < 1.5
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index b275ab17a9..33ed675754 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -23,11 +23,14 @@ import scala.reflect.ClassTag
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.command.AnalyzeTableCommand
+import org.apache.spark.sql.catalyst.plans.logical.Statistics
+import org.apache.spark.sql.execution.command.{AnalyzeTableCommand, DDLUtils}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.types.StructType
class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
@@ -168,6 +171,154 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils
TableIdentifier("tempTable"), ignoreIfNotExists = true, purge = false)
}
+ private def checkMetastoreRelationStats(
+ tableName: String,
+ expectedStats: Option[Statistics]): Unit = {
+ val df = sql(s"SELECT * FROM $tableName")
+ val relations = df.queryExecution.analyzed.collect { case rel: MetastoreRelation =>
+ expectedStats match {
+ case Some(es) =>
+ assert(rel.catalogTable.stats.isDefined)
+ val stats = rel.catalogTable.stats.get
+ assert(stats.sizeInBytes === es.sizeInBytes)
+ assert(stats.rowCount === es.rowCount)
+ case None =>
+ assert(rel.catalogTable.stats.isEmpty)
+ }
+ rel
+ }
+ assert(relations.size === 1)
+ }
+
+ test("test table-level statistics for hive tables created in HiveExternalCatalog") {
+ val textTable = "textTable"
+ withTable(textTable) {
+ // Currently Spark's statistics are self-contained, we don't have statistics until we use
+ // the `ANALYZE TABLE` command.
+ sql(s"CREATE TABLE $textTable (key STRING, value STRING) STORED AS TEXTFILE")
+ checkMetastoreRelationStats(textTable, expectedStats = None)
+ sql(s"INSERT INTO TABLE $textTable SELECT * FROM src")
+ checkMetastoreRelationStats(textTable, expectedStats = None)
+
+ // noscan won't count the number of rows
+ sql(s"ANALYZE TABLE $textTable COMPUTE STATISTICS noscan")
+ checkMetastoreRelationStats(textTable, expectedStats =
+ Some(Statistics(sizeInBytes = 5812, rowCount = None)))
+
+ // without noscan, we count the number of rows
+ sql(s"ANALYZE TABLE $textTable COMPUTE STATISTICS")
+ checkMetastoreRelationStats(textTable, expectedStats =
+ Some(Statistics(sizeInBytes = 5812, rowCount = Some(500))))
+ }
+ }
+
+ test("test elimination of the influences of the old stats") {
+ val textTable = "textTable"
+ withTable(textTable) {
+ sql(s"CREATE TABLE $textTable (key STRING, value STRING) STORED AS TEXTFILE")
+ sql(s"INSERT INTO TABLE $textTable SELECT * FROM src")
+ sql(s"ANALYZE TABLE $textTable COMPUTE STATISTICS")
+ checkMetastoreRelationStats(textTable, expectedStats =
+ Some(Statistics(sizeInBytes = 5812, rowCount = Some(500))))
+
+ sql(s"ANALYZE TABLE $textTable COMPUTE STATISTICS noscan")
+ // when the total size is not changed, the old row count is kept
+ checkMetastoreRelationStats(textTable, expectedStats =
+ Some(Statistics(sizeInBytes = 5812, rowCount = Some(500))))
+
+ sql(s"INSERT INTO TABLE $textTable SELECT * FROM src")
+ sql(s"ANALYZE TABLE $textTable COMPUTE STATISTICS noscan")
+ // update total size and remove the old and invalid row count
+ checkMetastoreRelationStats(textTable, expectedStats =
+ Some(Statistics(sizeInBytes = 11624, rowCount = None)))
+ }
+ }
+
+ private def checkLogicalRelationStats(
+ tableName: String,
+ expectedStats: Option[Statistics]): Unit = {
+ val df = sql(s"SELECT * FROM $tableName")
+ val relations = df.queryExecution.analyzed.collect { case rel: LogicalRelation =>
+ assert(rel.catalogTable.isDefined)
+ expectedStats match {
+ case Some(es) =>
+ assert(rel.catalogTable.get.stats.isDefined)
+ val stats = rel.catalogTable.get.stats.get
+ assert(stats.sizeInBytes === es.sizeInBytes)
+ assert(stats.rowCount === es.rowCount)
+ case None =>
+ assert(rel.catalogTable.get.stats.isEmpty)
+ }
+ rel
+ }
+ assert(relations.size === 1)
+ }
+
+ test("test statistics of LogicalRelation converted from MetastoreRelation") {
+ val parquetTable = "parquetTable"
+ val orcTable = "orcTable"
+ withTable(parquetTable, orcTable) {
+ sql(s"CREATE TABLE $parquetTable (key STRING, value STRING) STORED AS PARQUET")
+ sql(s"CREATE TABLE $orcTable (key STRING, value STRING) STORED AS ORC")
+ sql(s"INSERT INTO TABLE $parquetTable SELECT * FROM src")
+ sql(s"INSERT INTO TABLE $orcTable SELECT * FROM src")
+
+ // the default value for `spark.sql.hive.convertMetastoreParquet` is true, here we just set it
+ // for robustness
+ withSQLConf("spark.sql.hive.convertMetastoreParquet" -> "true") {
+ checkLogicalRelationStats(parquetTable, expectedStats = None)
+ sql(s"ANALYZE TABLE $parquetTable COMPUTE STATISTICS")
+ checkLogicalRelationStats(parquetTable, expectedStats =
+ Some(Statistics(sizeInBytes = 4236, rowCount = Some(500))))
+ }
+ withSQLConf("spark.sql.hive.convertMetastoreOrc" -> "true") {
+ checkLogicalRelationStats(orcTable, expectedStats = None)
+ sql(s"ANALYZE TABLE $orcTable COMPUTE STATISTICS")
+ checkLogicalRelationStats(orcTable, expectedStats =
+ Some(Statistics(sizeInBytes = 3023, rowCount = Some(500))))
+ }
+ }
+ }
+
+ test("test table-level statistics for data source table created in HiveExternalCatalog") {
+ val parquetTable = "parquetTable"
+ withTable(parquetTable) {
+ sql(s"CREATE TABLE $parquetTable (key STRING, value STRING) USING PARQUET")
+ val catalogTable = spark.sessionState.catalog.getTableMetadata(TableIdentifier(parquetTable))
+ assert(DDLUtils.isDatasourceTable(catalogTable))
+
+ sql(s"INSERT INTO TABLE $parquetTable SELECT * FROM src")
+ checkLogicalRelationStats(parquetTable, expectedStats = None)
+
+ // noscan won't count the number of rows
+ sql(s"ANALYZE TABLE $parquetTable COMPUTE STATISTICS noscan")
+ checkLogicalRelationStats(parquetTable, expectedStats =
+ Some(Statistics(sizeInBytes = 4236, rowCount = None)))
+
+ sql(s"INSERT INTO TABLE $parquetTable SELECT * FROM src")
+ sql(s"ANALYZE TABLE $parquetTable COMPUTE STATISTICS noscan")
+ checkLogicalRelationStats(parquetTable, expectedStats =
+ Some(Statistics(sizeInBytes = 8472, rowCount = None)))
+
+ // without noscan, we count the number of rows
+ sql(s"ANALYZE TABLE $parquetTable COMPUTE STATISTICS")
+ checkLogicalRelationStats(parquetTable, expectedStats =
+ Some(Statistics(sizeInBytes = 8472, rowCount = Some(1000))))
+ }
+ }
+
+ test("statistics collection of a table with zero column") {
+ val table_no_cols = "table_no_cols"
+ withTable(table_no_cols) {
+ val rddNoCols = sparkContext.parallelize(1 to 10).map(_ => Row.empty)
+ val dfNoCols = spark.createDataFrame(rddNoCols, StructType(Seq.empty))
+ dfNoCols.write.format("json").saveAsTable(table_no_cols)
+ sql(s"ANALYZE TABLE $table_no_cols COMPUTE STATISTICS")
+ checkLogicalRelationStats(table_no_cols, expectedStats =
+ Some(Statistics(sizeInBytes = 30, rowCount = Some(10))))
+ }
+ }
+
test("estimates the size of a test MetastoreRelation") {
val df = sql("""SELECT * FROM src""")
val sizes = df.queryExecution.analyzed.collect { case mr: MetastoreRelation =>
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index eff32805bf..3cba5b2a09 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -1026,26 +1026,29 @@ class HiveDDLSuite
}
}
- test("datasource table property keys are not allowed") {
+ test("datasource and statistics table property keys are not allowed") {
import org.apache.spark.sql.hive.HiveExternalCatalog.DATASOURCE_PREFIX
+ import org.apache.spark.sql.hive.HiveExternalCatalog.STATISTICS_PREFIX
withTable("tbl") {
sql("CREATE TABLE tbl(a INT) STORED AS parquet")
- val e = intercept[AnalysisException] {
- sql(s"ALTER TABLE tbl SET TBLPROPERTIES ('${DATASOURCE_PREFIX}foo' = 'loser')")
- }
- assert(e.getMessage.contains(DATASOURCE_PREFIX + "foo"))
+ Seq(DATASOURCE_PREFIX, STATISTICS_PREFIX).foreach { forbiddenPrefix =>
+ val e = intercept[AnalysisException] {
+ sql(s"ALTER TABLE tbl SET TBLPROPERTIES ('${forbiddenPrefix}foo' = 'loser')")
+ }
+ assert(e.getMessage.contains(forbiddenPrefix + "foo"))
- val e2 = intercept[AnalysisException] {
- sql(s"ALTER TABLE tbl UNSET TBLPROPERTIES ('${DATASOURCE_PREFIX}foo')")
- }
- assert(e2.getMessage.contains(DATASOURCE_PREFIX + "foo"))
+ val e2 = intercept[AnalysisException] {
+ sql(s"ALTER TABLE tbl UNSET TBLPROPERTIES ('${forbiddenPrefix}foo')")
+ }
+ assert(e2.getMessage.contains(forbiddenPrefix + "foo"))
- val e3 = intercept[AnalysisException] {
- sql(s"CREATE TABLE tbl TBLPROPERTIES ('${DATASOURCE_PREFIX}foo'='anything')")
+ val e3 = intercept[AnalysisException] {
+ sql(s"CREATE TABLE tbl TBLPROPERTIES ('${forbiddenPrefix}foo'='anything')")
+ }
+ assert(e3.getMessage.contains(forbiddenPrefix + "foo"))
}
- assert(e3.getMessage.contains(DATASOURCE_PREFIX + "foo"))
}
}
}