author     wangzhenhua <wangzhenhua@huawei.com>            2016-09-05 17:32:31 +0200
committer  Herman van Hovell <hvanhovell@databricks.com>   2016-09-05 17:32:31 +0200
commit     6d86403d8b252776effcddd71338b4d21a224f9b (patch)
tree       2fa41086f7d6c9dacb86d3e34d9c8f6f0b4fdcab /sql/core
parent     3ccb23e445711ea5d9059eb6de7c490c8fc9d112 (diff)
[SPARK-17072][SQL] support table-level statistics generation and storing into/loading from metastore
## What changes were proposed in this pull request?

1. Support generating table-level statistics for:
   - Hive tables in HiveExternalCatalog
   - data source tables in HiveExternalCatalog
   - data source tables in InMemoryCatalog
2. Add a property "catalogStats" in CatalogTable to hold statistics on the Spark side.
3. Put the logic for statistics transformation between Spark and Hive in HiveClientImpl.
4. Extend the Statistics class by adding rowCount (estimatedSize will be added when we have column stats).

## How was this patch tested?

Added unit tests.

Author: wangzhenhua <wangzhenhua@huawei.com>
Author: Zhenhua Wang <wangzhenhua@huawei.com>

Closes #14712 from wzhfy/tableStats.
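For context, a minimal usage sketch of the new command, mirroring the test added at the bottom of this diff (assumes an existing SparkSession named `spark`; the table and column names are illustrative):

```scala
// Assumes an existing SparkSession `spark`; table/column names are illustrative.
import spark.implicits._

spark.sql("CREATE TABLE tbl(i INT, j STRING) USING parquet")
Seq(1 -> "a", 2 -> "b").toDF("i", "j").write.mode("overwrite").insertInto("tbl")

// NOSCAN only records the table size; rowCount is left unset.
spark.sql("ANALYZE TABLE tbl COMPUTE STATISTICS NOSCAN")

// Without NOSCAN the table is scanned, so rowCount is recorded as well.
spark.sql("ANALYZE TABLE tbl COMPUTE STATISTICS")
```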
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala                        8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala                   4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala      64
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala   8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala   2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala      13
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala                8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala                      4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala                            26
9 files changed, 94 insertions, 43 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala
index dde91b0a86..6f821f80cc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala
@@ -590,8 +590,12 @@ class SQLBuilder private (
object ExtractSQLTable {
def unapply(plan: LogicalPlan): Option[SQLTable] = plan match {
- case l @ LogicalRelation(_, _, Some(TableIdentifier(table, Some(database)))) =>
- Some(SQLTable(database, table, l.output.map(_.withQualifier(None))))
+ case l @ LogicalRelation(_, _, Some(catalogTable))
+ if catalogTable.identifier.database.isDefined =>
+ Some(SQLTable(
+ catalogTable.identifier.database.get,
+ catalogTable.identifier.table,
+ l.output.map(_.withQualifier(None))))
case relation: CatalogRelation =>
val m = relation.catalogTable
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index fc078da07d..7ba1a9ff22 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -99,9 +99,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
ctx.identifier.getText.toLowerCase == "noscan") {
AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier).toString)
} else {
- // Always just run the no scan analyze. We should fix this and implement full analyze
- // command in the future.
- AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier).toString)
+ AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier).toString, noscan = false)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
index a469d4da86..15687ddd72 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
@@ -21,19 +21,18 @@ import scala.util.control.NonFatal
import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
+import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession}
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable}
+import org.apache.spark.sql.catalyst.plans.logical.Statistics
+import org.apache.spark.sql.execution.datasources.LogicalRelation
/**
* Analyzes the given table in the current database to generate statistics, which will be
* used in query optimizations.
- *
- * Right now, it only supports Hive tables and it only updates the size of a Hive table
- * in the Hive metastore.
*/
-case class AnalyzeTableCommand(tableName: String) extends RunnableCommand {
+case class AnalyzeTableCommand(tableName: String, noscan: Boolean = true) extends RunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
val sessionState = sparkSession.sessionState
@@ -71,8 +70,6 @@ case class AnalyzeTableCommand(tableName: String) extends RunnableCommand {
size
}
- val tableParameters = catalogTable.properties
- val oldTotalSize = tableParameters.get("totalSize").map(_.toLong).getOrElse(0L)
val newTotalSize =
catalogTable.storage.locationUri.map { p =>
val path = new Path(p)
@@ -88,24 +85,47 @@ case class AnalyzeTableCommand(tableName: String) extends RunnableCommand {
}
}.getOrElse(0L)
- // Update the Hive metastore if the total size of the table is different than the size
- // recorded in the Hive metastore.
- // This logic is based on org.apache.hadoop.hive.ql.exec.StatsTask.aggregateStats().
- if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
- sessionState.catalog.alterTable(
- catalogTable.copy(
- properties = relation.catalogTable.properties +
- (AnalyzeTableCommand.TOTAL_SIZE_FIELD -> newTotalSize.toString)))
- }
+ updateTableStats(catalogTable, newTotalSize)
+
+ // data source tables have been converted into LogicalRelations
+ case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined =>
+ updateTableStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes)
case otherRelation =>
- throw new AnalysisException(s"ANALYZE TABLE is only supported for Hive tables, " +
- s"but '${tableIdent.unquotedString}' is a ${otherRelation.nodeName}.")
+ throw new AnalysisException(s"ANALYZE TABLE is not supported for " +
+ s"${otherRelation.nodeName}.")
}
+
+ def updateTableStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = {
+ val oldTotalSize = catalogTable.stats.map(_.sizeInBytes.toLong).getOrElse(0L)
+ val oldRowCount = catalogTable.stats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L)
+ var newStats: Option[Statistics] = None
+ if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
+ newStats = Some(Statistics(sizeInBytes = newTotalSize))
+ }
+ // We only set rowCount when noscan is false, because otherwise:
+ // 1. when total size is not changed, we don't need to alter the table;
+ // 2. when total size is changed, `oldRowCount` becomes invalid.
+ // This is to make sure that we only record the right statistics.
+ if (!noscan) {
+ val newRowCount = Dataset.ofRows(sparkSession, relation).count()
+ if (newRowCount >= 0 && newRowCount != oldRowCount) {
+ newStats = if (newStats.isDefined) {
+ newStats.map(_.copy(rowCount = Some(BigInt(newRowCount))))
+ } else {
+ Some(Statistics(sizeInBytes = oldTotalSize, rowCount = Some(BigInt(newRowCount))))
+ }
+ }
+ }
+ // Update the metastore if the above statistics of the table are different from those
+ // recorded in the metastore.
+ if (newStats.isDefined) {
+ sessionState.catalog.alterTable(catalogTable.copy(stats = newStats))
+ // Refresh the cached data source table in the catalog.
+ sessionState.catalog.refreshTable(tableIdent)
+ }
+ }
+
Seq.empty[Row]
}
}
-
-object AnalyzeTableCommand {
- val TOTAL_SIZE_FIELD = "totalSize"
-}
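A condensed, standalone sketch of the decision logic that the new `updateTableStats` above implements; `SimpleStats` and `decideNewStats` are illustrative names, not the actual Spark classes:

```scala
// Illustrative stand-in for Spark's Statistics class.
case class SimpleStats(sizeInBytes: BigInt, rowCount: Option[BigInt] = None)

// Returns Some(stats) only when something changed and should be written back
// to the metastore; None means the catalog entry is left untouched.
def decideNewStats(
    oldStats: Option[SimpleStats],
    newTotalSize: Long,
    noscan: Boolean,
    countRows: () => Long): Option[SimpleStats] = {
  val oldTotalSize = oldStats.map(_.sizeInBytes.toLong).getOrElse(0L)
  val oldRowCount = oldStats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L)

  var newStats: Option[SimpleStats] = None
  if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
    newStats = Some(SimpleStats(sizeInBytes = newTotalSize))
  }
  // rowCount is only refreshed on a full scan (noscan = false).
  if (!noscan) {
    val newRowCount = countRows()
    if (newRowCount >= 0 && newRowCount != oldRowCount) {
      newStats = newStats
        .map(_.copy(rowCount = Some(BigInt(newRowCount))))
        .orElse(Some(SimpleStats(sizeInBytes = oldTotalSize, rowCount = Some(BigInt(newRowCount)))))
    }
  }
  newStats
}
```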
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 8286467e96..c8ad5b3034 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -209,7 +209,7 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
LogicalRelation(
dataSource.resolveRelation(),
- metastoreTableIdentifier = Some(table.identifier))
+ catalogTable = Some(table))
}
override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
@@ -366,7 +366,8 @@ object DataSourceStrategy extends Strategy with Logging {
val scan = RowDataSourceScanExec(
projects.map(_.toAttribute),
scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
- relation.relation, UnknownPartitioning(0), metadata, relation.metastoreTableIdentifier)
+ relation.relation, UnknownPartitioning(0), metadata,
+ relation.catalogTable.map(_.identifier))
filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan)
} else {
// Don't request columns that are only referenced by pushed filters.
@@ -376,7 +377,8 @@ object DataSourceStrategy extends Strategy with Logging {
val scan = RowDataSourceScanExec(
requestedColumns,
scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
- relation.relation, UnknownPartitioning(0), metadata, relation.metastoreTableIdentifier)
+ relation.relation, UnknownPartitioning(0), metadata,
+ relation.catalogTable.map(_.identifier))
execution.ProjectExec(
projects, filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan))
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 8b36caf6f1..55ca4f1106 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -112,7 +112,7 @@ object FileSourceStrategy extends Strategy with Logging {
outputSchema,
partitionKeyFilters.toSeq,
pushedDownFilters,
- table)
+ table.map(_.identifier))
val afterScanFilter = afterScanFilters.toSeq.reduceOption(expressions.And)
val withFilter = afterScanFilter.map(execution.FilterExec(_, scan)).getOrElse(scan)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
index 2a8e147011..d9562fd32e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
@@ -16,8 +16,8 @@
*/
package org.apache.spark.sql.execution.datasources
-import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
import org.apache.spark.sql.sources.BaseRelation
@@ -33,7 +33,7 @@ import org.apache.spark.util.Utils
case class LogicalRelation(
relation: BaseRelation,
expectedOutputAttributes: Option[Seq[Attribute]] = None,
- metastoreTableIdentifier: Option[TableIdentifier] = None)
+ catalogTable: Option[CatalogTable] = None)
extends LeafNode with MultiInstanceRelation {
override val output: Seq[AttributeReference] = {
@@ -72,9 +72,10 @@ case class LogicalRelation(
// expId can be different but the relation is still the same.
override lazy val cleanArgs: Seq[Any] = Seq(relation)
- @transient override lazy val statistics: Statistics = Statistics(
- sizeInBytes = BigInt(relation.sizeInBytes)
- )
+ @transient override lazy val statistics: Statistics = {
+ catalogTable.flatMap(_.stats.map(_.copy(sizeInBytes = relation.sizeInBytes))).getOrElse(
+ Statistics(sizeInBytes = relation.sizeInBytes))
+ }
/** Used to lookup original attribute capitalization */
val attributeMap: AttributeMap[AttributeReference] = AttributeMap(output.map(o => (o, o)))
@@ -89,7 +90,7 @@ case class LogicalRelation(
LogicalRelation(
relation,
expectedOutputAttributes.map(_.map(_.newInstance())),
- metastoreTableIdentifier).asInstanceOf[this.type]
+ catalogTable).asInstanceOf[this.type]
}
override def refresh(): Unit = relation match {
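A standalone sketch of the statistics-resolution pattern used in the new `statistics` override above: catalog statistics are preferred when present (so a recorded `rowCount` reaches the optimizer), while `sizeInBytes` is always refreshed from the underlying relation. `SimpleStats` is an illustrative stand-in, not the real Spark class:

```scala
case class SimpleStats(sizeInBytes: BigInt, rowCount: Option[BigInt] = None)

// catalogStats: statistics stored in the catalog, if the table was analyzed.
// relationSize: size reported by the underlying BaseRelation.
def resolveStats(catalogStats: Option[SimpleStats], relationSize: BigInt): SimpleStats =
  catalogStats
    .map(_.copy(sizeInBytes = relationSize))           // keep rowCount, refresh the size
    .getOrElse(SimpleStats(sizeInBytes = relationSize))
```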
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index ae77e4cb96..5b96206ba8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -252,11 +252,11 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] {
case relation: CatalogRelation =>
val metadata = relation.catalogTable
preprocess(i, metadata.identifier.quotedString, metadata.partitionColumnNames)
- case LogicalRelation(h: HadoopFsRelation, _, identifier) =>
- val tblName = identifier.map(_.quotedString).getOrElse("unknown")
+ case LogicalRelation(h: HadoopFsRelation, _, catalogTable) =>
+ val tblName = catalogTable.map(_.identifier.quotedString).getOrElse("unknown")
preprocess(i, tblName, h.partitionSchema.map(_.name))
- case LogicalRelation(_: InsertableRelation, _, identifier) =>
- val tblName = identifier.map(_.quotedString).getOrElse("unknown")
+ case LogicalRelation(_: InsertableRelation, _, catalogTable) =>
+ val tblName = catalogTable.map(_.identifier.quotedString).getOrElse("unknown")
preprocess(i, tblName, Nil)
case other => i
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index ab27381c06..8fdbd0f2c6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -192,7 +192,7 @@ private[sql] class SessionState(sparkSession: SparkSession) {
* Right now, it only supports catalog tables and it only updates the size of a catalog table
* in the external catalog.
*/
- def analyze(tableName: String): Unit = {
- AnalyzeTableCommand(tableName).run(sparkSession)
+ def analyze(tableName: String, noscan: Boolean = true): Unit = {
+ AnalyzeTableCommand(tableName, noscan).run(sparkSession)
}
}
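A hedged usage sketch of the extended `analyze` helper; `SessionState` is `private[sql]`, so this is only reachable from Spark-internal code (the table name is illustrative):

```scala
// Given a SparkSession `spark`, inside org.apache.spark.sql internal code:
spark.sessionState.analyze("tbl")                  // size only, same as NOSCAN
spark.sessionState.analyze("tbl", noscan = false)  // full scan, also records rowCount
```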
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala
index 2c81cbf15f..264a2ffbeb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql
import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, Join, LocalLimit}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
@@ -75,4 +76,29 @@ class StatisticsSuite extends QueryTest with SharedSQLContext {
}
}
+ test("test table-level statistics for data source table created in InMemoryCatalog") {
+ def checkTableStats(tableName: String, expectedRowCount: Option[BigInt]): Unit = {
+ val df = sql(s"SELECT * FROM $tableName")
+ val relations = df.queryExecution.analyzed.collect { case rel: LogicalRelation =>
+ assert(rel.catalogTable.isDefined)
+ assert(rel.catalogTable.get.stats.flatMap(_.rowCount) === expectedRowCount)
+ rel
+ }
+ assert(relations.size === 1)
+ }
+
+ val tableName = "tbl"
+ withTable(tableName) {
+ sql(s"CREATE TABLE $tableName(i INT, j STRING) USING parquet")
+ Seq(1 -> "a", 2 -> "b").toDF("i", "j").write.mode("overwrite").insertInto("tbl")
+
+ // noscan won't count the number of rows
+ sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS noscan")
+ checkTableStats(tableName, expectedRowCount = None)
+
+ // without noscan, we count the number of rows
+ sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS")
+ checkTableStats(tableName, expectedRowCount = Some(2))
+ }
+ }
}