-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala | 4
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala | 15
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala | 8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala | 64
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala | 8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala | 13
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala | 8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala | 4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala | 26
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala | 57
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 10
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala | 68
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala | 153
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala | 27
16 files changed, 363 insertions, 108 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 8408d765d4..79231ee9e3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -22,7 +22,7 @@ import java.util.Date
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
import org.apache.spark.sql.catalyst.util.quoteIdentifier
import org.apache.spark.sql.types.StructType
@@ -130,6 +130,7 @@ case class CatalogTable(
createTime: Long = System.currentTimeMillis,
lastAccessTime: Long = -1,
properties: Map[String, String] = Map.empty,
+ stats: Option[Statistics] = None,
viewOriginalText: Option[String] = None,
viewText: Option[String] = None,
comment: Option[String] = None,
@@ -190,6 +191,7 @@ case class CatalogTable(
viewText.map("View: " + _).getOrElse(""),
comment.map("Comment: " + _).getOrElse(""),
if (properties.nonEmpty) s"Properties: $tableProperties" else "",
+ if (stats.isDefined) s"Statistics: ${stats.get}" else "",
s"$storage")
output.filter(_.nonEmpty).mkString("CatalogTable(\n\t", "\n\t", ")")
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
index 6e6cc6962c..58fa537a18 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
@@ -31,6 +31,19 @@ package org.apache.spark.sql.catalyst.plans.logical
*
* @param sizeInBytes Physical size in bytes. For leaf operators this defaults to 1, otherwise it
* defaults to the product of children's `sizeInBytes`.
+ * @param rowCount Estimated number of rows.
* @param isBroadcastable If true, output is small enough to be used in a broadcast join.
*/
-case class Statistics(sizeInBytes: BigInt, isBroadcastable: Boolean = false)
+case class Statistics(
+ sizeInBytes: BigInt,
+ rowCount: Option[BigInt] = None,
+ isBroadcastable: Boolean = false) {
+ override def toString: String = {
+ val output =
+ Seq(s"sizeInBytes=$sizeInBytes",
+ if (rowCount.isDefined) s"rowCount=${rowCount.get}" else "",
+ s"isBroadcastable=$isBroadcastable"
+ )
+ output.filter(_.nonEmpty).mkString("Statistics(", ", ", ")")
+ }
+}
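
Note: the extended Statistics case class above can be exercised on its own; a minimal sketch with illustrative values (not taken from this commit), showing how the new toString renders with and without a row count:

import org.apache.spark.sql.catalyst.plans.logical.Statistics

object StatisticsToStringExample {
  def main(args: Array[String]): Unit = {
    // Size-only statistics, as produced by ANALYZE TABLE ... NOSCAN.
    val sizeOnly = Statistics(sizeInBytes = BigInt(5812))
    // Statistics carrying a row count, as produced by a full ANALYZE TABLE scan.
    val withRows = sizeOnly.copy(rowCount = Some(BigInt(500)))
    println(sizeOnly)  // Statistics(sizeInBytes=5812, isBroadcastable=false)
    println(withRows)  // Statistics(sizeInBytes=5812, rowCount=500, isBroadcastable=false)
  }
}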
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala
index dde91b0a86..6f821f80cc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala
@@ -590,8 +590,12 @@ class SQLBuilder private (
object ExtractSQLTable {
def unapply(plan: LogicalPlan): Option[SQLTable] = plan match {
- case l @ LogicalRelation(_, _, Some(TableIdentifier(table, Some(database)))) =>
- Some(SQLTable(database, table, l.output.map(_.withQualifier(None))))
+ case l @ LogicalRelation(_, _, Some(catalogTable))
+ if catalogTable.identifier.database.isDefined =>
+ Some(SQLTable(
+ catalogTable.identifier.database.get,
+ catalogTable.identifier.table,
+ l.output.map(_.withQualifier(None))))
case relation: CatalogRelation =>
val m = relation.catalogTable
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index fc078da07d..7ba1a9ff22 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -99,9 +99,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
ctx.identifier.getText.toLowerCase == "noscan") {
AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier).toString)
} else {
- // Always just run the no scan analyze. We should fix this and implement full analyze
- // command in the future.
- AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier).toString)
+ AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier).toString, noscan = false)
}
}
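
Note: with the parser change above, the two SQL forms are no longer collapsed into the same no-scan command; a minimal usage sketch (the local-mode session and table name are illustrative):

import org.apache.spark.sql.SparkSession

object AnalyzeTableExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("analyze-example").master("local[*]").getOrCreate()
    import spark.implicits._

    spark.sql("CREATE TABLE tbl (i INT, j STRING) USING parquet")
    Seq(1 -> "a", 2 -> "b").toDF("i", "j").write.insertInto("tbl")

    // Parsed into AnalyzeTableCommand("tbl"): size-only statistics, no row count.
    spark.sql("ANALYZE TABLE tbl COMPUTE STATISTICS NOSCAN")
    // Parsed into AnalyzeTableCommand("tbl", noscan = false): also scans the table to count rows.
    spark.sql("ANALYZE TABLE tbl COMPUTE STATISTICS")

    spark.stop()
  }
}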
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
index a469d4da86..15687ddd72 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
@@ -21,19 +21,18 @@ import scala.util.control.NonFatal
import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
+import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession}
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable}
+import org.apache.spark.sql.catalyst.plans.logical.Statistics
+import org.apache.spark.sql.execution.datasources.LogicalRelation
/**
* Analyzes the given table in the current database to generate statistics, which will be
* used in query optimizations.
- *
- * Right now, it only supports Hive tables and it only updates the size of a Hive table
- * in the Hive metastore.
*/
-case class AnalyzeTableCommand(tableName: String) extends RunnableCommand {
+case class AnalyzeTableCommand(tableName: String, noscan: Boolean = true) extends RunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
val sessionState = sparkSession.sessionState
@@ -71,8 +70,6 @@ case class AnalyzeTableCommand(tableName: String) extends RunnableCommand {
size
}
- val tableParameters = catalogTable.properties
- val oldTotalSize = tableParameters.get("totalSize").map(_.toLong).getOrElse(0L)
val newTotalSize =
catalogTable.storage.locationUri.map { p =>
val path = new Path(p)
@@ -88,24 +85,47 @@ case class AnalyzeTableCommand(tableName: String) extends RunnableCommand {
}
}.getOrElse(0L)
- // Update the Hive metastore if the total size of the table is different than the size
- // recorded in the Hive metastore.
- // This logic is based on org.apache.hadoop.hive.ql.exec.StatsTask.aggregateStats().
- if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
- sessionState.catalog.alterTable(
- catalogTable.copy(
- properties = relation.catalogTable.properties +
- (AnalyzeTableCommand.TOTAL_SIZE_FIELD -> newTotalSize.toString)))
- }
+ updateTableStats(catalogTable, newTotalSize)
+
+ // data source tables have been converted into LogicalRelations
+ case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined =>
+ updateTableStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes)
case otherRelation =>
- throw new AnalysisException(s"ANALYZE TABLE is only supported for Hive tables, " +
- s"but '${tableIdent.unquotedString}' is a ${otherRelation.nodeName}.")
+ throw new AnalysisException(s"ANALYZE TABLE is not supported for " +
+ s"${otherRelation.nodeName}.")
}
+
+ def updateTableStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = {
+ val oldTotalSize = catalogTable.stats.map(_.sizeInBytes.toLong).getOrElse(0L)
+ val oldRowCount = catalogTable.stats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L)
+ var newStats: Option[Statistics] = None
+ if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
+ newStats = Some(Statistics(sizeInBytes = newTotalSize))
+ }
+ // We only set rowCount when noscan is false, because otherwise:
+ // 1. when total size is not changed, we don't need to alter the table;
+ // 2. when total size is changed, `oldRowCount` becomes invalid.
+ // This is to make sure that we only record the right statistics.
+ if (!noscan) {
+ val newRowCount = Dataset.ofRows(sparkSession, relation).count()
+ if (newRowCount >= 0 && newRowCount != oldRowCount) {
+ newStats = if (newStats.isDefined) {
+ newStats.map(_.copy(rowCount = Some(BigInt(newRowCount))))
+ } else {
+ Some(Statistics(sizeInBytes = oldTotalSize, rowCount = Some(BigInt(newRowCount))))
+ }
+ }
+ }
+ // Update the metastore if the above statistics of the table are different from those
+ // recorded in the metastore.
+ if (newStats.isDefined) {
+ sessionState.catalog.alterTable(catalogTable.copy(stats = newStats))
+ // Refresh the cached data source table in the catalog.
+ sessionState.catalog.refreshTable(tableIdent)
+ }
+ }
+
Seq.empty[Row]
}
}
-
-object AnalyzeTableCommand {
- val TOTAL_SIZE_FIELD = "totalSize"
-}
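
Note: the update policy in updateTableStats above can be read as a pure function over the old statistics and the newly measured values; a sketch of that decision logic using the Statistics signature introduced in this commit (the helper name computeNewStats is illustrative, and a None row count stands for the noscan case):

import org.apache.spark.sql.catalyst.plans.logical.Statistics

object UpdatePolicySketch {
  // Only produce new stats when something actually changed, and only trust a row
  // count that comes from a full (non-noscan) analyze.
  def computeNewStats(
      oldStats: Option[Statistics],
      newTotalSize: Long,
      newRowCount: Option[Long]): Option[Statistics] = {
    val oldTotalSize = oldStats.map(_.sizeInBytes.toLong).getOrElse(0L)
    val oldRowCount = oldStats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L)
    var result: Option[Statistics] = None
    if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
      result = Some(Statistics(sizeInBytes = BigInt(newTotalSize)))
    }
    newRowCount.foreach { cnt =>
      if (cnt >= 0 && cnt != oldRowCount) {
        result = result
          .map(_.copy(rowCount = Some(BigInt(cnt))))
          .orElse(Some(Statistics(sizeInBytes = BigInt(oldTotalSize), rowCount = Some(BigInt(cnt)))))
      }
    }
    result  // the command persists this via alterTable and then refreshes the cached table
  }
}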
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 8286467e96..c8ad5b3034 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -209,7 +209,7 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
LogicalRelation(
dataSource.resolveRelation(),
- metastoreTableIdentifier = Some(table.identifier))
+ catalogTable = Some(table))
}
override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
@@ -366,7 +366,8 @@ object DataSourceStrategy extends Strategy with Logging {
val scan = RowDataSourceScanExec(
projects.map(_.toAttribute),
scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
- relation.relation, UnknownPartitioning(0), metadata, relation.metastoreTableIdentifier)
+ relation.relation, UnknownPartitioning(0), metadata,
+ relation.catalogTable.map(_.identifier))
filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan)
} else {
// Don't request columns that are only referenced by pushed filters.
@@ -376,7 +377,8 @@ object DataSourceStrategy extends Strategy with Logging {
val scan = RowDataSourceScanExec(
requestedColumns,
scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
- relation.relation, UnknownPartitioning(0), metadata, relation.metastoreTableIdentifier)
+ relation.relation, UnknownPartitioning(0), metadata,
+ relation.catalogTable.map(_.identifier))
execution.ProjectExec(
projects, filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan))
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 8b36caf6f1..55ca4f1106 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -112,7 +112,7 @@ object FileSourceStrategy extends Strategy with Logging {
outputSchema,
partitionKeyFilters.toSeq,
pushedDownFilters,
- table)
+ table.map(_.identifier))
val afterScanFilter = afterScanFilters.toSeq.reduceOption(expressions.And)
val withFilter = afterScanFilter.map(execution.FilterExec(_, scan)).getOrElse(scan)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
index 2a8e147011..d9562fd32e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
@@ -16,8 +16,8 @@
*/
package org.apache.spark.sql.execution.datasources
-import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
import org.apache.spark.sql.sources.BaseRelation
@@ -33,7 +33,7 @@ import org.apache.spark.util.Utils
case class LogicalRelation(
relation: BaseRelation,
expectedOutputAttributes: Option[Seq[Attribute]] = None,
- metastoreTableIdentifier: Option[TableIdentifier] = None)
+ catalogTable: Option[CatalogTable] = None)
extends LeafNode with MultiInstanceRelation {
override val output: Seq[AttributeReference] = {
@@ -72,9 +72,10 @@ case class LogicalRelation(
// expId can be different but the relation is still the same.
override lazy val cleanArgs: Seq[Any] = Seq(relation)
- @transient override lazy val statistics: Statistics = Statistics(
- sizeInBytes = BigInt(relation.sizeInBytes)
- )
+ @transient override lazy val statistics: Statistics = {
+ catalogTable.flatMap(_.stats.map(_.copy(sizeInBytes = relation.sizeInBytes))).getOrElse(
+ Statistics(sizeInBytes = relation.sizeInBytes))
+ }
/** Used to lookup original attribute capitalization */
val attributeMap: AttributeMap[AttributeReference] = AttributeMap(output.map(o => (o, o)))
@@ -89,7 +90,7 @@ case class LogicalRelation(
LogicalRelation(
relation,
expectedOutputAttributes.map(_.map(_.newInstance())),
- metastoreTableIdentifier).asInstanceOf[this.type]
+ catalogTable).asInstanceOf[this.type]
}
override def refresh(): Unit = relation match {
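
Note: the new statistics override above prefers the catalog-recorded stats (so an analyzed row count survives) while always taking the current size from the underlying relation; a self-contained sketch of that Option pattern with simplified stand-in types (Stats and Table here are illustrative, not Spark classes):

object StatsFallbackSketch {
  case class Stats(sizeInBytes: BigInt, rowCount: Option[BigInt] = None)
  case class Table(stats: Option[Stats])

  // Same shape as LogicalRelation.statistics: keep the catalog's row count when present,
  // but always override sizeInBytes with the relation's current size.
  def effectiveStats(catalogTable: Option[Table], relationSize: BigInt): Stats =
    catalogTable
      .flatMap(_.stats.map(_.copy(sizeInBytes = relationSize)))
      .getOrElse(Stats(sizeInBytes = relationSize))

  def main(args: Array[String]): Unit = {
    val analyzed = Some(Table(Some(Stats(BigInt(4236), Some(BigInt(500))))))
    println(effectiveStats(analyzed, relationSize = BigInt(8472)))  // Stats(8472,Some(500))
    println(effectiveStats(None, relationSize = BigInt(8472)))      // Stats(8472,None)
  }
}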
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index ae77e4cb96..5b96206ba8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -252,11 +252,11 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] {
case relation: CatalogRelation =>
val metadata = relation.catalogTable
preprocess(i, metadata.identifier.quotedString, metadata.partitionColumnNames)
- case LogicalRelation(h: HadoopFsRelation, _, identifier) =>
- val tblName = identifier.map(_.quotedString).getOrElse("unknown")
+ case LogicalRelation(h: HadoopFsRelation, _, catalogTable) =>
+ val tblName = catalogTable.map(_.identifier.quotedString).getOrElse("unknown")
preprocess(i, tblName, h.partitionSchema.map(_.name))
- case LogicalRelation(_: InsertableRelation, _, identifier) =>
- val tblName = identifier.map(_.quotedString).getOrElse("unknown")
+ case LogicalRelation(_: InsertableRelation, _, catalogTable) =>
+ val tblName = catalogTable.map(_.identifier.quotedString).getOrElse("unknown")
preprocess(i, tblName, Nil)
case other => i
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index ab27381c06..8fdbd0f2c6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -192,7 +192,7 @@ private[sql] class SessionState(sparkSession: SparkSession) {
* Right now, it only supports catalog tables and it only updates the size of a catalog table
* in the external catalog.
*/
- def analyze(tableName: String): Unit = {
- AnalyzeTableCommand(tableName).run(sparkSession)
+ def analyze(tableName: String, noscan: Boolean = true): Unit = {
+ AnalyzeTableCommand(tableName, noscan).run(sparkSession)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala
index 2c81cbf15f..264a2ffbeb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql
import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, Join, LocalLimit}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
@@ -75,4 +76,29 @@ class StatisticsSuite extends QueryTest with SharedSQLContext {
}
}
+ test("test table-level statistics for data source table created in InMemoryCatalog") {
+ def checkTableStats(tableName: String, expectedRowCount: Option[BigInt]): Unit = {
+ val df = sql(s"SELECT * FROM $tableName")
+ val relations = df.queryExecution.analyzed.collect { case rel: LogicalRelation =>
+ assert(rel.catalogTable.isDefined)
+ assert(rel.catalogTable.get.stats.flatMap(_.rowCount) === expectedRowCount)
+ rel
+ }
+ assert(relations.size === 1)
+ }
+
+ val tableName = "tbl"
+ withTable(tableName) {
+ sql(s"CREATE TABLE $tableName(i INT, j STRING) USING parquet")
+ Seq(1 -> "a", 2 -> "b").toDF("i", "j").write.mode("overwrite").insertInto("tbl")
+
+ // noscan won't count the number of rows
+ sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS noscan")
+ checkTableStats(tableName, expectedRowCount = None)
+
+ // without noscan, we count the number of rows
+ sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS")
+ checkTableStats(tableName, expectedRowCount = Some(2))
+ }
+ }
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 1fe7f4d41d..2e127ef562 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.plans.logical.Statistics
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.CaseInsensitiveMap
import org.apache.spark.sql.hive.client.HiveClient
@@ -102,11 +103,13 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
* metastore.
*/
private def verifyTableProperties(table: CatalogTable): Unit = {
- val datasourceKeys = table.properties.keys.filter(_.startsWith(DATASOURCE_PREFIX))
- if (datasourceKeys.nonEmpty) {
+ val invalidKeys = table.properties.keys.filter { key =>
+ key.startsWith(DATASOURCE_PREFIX) || key.startsWith(STATISTICS_PREFIX)
+ }
+ if (invalidKeys.nonEmpty) {
throw new AnalysisException(s"Cannot persistent ${table.qualifiedName} into hive metastore " +
- s"as table property keys may not start with '$DATASOURCE_PREFIX': " +
- datasourceKeys.mkString("[", ", ", "]"))
+ s"as table property keys may not start with '$DATASOURCE_PREFIX' or '$STATISTICS_PREFIX':" +
+ s" ${invalidKeys.mkString("[", ", ", "]")}")
}
}
@@ -388,21 +391,34 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
requireTableExists(db, tableDefinition.identifier.table)
verifyTableProperties(tableDefinition)
- if (DDLUtils.isDatasourceTable(tableDefinition)) {
- val oldDef = client.getTable(db, tableDefinition.identifier.table)
+ // convert table statistics to properties so that we can persist them through hive api
+ val withStatsProps = if (tableDefinition.stats.isDefined) {
+ val stats = tableDefinition.stats.get
+ var statsProperties: Map[String, String] =
+ Map(STATISTICS_TOTAL_SIZE -> stats.sizeInBytes.toString())
+ if (stats.rowCount.isDefined) {
+ statsProperties += (STATISTICS_NUM_ROWS -> stats.rowCount.get.toString())
+ }
+ tableDefinition.copy(properties = tableDefinition.properties ++ statsProperties)
+ } else {
+ tableDefinition
+ }
+
+ if (DDLUtils.isDatasourceTable(withStatsProps)) {
+ val oldDef = client.getTable(db, withStatsProps.identifier.table)
// Sets the `schema`, `partitionColumnNames` and `bucketSpec` from the old table definition,
// to retain the spark specific format if it is. Also add old data source properties to table
// properties, to retain the data source table format.
val oldDataSourceProps = oldDef.properties.filter(_._1.startsWith(DATASOURCE_PREFIX))
- val newDef = tableDefinition.copy(
+ val newDef = withStatsProps.copy(
schema = oldDef.schema,
partitionColumnNames = oldDef.partitionColumnNames,
bucketSpec = oldDef.bucketSpec,
- properties = oldDataSourceProps ++ tableDefinition.properties)
+ properties = oldDataSourceProps ++ withStatsProps.properties)
client.alterTable(newDef)
} else {
- client.alterTable(tableDefinition)
+ client.alterTable(withStatsProps)
}
}
@@ -422,7 +438,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
* properties, and filter out these special entries from table properties.
*/
private def restoreTableMetadata(table: CatalogTable): CatalogTable = {
- if (table.tableType == VIEW) {
+ val catalogTable = if (table.tableType == VIEW) {
table
} else {
getProviderFromTableProperties(table).map { provider =>
@@ -452,6 +468,19 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
table.copy(provider = Some("hive"))
}
}
+ // construct Spark's statistics from information in Hive metastore
+ if (catalogTable.properties.contains(STATISTICS_TOTAL_SIZE)) {
+ val totalSize = BigInt(catalogTable.properties.get(STATISTICS_TOTAL_SIZE).get)
+ // TODO: we will compute "estimatedSize" when we have column stats:
+ // average size of row * number of rows
+ catalogTable.copy(
+ properties = removeStatsProperties(catalogTable),
+ stats = Some(Statistics(
+ sizeInBytes = totalSize,
+ rowCount = catalogTable.properties.get(STATISTICS_NUM_ROWS).map(BigInt(_)))))
+ } else {
+ catalogTable
+ }
}
override def tableExists(db: String, table: String): Boolean = withClient {
@@ -657,6 +686,14 @@ object HiveExternalCatalog {
val DATASOURCE_SCHEMA_BUCKETCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "bucketCol."
val DATASOURCE_SCHEMA_SORTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "sortCol."
+ val STATISTICS_PREFIX = "spark.sql.statistics."
+ val STATISTICS_TOTAL_SIZE = STATISTICS_PREFIX + "totalSize"
+ val STATISTICS_NUM_ROWS = STATISTICS_PREFIX + "numRows"
+
+ def removeStatsProperties(metadata: CatalogTable): Map[String, String] = {
+ metadata.properties.filterNot { case (key, _) => key.startsWith(STATISTICS_PREFIX) }
+ }
+
def getProviderFromTableProperties(metadata: CatalogTable): Option[String] = {
metadata.properties.get(DATASOURCE_PROVIDER)
}
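
Note: underneath the HiveExternalCatalog changes above, Spark-level statistics are persisted as plain string table properties under the spark.sql.statistics. prefix and rebuilt on read; a self-contained sketch of that round trip with a simplified stand-in Stats type (illustrative, not the Spark API):

object StatsPropertiesSketch {
  case class Stats(sizeInBytes: BigInt, rowCount: Option[BigInt] = None)

  val STATISTICS_PREFIX = "spark.sql.statistics."
  val STATISTICS_TOTAL_SIZE = STATISTICS_PREFIX + "totalSize"
  val STATISTICS_NUM_ROWS = STATISTICS_PREFIX + "numRows"

  // Serialize statistics into metastore-friendly string properties (alterTable path).
  def toProperties(stats: Stats): Map[String, String] =
    Map(STATISTICS_TOTAL_SIZE -> stats.sizeInBytes.toString) ++
      stats.rowCount.map(c => STATISTICS_NUM_ROWS -> c.toString)

  // Rebuild statistics from the properties read back out of the metastore (restore path).
  def fromProperties(props: Map[String, String]): Option[Stats] =
    props.get(STATISTICS_TOTAL_SIZE).map { size =>
      Stats(sizeInBytes = BigInt(size), rowCount = props.get(STATISTICS_NUM_ROWS).map(BigInt(_)))
    }

  def main(args: Array[String]): Unit = {
    val props = toProperties(Stats(BigInt(5812), Some(BigInt(500))))
    assert(fromProperties(props).contains(Stats(BigInt(5812), Some(BigInt(500)))))
    println(props)
  }
}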
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index ff82c7f7af..d31a8d643a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -82,7 +82,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
LogicalRelation(
dataSource.resolveRelation(checkPathExist = true),
- metastoreTableIdentifier = Some(TableIdentifier(in.name, Some(in.database))))
+ catalogTable = Some(table))
}
}
@@ -257,10 +257,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
fileFormat = defaultSource,
options = options)
- val created = LogicalRelation(
- relation,
- metastoreTableIdentifier =
- Some(TableIdentifier(tableIdentifier.name, Some(tableIdentifier.database))))
+ val created = LogicalRelation(relation, catalogTable = Some(metastoreRelation.catalogTable))
cachedDataSourceTables.put(tableIdentifier, created)
created
}
@@ -286,8 +283,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
bucketSpec = bucketSpec,
options = options,
className = fileType).resolveRelation(),
- metastoreTableIdentifier =
- Some(TableIdentifier(tableIdentifier.name, Some(tableIdentifier.database))))
+ catalogTable = Some(metastoreRelation.catalogTable))
cachedDataSourceTables.put(tableIdentifier, created)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
index a90da98811..0bfdc137fa 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
@@ -108,39 +108,41 @@ private[hive] case class MetastoreRelation(
new HiveTable(tTable)
}
- @transient override lazy val statistics: Statistics = Statistics(
- sizeInBytes = {
- val totalSize = hiveQlTable.getParameters.get(StatsSetupConst.TOTAL_SIZE)
- val rawDataSize = hiveQlTable.getParameters.get(StatsSetupConst.RAW_DATA_SIZE)
- // TODO: check if this estimate is valid for tables after partition pruning.
- // NOTE: getting `totalSize` directly from params is kind of hacky, but this should be
- // relatively cheap if parameters for the table are populated into the metastore.
- // Besides `totalSize`, there are also `numFiles`, `numRows`, `rawDataSize` keys
- // (see StatsSetupConst in Hive) that we can look at in the future.
- BigInt(
- // When table is external,`totalSize` is always zero, which will influence join strategy
- // so when `totalSize` is zero, use `rawDataSize` instead
- // if the size is still less than zero, we try to get the file size from HDFS.
- // given this is only needed for optimization, if the HDFS call fails we return the default.
- if (totalSize != null && totalSize.toLong > 0L) {
- totalSize.toLong
- } else if (rawDataSize != null && rawDataSize.toLong > 0) {
- rawDataSize.toLong
- } else if (sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled) {
- try {
- val hadoopConf = sparkSession.sessionState.newHadoopConf()
- val fs: FileSystem = hiveQlTable.getPath.getFileSystem(hadoopConf)
- fs.getContentSummary(hiveQlTable.getPath).getLength
- } catch {
- case e: IOException =>
- logWarning("Failed to get table size from hdfs.", e)
- sparkSession.sessionState.conf.defaultSizeInBytes
- }
- } else {
- sparkSession.sessionState.conf.defaultSizeInBytes
- })
- }
- )
+ @transient override lazy val statistics: Statistics = {
+ catalogTable.stats.getOrElse(Statistics(
+ sizeInBytes = {
+ val totalSize = hiveQlTable.getParameters.get(StatsSetupConst.TOTAL_SIZE)
+ val rawDataSize = hiveQlTable.getParameters.get(StatsSetupConst.RAW_DATA_SIZE)
+ // TODO: check if this estimate is valid for tables after partition pruning.
+ // NOTE: getting `totalSize` directly from params is kind of hacky, but this should be
+ // relatively cheap if parameters for the table are populated into the metastore.
+ // Besides `totalSize`, there are also `numFiles`, `numRows`, `rawDataSize` keys
+ // (see StatsSetupConst in Hive) that we can look at in the future.
+ BigInt(
+ // When table is external,`totalSize` is always zero, which will influence join strategy
+ // so when `totalSize` is zero, use `rawDataSize` instead
+ // when `rawDataSize` is also zero, use `HiveExternalCatalog.STATISTICS_TOTAL_SIZE`,
+ // which is generated by analyze command.
+ if (totalSize != null && totalSize.toLong > 0L) {
+ totalSize.toLong
+ } else if (rawDataSize != null && rawDataSize.toLong > 0) {
+ rawDataSize.toLong
+ } else if (sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled) {
+ try {
+ val hadoopConf = sparkSession.sessionState.newHadoopConf()
+ val fs: FileSystem = hiveQlTable.getPath.getFileSystem(hadoopConf)
+ fs.getContentSummary(hiveQlTable.getPath).getLength
+ } catch {
+ case e: IOException =>
+ logWarning("Failed to get table size from hdfs.", e)
+ sparkSession.sessionState.conf.defaultSizeInBytes
+ }
+ } else {
+ sparkSession.sessionState.conf.defaultSizeInBytes
+ })
+ }
+ ))
+ }
// When metastore partition pruning is turned off, we cache the list of all partitions to
// mimic the behavior of Spark < 1.5
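
Note: the size estimation above falls through a fixed priority list: catalog-recorded statistics, then Hive's totalSize parameter, then rawDataSize, then an HDFS content summary (when the fallback is enabled), and finally the configured default; a compact sketch of that ordering (the helper and its parameters are hypothetical, not Spark APIs):

object SizeEstimationSketch {
  // Priority mirrored from MetastoreRelation.statistics above.
  def estimateSize(
      catalogSize: Option[BigInt],
      hiveTotalSize: Option[Long],
      hiveRawDataSize: Option[Long],
      hdfsSize: => Option[Long],  // only consulted when the cheaper sources are missing
      defaultSize: Long): BigInt =
    catalogSize.getOrElse {
      BigInt(
        hiveTotalSize.filter(_ > 0L)
          .orElse(hiveRawDataSize.filter(_ > 0L))
          .orElse(hdfsSize)
          .getOrElse(defaultSize))
    }

  def main(args: Array[String]): Unit = {
    // External table: totalSize is zero, so rawDataSize wins.
    println(estimateSize(None, Some(0L), Some(1024L), None, defaultSize = Long.MaxValue))  // 1024
  }
}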
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index b275ab17a9..33ed675754 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -23,11 +23,14 @@ import scala.reflect.ClassTag
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.command.AnalyzeTableCommand
+import org.apache.spark.sql.catalyst.plans.logical.Statistics
+import org.apache.spark.sql.execution.command.{AnalyzeTableCommand, DDLUtils}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.types.StructType
class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
@@ -168,6 +171,154 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils
TableIdentifier("tempTable"), ignoreIfNotExists = true, purge = false)
}
+ private def checkMetastoreRelationStats(
+ tableName: String,
+ expectedStats: Option[Statistics]): Unit = {
+ val df = sql(s"SELECT * FROM $tableName")
+ val relations = df.queryExecution.analyzed.collect { case rel: MetastoreRelation =>
+ expectedStats match {
+ case Some(es) =>
+ assert(rel.catalogTable.stats.isDefined)
+ val stats = rel.catalogTable.stats.get
+ assert(stats.sizeInBytes === es.sizeInBytes)
+ assert(stats.rowCount === es.rowCount)
+ case None =>
+ assert(rel.catalogTable.stats.isEmpty)
+ }
+ rel
+ }
+ assert(relations.size === 1)
+ }
+
+ test("test table-level statistics for hive tables created in HiveExternalCatalog") {
+ val textTable = "textTable"
+ withTable(textTable) {
+ // Currently Spark's statistics are self-contained, we don't have statistics until we use
+ // the `ANALYZE TABLE` command.
+ sql(s"CREATE TABLE $textTable (key STRING, value STRING) STORED AS TEXTFILE")
+ checkMetastoreRelationStats(textTable, expectedStats = None)
+ sql(s"INSERT INTO TABLE $textTable SELECT * FROM src")
+ checkMetastoreRelationStats(textTable, expectedStats = None)
+
+ // noscan won't count the number of rows
+ sql(s"ANALYZE TABLE $textTable COMPUTE STATISTICS noscan")
+ checkMetastoreRelationStats(textTable, expectedStats =
+ Some(Statistics(sizeInBytes = 5812, rowCount = None)))
+
+ // without noscan, we count the number of rows
+ sql(s"ANALYZE TABLE $textTable COMPUTE STATISTICS")
+ checkMetastoreRelationStats(textTable, expectedStats =
+ Some(Statistics(sizeInBytes = 5812, rowCount = Some(500))))
+ }
+ }
+
+ test("test elimination of the influences of the old stats") {
+ val textTable = "textTable"
+ withTable(textTable) {
+ sql(s"CREATE TABLE $textTable (key STRING, value STRING) STORED AS TEXTFILE")
+ sql(s"INSERT INTO TABLE $textTable SELECT * FROM src")
+ sql(s"ANALYZE TABLE $textTable COMPUTE STATISTICS")
+ checkMetastoreRelationStats(textTable, expectedStats =
+ Some(Statistics(sizeInBytes = 5812, rowCount = Some(500))))
+
+ sql(s"ANALYZE TABLE $textTable COMPUTE STATISTICS noscan")
+ // when the total size is not changed, the old row count is kept
+ checkMetastoreRelationStats(textTable, expectedStats =
+ Some(Statistics(sizeInBytes = 5812, rowCount = Some(500))))
+
+ sql(s"INSERT INTO TABLE $textTable SELECT * FROM src")
+ sql(s"ANALYZE TABLE $textTable COMPUTE STATISTICS noscan")
+ // update total size and remove the old and invalid row count
+ checkMetastoreRelationStats(textTable, expectedStats =
+ Some(Statistics(sizeInBytes = 11624, rowCount = None)))
+ }
+ }
+
+ private def checkLogicalRelationStats(
+ tableName: String,
+ expectedStats: Option[Statistics]): Unit = {
+ val df = sql(s"SELECT * FROM $tableName")
+ val relations = df.queryExecution.analyzed.collect { case rel: LogicalRelation =>
+ assert(rel.catalogTable.isDefined)
+ expectedStats match {
+ case Some(es) =>
+ assert(rel.catalogTable.get.stats.isDefined)
+ val stats = rel.catalogTable.get.stats.get
+ assert(stats.sizeInBytes === es.sizeInBytes)
+ assert(stats.rowCount === es.rowCount)
+ case None =>
+ assert(rel.catalogTable.get.stats.isEmpty)
+ }
+ rel
+ }
+ assert(relations.size === 1)
+ }
+
+ test("test statistics of LogicalRelation converted from MetastoreRelation") {
+ val parquetTable = "parquetTable"
+ val orcTable = "orcTable"
+ withTable(parquetTable, orcTable) {
+ sql(s"CREATE TABLE $parquetTable (key STRING, value STRING) STORED AS PARQUET")
+ sql(s"CREATE TABLE $orcTable (key STRING, value STRING) STORED AS ORC")
+ sql(s"INSERT INTO TABLE $parquetTable SELECT * FROM src")
+ sql(s"INSERT INTO TABLE $orcTable SELECT * FROM src")
+
+ // the default value for `spark.sql.hive.convertMetastoreParquet` is true, here we just set it
+ // for robustness
+ withSQLConf("spark.sql.hive.convertMetastoreParquet" -> "true") {
+ checkLogicalRelationStats(parquetTable, expectedStats = None)
+ sql(s"ANALYZE TABLE $parquetTable COMPUTE STATISTICS")
+ checkLogicalRelationStats(parquetTable, expectedStats =
+ Some(Statistics(sizeInBytes = 4236, rowCount = Some(500))))
+ }
+ withSQLConf("spark.sql.hive.convertMetastoreOrc" -> "true") {
+ checkLogicalRelationStats(orcTable, expectedStats = None)
+ sql(s"ANALYZE TABLE $orcTable COMPUTE STATISTICS")
+ checkLogicalRelationStats(orcTable, expectedStats =
+ Some(Statistics(sizeInBytes = 3023, rowCount = Some(500))))
+ }
+ }
+ }
+
+ test("test table-level statistics for data source table created in HiveExternalCatalog") {
+ val parquetTable = "parquetTable"
+ withTable(parquetTable) {
+ sql(s"CREATE TABLE $parquetTable (key STRING, value STRING) USING PARQUET")
+ val catalogTable = spark.sessionState.catalog.getTableMetadata(TableIdentifier(parquetTable))
+ assert(DDLUtils.isDatasourceTable(catalogTable))
+
+ sql(s"INSERT INTO TABLE $parquetTable SELECT * FROM src")
+ checkLogicalRelationStats(parquetTable, expectedStats = None)
+
+ // noscan won't count the number of rows
+ sql(s"ANALYZE TABLE $parquetTable COMPUTE STATISTICS noscan")
+ checkLogicalRelationStats(parquetTable, expectedStats =
+ Some(Statistics(sizeInBytes = 4236, rowCount = None)))
+
+ sql(s"INSERT INTO TABLE $parquetTable SELECT * FROM src")
+ sql(s"ANALYZE TABLE $parquetTable COMPUTE STATISTICS noscan")
+ checkLogicalRelationStats(parquetTable, expectedStats =
+ Some(Statistics(sizeInBytes = 8472, rowCount = None)))
+
+ // without noscan, we count the number of rows
+ sql(s"ANALYZE TABLE $parquetTable COMPUTE STATISTICS")
+ checkLogicalRelationStats(parquetTable, expectedStats =
+ Some(Statistics(sizeInBytes = 8472, rowCount = Some(1000))))
+ }
+ }
+
+ test("statistics collection of a table with zero column") {
+ val table_no_cols = "table_no_cols"
+ withTable(table_no_cols) {
+ val rddNoCols = sparkContext.parallelize(1 to 10).map(_ => Row.empty)
+ val dfNoCols = spark.createDataFrame(rddNoCols, StructType(Seq.empty))
+ dfNoCols.write.format("json").saveAsTable(table_no_cols)
+ sql(s"ANALYZE TABLE $table_no_cols COMPUTE STATISTICS")
+ checkLogicalRelationStats(table_no_cols, expectedStats =
+ Some(Statistics(sizeInBytes = 30, rowCount = Some(10))))
+ }
+ }
+
test("estimates the size of a test MetastoreRelation") {
val df = sql("""SELECT * FROM src""")
val sizes = df.queryExecution.analyzed.collect { case mr: MetastoreRelation =>
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index eff32805bf..3cba5b2a09 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -1026,26 +1026,29 @@ class HiveDDLSuite
}
}
- test("datasource table property keys are not allowed") {
+ test("datasource and statistics table property keys are not allowed") {
import org.apache.spark.sql.hive.HiveExternalCatalog.DATASOURCE_PREFIX
+ import org.apache.spark.sql.hive.HiveExternalCatalog.STATISTICS_PREFIX
withTable("tbl") {
sql("CREATE TABLE tbl(a INT) STORED AS parquet")
- val e = intercept[AnalysisException] {
- sql(s"ALTER TABLE tbl SET TBLPROPERTIES ('${DATASOURCE_PREFIX}foo' = 'loser')")
- }
- assert(e.getMessage.contains(DATASOURCE_PREFIX + "foo"))
+ Seq(DATASOURCE_PREFIX, STATISTICS_PREFIX).foreach { forbiddenPrefix =>
+ val e = intercept[AnalysisException] {
+ sql(s"ALTER TABLE tbl SET TBLPROPERTIES ('${forbiddenPrefix}foo' = 'loser')")
+ }
+ assert(e.getMessage.contains(forbiddenPrefix + "foo"))
- val e2 = intercept[AnalysisException] {
- sql(s"ALTER TABLE tbl UNSET TBLPROPERTIES ('${DATASOURCE_PREFIX}foo')")
- }
- assert(e2.getMessage.contains(DATASOURCE_PREFIX + "foo"))
+ val e2 = intercept[AnalysisException] {
+ sql(s"ALTER TABLE tbl UNSET TBLPROPERTIES ('${forbiddenPrefix}foo')")
+ }
+ assert(e2.getMessage.contains(forbiddenPrefix + "foo"))
- val e3 = intercept[AnalysisException] {
- sql(s"CREATE TABLE tbl TBLPROPERTIES ('${DATASOURCE_PREFIX}foo'='anything')")
+ val e3 = intercept[AnalysisException] {
+ sql(s"CREATE TABLE tbl TBLPROPERTIES ('${forbiddenPrefix}foo'='anything')")
+ }
+ assert(e3.getMessage.contains(forbiddenPrefix + "foo"))
}
- assert(e3.getMessage.contains(DATASOURCE_PREFIX + "foo"))
}
}
}