From 6d86403d8b252776effcddd71338b4d21a224f9b Mon Sep 17 00:00:00 2001
From: wangzhenhua
Date: Mon, 5 Sep 2016 17:32:31 +0200
Subject: [SPARK-17072][SQL] support table-level statistics generation and storing into/loading from metastore

## What changes were proposed in this pull request?

1. Support generating table-level statistics for
   - Hive tables in HiveExternalCatalog
   - data source tables in HiveExternalCatalog
   - data source tables in InMemoryCatalog.
2. Add a property "catalogStats" in CatalogTable to hold statistics on the Spark side.
3. Put the logic for transforming statistics between Spark and Hive in HiveClientImpl.
4. Extend the Statistics class by adding rowCount (estimatedSize will be added when we have column stats).

## How was this patch tested?

Added unit tests.

Author: wangzhenhua
Author: Zhenhua Wang

Closes #14712 from wzhfy/tableStats.

---
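A minimal usage sketch of the behavior added here, mirroring the new StatisticsSuite test at
the end of this diff. It assumes `spark` is an active SparkSession; the table name `tbl` is
only illustrative.

  import spark.implicits._
  import org.apache.spark.sql.execution.datasources.LogicalRelation

  // Create and populate a data source table.
  spark.sql("CREATE TABLE tbl(i INT, j STRING) USING parquet")
  Seq(1 -> "a", 2 -> "b").toDF("i", "j").write.mode("overwrite").insertInto("tbl")

  // NOSCAN only measures the table's files, so sizeInBytes can be recorded but rowCount is not.
  spark.sql("ANALYZE TABLE tbl COMPUTE STATISTICS NOSCAN")

  // A full ANALYZE (AnalyzeTableCommand with noscan = false) also scans the table and records rowCount.
  spark.sql("ANALYZE TABLE tbl COMPUTE STATISTICS")

  // The statistics are stored in CatalogTable.stats and surfaced through LogicalRelation.statistics.
  val rowCount = spark.table("tbl").queryExecution.analyzed.collect {
    case rel: LogicalRelation => rel.catalogTable.flatMap(_.stats).flatMap(_.rowCount)
  }.head
  // rowCount == Some(2) after the full scan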
 .../org/apache/spark/sql/catalyst/SQLBuilder.scala |  8 ++-
 .../spark/sql/execution/SparkSqlParser.scala       |  4 +-
 .../execution/command/AnalyzeTableCommand.scala    | 64 ++++++++++++++--------
 .../execution/datasources/DataSourceStrategy.scala |  8 ++-
 .../execution/datasources/FileSourceStrategy.scala |  2 +-
 .../execution/datasources/LogicalRelation.scala    | 13 +++--
 .../spark/sql/execution/datasources/rules.scala    |  8 +--
 .../apache/spark/sql/internal/SessionState.scala   |  4 +-
 .../org/apache/spark/sql/StatisticsSuite.scala     | 26 +++++++++
 9 files changed, 94 insertions(+), 43 deletions(-)

(limited to 'sql/core')

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala
index dde91b0a86..6f821f80cc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala
@@ -590,8 +590,12 @@ class SQLBuilder private (
 
 object ExtractSQLTable {
   def unapply(plan: LogicalPlan): Option[SQLTable] = plan match {
-    case l @ LogicalRelation(_, _, Some(TableIdentifier(table, Some(database)))) =>
-      Some(SQLTable(database, table, l.output.map(_.withQualifier(None))))
+    case l @ LogicalRelation(_, _, Some(catalogTable))
+        if catalogTable.identifier.database.isDefined =>
+      Some(SQLTable(
+        catalogTable.identifier.database.get,
+        catalogTable.identifier.table,
+        l.output.map(_.withQualifier(None))))
 
     case relation: CatalogRelation =>
       val m = relation.catalogTable
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index fc078da07d..7ba1a9ff22 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -99,9 +99,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       ctx.identifier.getText.toLowerCase == "noscan") {
       AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier).toString)
     } else {
-      // Always just run the no scan analyze. We should fix this and implement full analyze
-      // command in the future.
-      AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier).toString)
+      AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier).toString, noscan = false)
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
index a469d4da86..15687ddd72 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
@@ -21,19 +21,18 @@
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.fs.{FileSystem, Path}
 
-import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
+import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession}
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
 import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable}
+import org.apache.spark.sql.catalyst.plans.logical.Statistics
+import org.apache.spark.sql.execution.datasources.LogicalRelation
 
 /**
  * Analyzes the given table in the current database to generate statistics, which will be
  * used in query optimizations.
- *
- * Right now, it only supports Hive tables and it only updates the size of a Hive table
- * in the Hive metastore.
  */
-case class AnalyzeTableCommand(tableName: String) extends RunnableCommand {
+case class AnalyzeTableCommand(tableName: String, noscan: Boolean = true) extends RunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val sessionState = sparkSession.sessionState
@@ -71,8 +70,6 @@ case class AnalyzeTableCommand(tableName: String) extends RunnableCommand {
           size
         }
 
-        val tableParameters = catalogTable.properties
-        val oldTotalSize = tableParameters.get("totalSize").map(_.toLong).getOrElse(0L)
         val newTotalSize =
           catalogTable.storage.locationUri.map { p =>
             val path = new Path(p)
@@ -88,24 +85,47 @@ case class AnalyzeTableCommand(tableName: String) extends RunnableCommand {
             }
           }.getOrElse(0L)
 
-        // Update the Hive metastore if the total size of the table is different than the size
-        // recorded in the Hive metastore.
-        // This logic is based on org.apache.hadoop.hive.ql.exec.StatsTask.aggregateStats().
-        if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
-          sessionState.catalog.alterTable(
-            catalogTable.copy(
-              properties = relation.catalogTable.properties +
-                (AnalyzeTableCommand.TOTAL_SIZE_FIELD -> newTotalSize.toString)))
-        }
+        updateTableStats(catalogTable, newTotalSize)
+
+      // data source tables have been converted into LogicalRelations
+      case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined =>
+        updateTableStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes)
 
       case otherRelation =>
-        throw new AnalysisException(s"ANALYZE TABLE is only supported for Hive tables, " +
-          s"but '${tableIdent.unquotedString}' is a ${otherRelation.nodeName}.")
+        throw new AnalysisException(s"ANALYZE TABLE is not supported for " +
+          s"${otherRelation.nodeName}.")
     }
+
+    def updateTableStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = {
+      val oldTotalSize = catalogTable.stats.map(_.sizeInBytes.toLong).getOrElse(0L)
+      val oldRowCount = catalogTable.stats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L)
+      var newStats: Option[Statistics] = None
+      if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
+        newStats = Some(Statistics(sizeInBytes = newTotalSize))
+      }
+      // We only set rowCount when noscan is false, because otherwise:
+      // 1. when total size is not changed, we don't need to alter the table;
+      // 2. when total size is changed, `oldRowCount` becomes invalid.
+      // This is to make sure that we only record the right statistics.
+      if (!noscan) {
+        val newRowCount = Dataset.ofRows(sparkSession, relation).count()
+        if (newRowCount >= 0 && newRowCount != oldRowCount) {
+          newStats = if (newStats.isDefined) {
+            newStats.map(_.copy(rowCount = Some(BigInt(newRowCount))))
+          } else {
+            Some(Statistics(sizeInBytes = oldTotalSize, rowCount = Some(BigInt(newRowCount))))
+          }
+        }
+      }
+      // Update the metastore if the above statistics of the table are different from those
+      // recorded in the metastore.
+      if (newStats.isDefined) {
+        sessionState.catalog.alterTable(catalogTable.copy(stats = newStats))
+        // Refresh the cached data source table in the catalog.
+        sessionState.catalog.refreshTable(tableIdent)
+      }
+    }
+
     Seq.empty[Row]
   }
 }
-
-object AnalyzeTableCommand {
-  val TOTAL_SIZE_FIELD = "totalSize"
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 8286467e96..c8ad5b3034 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -209,7 +209,7 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
 
     LogicalRelation(
       dataSource.resolveRelation(),
-      metastoreTableIdentifier = Some(table.identifier))
+      catalogTable = Some(table))
   }
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
@@ -366,7 +366,8 @@ object DataSourceStrategy extends Strategy with Logging {
       val scan = RowDataSourceScanExec(
         projects.map(_.toAttribute),
         scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
-        relation.relation, UnknownPartitioning(0), metadata, relation.metastoreTableIdentifier)
+        relation.relation, UnknownPartitioning(0), metadata,
+        relation.catalogTable.map(_.identifier))
       filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan)
     } else {
       // Don't request columns that are only referenced by pushed filters.
@@ -376,7 +377,8 @@ object DataSourceStrategy extends Strategy with Logging {
       val scan = RowDataSourceScanExec(
         requestedColumns,
         scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
-        relation.relation, UnknownPartitioning(0), metadata, relation.metastoreTableIdentifier)
+        relation.relation, UnknownPartitioning(0), metadata,
+        relation.catalogTable.map(_.identifier))
       execution.ProjectExec(
         projects, filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan))
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 8b36caf6f1..55ca4f1106 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -112,7 +112,7 @@ object FileSourceStrategy extends Strategy with Logging {
           outputSchema,
           partitionKeyFilters.toSeq,
           pushedDownFilters,
-          table)
+          table.map(_.identifier))
 
       val afterScanFilter = afterScanFilters.toSeq.reduceOption(expressions.And)
       val withFilter = afterScanFilter.map(execution.FilterExec(_, scan)).getOrElse(scan)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
index 2a8e147011..d9562fd32e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
@@ -16,8 +16,8 @@
  */
 package org.apache.spark.sql.execution.datasources
 
-import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
 import org.apache.spark.sql.sources.BaseRelation
@@ -33,7 +33,7 @@ import org.apache.spark.util.Utils
 case class LogicalRelation(
     relation: BaseRelation,
     expectedOutputAttributes: Option[Seq[Attribute]] = None,
-    metastoreTableIdentifier: Option[TableIdentifier] = None)
+    catalogTable: Option[CatalogTable] = None)
   extends LeafNode with MultiInstanceRelation {
 
   override val output: Seq[AttributeReference] = {
@@ -72,9 +72,10 @@ case class LogicalRelation(
   // expId can be different but the relation is still the same.
   override lazy val cleanArgs: Seq[Any] = Seq(relation)
 
-  @transient override lazy val statistics: Statistics = Statistics(
-    sizeInBytes = BigInt(relation.sizeInBytes)
-  )
+  @transient override lazy val statistics: Statistics = {
+    catalogTable.flatMap(_.stats.map(_.copy(sizeInBytes = relation.sizeInBytes))).getOrElse(
+      Statistics(sizeInBytes = relation.sizeInBytes))
+  }
 
   /** Used to lookup original attribute capitalization */
   val attributeMap: AttributeMap[AttributeReference] = AttributeMap(output.map(o => (o, o)))
@@ -89,7 +90,7 @@ case class LogicalRelation(
     LogicalRelation(
       relation,
       expectedOutputAttributes.map(_.map(_.newInstance())),
-      metastoreTableIdentifier).asInstanceOf[this.type]
+      catalogTable).asInstanceOf[this.type]
   }
 
   override def refresh(): Unit = relation match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index ae77e4cb96..5b96206ba8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -252,11 +252,11 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] {
       case relation: CatalogRelation =>
         val metadata = relation.catalogTable
         preprocess(i, metadata.identifier.quotedString, metadata.partitionColumnNames)
-      case LogicalRelation(h: HadoopFsRelation, _, identifier) =>
-        val tblName = identifier.map(_.quotedString).getOrElse("unknown")
+      case LogicalRelation(h: HadoopFsRelation, _, catalogTable) =>
+        val tblName = catalogTable.map(_.identifier.quotedString).getOrElse("unknown")
         preprocess(i, tblName, h.partitionSchema.map(_.name))
-      case LogicalRelation(_: InsertableRelation, _, identifier) =>
-        val tblName = identifier.map(_.quotedString).getOrElse("unknown")
+      case LogicalRelation(_: InsertableRelation, _, catalogTable) =>
+        val tblName = catalogTable.map(_.identifier.quotedString).getOrElse("unknown")
         preprocess(i, tblName, Nil)
       case other => i
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index ab27381c06..8fdbd0f2c6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -192,7 +192,7 @@ private[sql] class SessionState(sparkSession: SparkSession) {
    * Right now, it only supports catalog tables and it only updates the size of a catalog table
    * in the external catalog.
    */
-  def analyze(tableName: String): Unit = {
-    AnalyzeTableCommand(tableName).run(sparkSession)
+  def analyze(tableName: String, noscan: Boolean = true): Unit = {
+    AnalyzeTableCommand(tableName, noscan).run(sparkSession)
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala
index 2c81cbf15f..264a2ffbeb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql
 
 import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, Join, LocalLimit}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
 
@@ -75,4 +76,29 @@ class StatisticsSuite extends QueryTest with SharedSQLContext {
     }
   }
 
+  test("test table-level statistics for data source table created in InMemoryCatalog") {
+    def checkTableStats(tableName: String, expectedRowCount: Option[BigInt]): Unit = {
+      val df = sql(s"SELECT * FROM $tableName")
+      val relations = df.queryExecution.analyzed.collect { case rel: LogicalRelation =>
+        assert(rel.catalogTable.isDefined)
+        assert(rel.catalogTable.get.stats.flatMap(_.rowCount) === expectedRowCount)
+        rel
+      }
+      assert(relations.size === 1)
+    }
+
+    val tableName = "tbl"
+    withTable(tableName) {
+      sql(s"CREATE TABLE $tableName(i INT, j STRING) USING parquet")
+      Seq(1 -> "a", 2 -> "b").toDF("i", "j").write.mode("overwrite").insertInto("tbl")
+
+      // noscan won't count the number of rows
+      sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS noscan")
+      checkTableStats(tableName, expectedRowCount = None)
+
+      // without noscan, we count the number of rows
+      sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS")
+      checkTableStats(tableName, expectedRowCount = Some(2))
+    }
+  }
 }
--
cgit v1.2.3