diff options
author | Zhenhua Wang <wzh_zju@163.com> | 2016-10-03 10:12:02 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-10-03 10:12:02 -0700 |
commit | 7bf92127643570e4eb3610fa3ffd36839eba2718 (patch) | |
tree | 14386f49f956e97b50a8d6b2bbf0f776eab4dd39 /sql/hive | |
parent | a27033c0bbaae8f31db9b91693947ed71738ed11 (diff) | |
download | spark-7bf92127643570e4eb3610fa3ffd36839eba2718.tar.gz spark-7bf92127643570e4eb3610fa3ffd36839eba2718.tar.bz2 spark-7bf92127643570e4eb3610fa3ffd36839eba2718.zip |
[SPARK-17073][SQL] generate column-level statistics
## What changes were proposed in this pull request?
Generate basic column statistics for all the atomic types:
- numeric types: max, min, num of nulls, ndv (number of distinct values)
- date/timestamp types: they are also represented as numbers internally, so they have the same stats as above.
- string: avg length, max length, num of nulls, ndv
- binary: avg length, max length, num of nulls
- boolean: num of nulls, num of trues, num of falsies
Also support storing and loading these statistics.
One thing to notice:
We support analyzing columns independently, e.g.:
sql1: `ANALYZE TABLE src COMPUTE STATISTICS FOR COLUMNS key;`
sql2: `ANALYZE TABLE src COMPUTE STATISTICS FOR COLUMNS value;`
when running sql2 to collect column stats for `value`, we don’t remove stats of columns `key` which are analyzed in sql1 and not in sql2. As a result, **users need to guarantee consistency** between sql1 and sql2. If the table has been changed before sql2, users should re-analyze column `key` when they want to analyze column `value`:
`ANALYZE TABLE src COMPUTE STATISTICS FOR COLUMNS key, value;`
## How was this patch tested?
add unit tests
Author: Zhenhua Wang <wzh_zju@163.com>
Closes #15090 from wzhfy/colStats.
Diffstat (limited to 'sql/hive')
3 files changed, 113 insertions, 35 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index d35a681b67..261cc6feff 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -32,8 +32,8 @@ import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.catalyst.catalog._ -import org.apache.spark.sql.catalyst.plans.logical.Statistics -import org.apache.spark.sql.execution.command.DDLUtils +import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Statistics} +import org.apache.spark.sql.execution.command.{ColumnStatStruct, DDLUtils} import org.apache.spark.sql.execution.datasources.CaseInsensitiveMap import org.apache.spark.sql.hive.client.HiveClient import org.apache.spark.sql.internal.HiveSerDe @@ -401,7 +401,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat var statsProperties: Map[String, String] = Map(STATISTICS_TOTAL_SIZE -> stats.sizeInBytes.toString()) if (stats.rowCount.isDefined) { - statsProperties += (STATISTICS_NUM_ROWS -> stats.rowCount.get.toString()) + statsProperties += STATISTICS_NUM_ROWS -> stats.rowCount.get.toString() + } + stats.colStats.foreach { case (colName, colStat) => + statsProperties += (STATISTICS_COL_STATS_PREFIX + colName) -> colStat.toString } tableDefinition.copy(properties = tableDefinition.properties ++ statsProperties) } else { @@ -473,15 +476,21 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat } } // construct Spark's statistics from information in Hive metastore - if (catalogTable.properties.contains(STATISTICS_TOTAL_SIZE)) { - val totalSize = BigInt(catalogTable.properties.get(STATISTICS_TOTAL_SIZE).get) - // TODO: we will compute "estimatedSize" when we have column stats: - // average size of row * number of rows + val statsProps = catalogTable.properties.filterKeys(_.startsWith(STATISTICS_PREFIX)) + if (statsProps.nonEmpty) { + val colStatsProps = statsProps.filterKeys(_.startsWith(STATISTICS_COL_STATS_PREFIX)) + .map { case (k, v) => (k.drop(STATISTICS_COL_STATS_PREFIX.length), v) } + val colStats: Map[String, ColumnStat] = catalogTable.schema.collect { + case f if colStatsProps.contains(f.name) => + val numFields = ColumnStatStruct.numStatFields(f.dataType) + (f.name, ColumnStat(numFields, colStatsProps(f.name))) + }.toMap catalogTable.copy( properties = removeStatsProperties(catalogTable), stats = Some(Statistics( - sizeInBytes = totalSize, - rowCount = catalogTable.properties.get(STATISTICS_NUM_ROWS).map(BigInt(_))))) + sizeInBytes = BigInt(catalogTable.properties(STATISTICS_TOTAL_SIZE)), + rowCount = catalogTable.properties.get(STATISTICS_NUM_ROWS).map(BigInt(_)), + colStats = colStats))) } else { catalogTable } @@ -693,6 +702,7 @@ object HiveExternalCatalog { val STATISTICS_PREFIX = "spark.sql.statistics." val STATISTICS_TOTAL_SIZE = STATISTICS_PREFIX + "totalSize" val STATISTICS_NUM_ROWS = STATISTICS_PREFIX + "numRows" + val STATISTICS_COL_STATS_PREFIX = STATISTICS_PREFIX + "colStats." def removeStatsProperties(metadata: CatalogTable): Map[String, String] = { metadata.properties.filterNot { case (key, _) => key.startsWith(STATISTICS_PREFIX) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index 9956706929..99dd080683 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -21,16 +21,16 @@ import java.io.{File, PrintWriter} import scala.reflect.ClassTag -import org.apache.spark.sql.{AnalysisException, QueryTest, Row} -import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.plans.logical.Statistics +import org.apache.spark.sql.{AnalysisException, QueryTest, Row, StatisticsTest} +import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Statistics} import org.apache.spark.sql.execution.command.{AnalyzeTableCommand, DDLUtils} import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.execution.joins._ import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SQLTestUtils -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types._ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { @@ -171,7 +171,27 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils TableIdentifier("tempTable"), ignoreIfNotExists = true, purge = false) } - private def checkStats( + test("analyzing views is not supported") { + def assertAnalyzeUnsupported(analyzeCommand: String): Unit = { + val err = intercept[AnalysisException] { + sql(analyzeCommand) + } + assert(err.message.contains("ANALYZE TABLE is not supported")) + } + + val tableName = "tbl" + withTable(tableName) { + spark.range(10).write.saveAsTable(tableName) + val viewName = "view" + withView(viewName) { + sql(s"CREATE VIEW $viewName AS SELECT * FROM $tableName") + assertAnalyzeUnsupported(s"ANALYZE TABLE $viewName COMPUTE STATISTICS") + assertAnalyzeUnsupported(s"ANALYZE TABLE $viewName COMPUTE STATISTICS FOR COLUMNS id") + } + } + } + + private def checkTableStats( stats: Option[Statistics], hasSizeInBytes: Boolean, expectedRowCounts: Option[Int]): Unit = { @@ -184,7 +204,7 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils } } - private def checkStats( + private def checkTableStats( tableName: String, isDataSourceTable: Boolean, hasSizeInBytes: Boolean, @@ -192,12 +212,12 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils val df = sql(s"SELECT * FROM $tableName") val stats = df.queryExecution.analyzed.collect { case rel: MetastoreRelation => - checkStats(rel.catalogTable.stats, hasSizeInBytes, expectedRowCounts) - assert(!isDataSourceTable, "Expected a data source table, but got a Hive serde table") + checkTableStats(rel.catalogTable.stats, hasSizeInBytes, expectedRowCounts) + assert(!isDataSourceTable, "Expected a Hive serde table, but got a data source table") rel.catalogTable.stats case rel: LogicalRelation => - checkStats(rel.catalogTable.get.stats, hasSizeInBytes, expectedRowCounts) - assert(isDataSourceTable, "Expected a Hive serde table, but got a data source table") + checkTableStats(rel.catalogTable.get.stats, hasSizeInBytes, expectedRowCounts) + assert(isDataSourceTable, "Expected a data source table, but got a Hive serde table") rel.catalogTable.get.stats } assert(stats.size == 1) @@ -210,13 +230,13 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils // Currently Spark's statistics are self-contained, we don't have statistics until we use // the `ANALYZE TABLE` command. sql(s"CREATE TABLE $textTable (key STRING, value STRING) STORED AS TEXTFILE") - checkStats( + checkTableStats( textTable, isDataSourceTable = false, hasSizeInBytes = false, expectedRowCounts = None) sql(s"INSERT INTO TABLE $textTable SELECT * FROM src") - checkStats( + checkTableStats( textTable, isDataSourceTable = false, hasSizeInBytes = false, @@ -224,12 +244,12 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils // noscan won't count the number of rows sql(s"ANALYZE TABLE $textTable COMPUTE STATISTICS noscan") - val fetchedStats1 = checkStats( + val fetchedStats1 = checkTableStats( textTable, isDataSourceTable = false, hasSizeInBytes = true, expectedRowCounts = None) // without noscan, we count the number of rows sql(s"ANALYZE TABLE $textTable COMPUTE STATISTICS") - val fetchedStats2 = checkStats( + val fetchedStats2 = checkTableStats( textTable, isDataSourceTable = false, hasSizeInBytes = true, expectedRowCounts = Some(500)) assert(fetchedStats1.get.sizeInBytes == fetchedStats2.get.sizeInBytes) } @@ -241,19 +261,19 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils sql(s"CREATE TABLE $textTable (key STRING, value STRING) STORED AS TEXTFILE") sql(s"INSERT INTO TABLE $textTable SELECT * FROM src") sql(s"ANALYZE TABLE $textTable COMPUTE STATISTICS") - val fetchedStats1 = checkStats( + val fetchedStats1 = checkTableStats( textTable, isDataSourceTable = false, hasSizeInBytes = true, expectedRowCounts = Some(500)) sql(s"ANALYZE TABLE $textTable COMPUTE STATISTICS noscan") // when the total size is not changed, the old row count is kept - val fetchedStats2 = checkStats( + val fetchedStats2 = checkTableStats( textTable, isDataSourceTable = false, hasSizeInBytes = true, expectedRowCounts = Some(500)) assert(fetchedStats1 == fetchedStats2) sql(s"INSERT INTO TABLE $textTable SELECT * FROM src") sql(s"ANALYZE TABLE $textTable COMPUTE STATISTICS noscan") // update total size and remove the old and invalid row count - val fetchedStats3 = checkStats( + val fetchedStats3 = checkTableStats( textTable, isDataSourceTable = false, hasSizeInBytes = true, expectedRowCounts = None) assert(fetchedStats3.get.sizeInBytes > fetchedStats2.get.sizeInBytes) } @@ -271,20 +291,20 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils // the default value for `spark.sql.hive.convertMetastoreParquet` is true, here we just set it // for robustness withSQLConf("spark.sql.hive.convertMetastoreParquet" -> "true") { - checkStats( + checkTableStats( parquetTable, isDataSourceTable = true, hasSizeInBytes = false, expectedRowCounts = None) sql(s"ANALYZE TABLE $parquetTable COMPUTE STATISTICS") - checkStats( + checkTableStats( parquetTable, isDataSourceTable = true, hasSizeInBytes = true, expectedRowCounts = Some(500)) } withSQLConf("spark.sql.hive.convertMetastoreOrc" -> "true") { - checkStats( + checkTableStats( orcTable, isDataSourceTable = true, hasSizeInBytes = false, expectedRowCounts = None) sql(s"ANALYZE TABLE $orcTable COMPUTE STATISTICS") - checkStats( + checkTableStats( orcTable, isDataSourceTable = true, hasSizeInBytes = true, expectedRowCounts = Some(500)) } } @@ -298,23 +318,23 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils assert(DDLUtils.isDatasourceTable(catalogTable)) sql(s"INSERT INTO TABLE $parquetTable SELECT * FROM src") - checkStats( + checkTableStats( parquetTable, isDataSourceTable = true, hasSizeInBytes = false, expectedRowCounts = None) // noscan won't count the number of rows sql(s"ANALYZE TABLE $parquetTable COMPUTE STATISTICS noscan") - val fetchedStats1 = checkStats( + val fetchedStats1 = checkTableStats( parquetTable, isDataSourceTable = true, hasSizeInBytes = true, expectedRowCounts = None) sql(s"INSERT INTO TABLE $parquetTable SELECT * FROM src") sql(s"ANALYZE TABLE $parquetTable COMPUTE STATISTICS noscan") - val fetchedStats2 = checkStats( + val fetchedStats2 = checkTableStats( parquetTable, isDataSourceTable = true, hasSizeInBytes = true, expectedRowCounts = None) assert(fetchedStats2.get.sizeInBytes > fetchedStats1.get.sizeInBytes) // without noscan, we count the number of rows sql(s"ANALYZE TABLE $parquetTable COMPUTE STATISTICS") - val fetchedStats3 = checkStats( + val fetchedStats3 = checkTableStats( parquetTable, isDataSourceTable = true, hasSizeInBytes = true, @@ -330,7 +350,7 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils val dfNoCols = spark.createDataFrame(rddNoCols, StructType(Seq.empty)) dfNoCols.write.format("json").saveAsTable(table_no_cols) sql(s"ANALYZE TABLE $table_no_cols COMPUTE STATISTICS") - checkStats( + checkTableStats( table_no_cols, isDataSourceTable = true, hasSizeInBytes = true, @@ -338,6 +358,53 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils } } + test("generate column-level statistics and load them from hive metastore") { + import testImplicits._ + + val intSeq = Seq(1, 2) + val stringSeq = Seq("a", "bb") + val booleanSeq = Seq(true, false) + + val data = intSeq.indices.map { i => + (intSeq(i), stringSeq(i), booleanSeq(i)) + } + val tableName = "table" + withTable(tableName) { + val df = data.toDF("c1", "c2", "c3") + df.write.format("parquet").saveAsTable(tableName) + val expectedColStatsSeq = df.schema.map { f => + val colStat = f.dataType match { + case IntegerType => + ColumnStat(InternalRow(0L, intSeq.max, intSeq.min, intSeq.distinct.length.toLong)) + case StringType => + ColumnStat(InternalRow(0L, stringSeq.map(_.length).sum / stringSeq.length.toDouble, + stringSeq.map(_.length).max.toLong, stringSeq.distinct.length.toLong)) + case BooleanType => + ColumnStat(InternalRow(0L, booleanSeq.count(_.equals(true)).toLong, + booleanSeq.count(_.equals(false)).toLong)) + } + (f, colStat) + } + + sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS c1, c2, c3") + val readback = spark.table(tableName) + val relations = readback.queryExecution.analyzed.collect { case rel: LogicalRelation => + val columnStats = rel.catalogTable.get.stats.get.colStats + expectedColStatsSeq.foreach { case (field, expectedColStat) => + assert(columnStats.contains(field.name)) + val colStat = columnStats(field.name) + StatisticsTest.checkColStat( + dataType = field.dataType, + colStat = colStat, + expectedColStat = expectedColStat, + rsd = spark.sessionState.conf.ndvMaxError) + } + rel + } + assert(relations.size == 1) + } + } + test("estimates the size of a test MetastoreRelation") { val df = sql("""SELECT * FROM src""") val sizes = df.queryExecution.analyzed.collect { case mr: MetastoreRelation => diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala index a215c70da0..f5c605fe5e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala @@ -123,6 +123,7 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { assertNoSuchTable(s"SHOW CREATE TABLE $viewName") assertNoSuchTable(s"SHOW PARTITIONS $viewName") assertNoSuchTable(s"ANALYZE TABLE $viewName COMPUTE STATISTICS") + assertNoSuchTable(s"ANALYZE TABLE $viewName COMPUTE STATISTICS FOR COLUMNS id") } } |