From 7486442fe0b70f2aea21d569604e71d7ddf19a77 Mon Sep 17 00:00:00 2001 From: wangzhenhua Date: Fri, 14 Oct 2016 21:18:49 +0800 Subject: [SPARK-17073][SQL][FOLLOWUP] generate column-level statistics ## What changes were proposed in this pull request? This pr adds some test cases for statistics: case sensitive column names, non ascii column names, refresh table, and also improves some documentation. ## How was this patch tested? add test cases Author: wangzhenhua Closes #15360 from wzhfy/colStats2. --- .../apache/spark/sql/hive/StatisticsSuite.scala | 198 +++++++++++++++++---- 1 file changed, 166 insertions(+), 32 deletions(-) (limited to 'sql/hive') diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index 85228bb001..c351063a63 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -21,7 +21,7 @@ import java.io.{File, PrintWriter} import scala.reflect.ClassTag -import org.apache.spark.sql.{AnalysisException, QueryTest, Row, StatisticsTest} +import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Statistics} import org.apache.spark.sql.execution.command.{AnalyzeTableCommand, DDLUtils} @@ -358,53 +358,187 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils } } - test("generate column-level statistics and load them from hive metastore") { + private def getStatsBeforeAfterUpdate(isAnalyzeColumns: Boolean): (Statistics, Statistics) = { + val tableName = "tbl" + var statsBeforeUpdate: Statistics = null + var statsAfterUpdate: Statistics = null + withTable(tableName) { + val tableIndent = TableIdentifier(tableName, Some("default")) + val catalog = spark.sessionState.catalog.asInstanceOf[HiveSessionCatalog] + sql(s"CREATE TABLE $tableName (key int) USING PARQUET") + sql(s"INSERT INTO $tableName SELECT 1") + if (isAnalyzeColumns) { + sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS key") + } else { + sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS") + } + // Table lookup will make the table cached. + catalog.lookupRelation(tableIndent) + statsBeforeUpdate = catalog.getCachedDataSourceTable(tableIndent) + .asInstanceOf[LogicalRelation].catalogTable.get.stats.get + + sql(s"INSERT INTO $tableName SELECT 2") + if (isAnalyzeColumns) { + sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS key") + } else { + sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS") + } + catalog.lookupRelation(tableIndent) + statsAfterUpdate = catalog.getCachedDataSourceTable(tableIndent) + .asInstanceOf[LogicalRelation].catalogTable.get.stats.get + } + (statsBeforeUpdate, statsAfterUpdate) + } + + test("test refreshing table stats of cached data source table by `ANALYZE TABLE` statement") { + val (statsBeforeUpdate, statsAfterUpdate) = getStatsBeforeAfterUpdate(isAnalyzeColumns = false) + + assert(statsBeforeUpdate.sizeInBytes > 0) + assert(statsBeforeUpdate.rowCount == Some(1)) + + assert(statsAfterUpdate.sizeInBytes > statsBeforeUpdate.sizeInBytes) + assert(statsAfterUpdate.rowCount == Some(2)) + } + + test("test refreshing column stats of cached data source table by `ANALYZE TABLE` statement") { + val (statsBeforeUpdate, statsAfterUpdate) = getStatsBeforeAfterUpdate(isAnalyzeColumns = true) + + assert(statsBeforeUpdate.sizeInBytes > 0) + assert(statsBeforeUpdate.rowCount == Some(1)) + StatisticsTest.checkColStat( + dataType = IntegerType, + colStat = statsBeforeUpdate.colStats("key"), + expectedColStat = ColumnStat(InternalRow(0L, 1, 1, 1L)), + rsd = spark.sessionState.conf.ndvMaxError) + + assert(statsAfterUpdate.sizeInBytes > statsBeforeUpdate.sizeInBytes) + assert(statsAfterUpdate.rowCount == Some(2)) + StatisticsTest.checkColStat( + dataType = IntegerType, + colStat = statsAfterUpdate.colStats("key"), + expectedColStat = ColumnStat(InternalRow(0L, 2, 1, 2L)), + rsd = spark.sessionState.conf.ndvMaxError) + } + + private lazy val (testDataFrame, expectedColStatsSeq) = { import testImplicits._ val intSeq = Seq(1, 2) val stringSeq = Seq("a", "bb") + val binarySeq = Seq("a", "bb").map(_.getBytes) val booleanSeq = Seq(true, false) - val data = intSeq.indices.map { i => - (intSeq(i), stringSeq(i), booleanSeq(i)) + (intSeq(i), stringSeq(i), binarySeq(i), booleanSeq(i)) } - val tableName = "table" - withTable(tableName) { - val df = data.toDF("c1", "c2", "c3") - df.write.format("parquet").saveAsTable(tableName) - val expectedColStatsSeq = df.schema.map { f => - val colStat = f.dataType match { - case IntegerType => - ColumnStat(InternalRow(0L, intSeq.max, intSeq.min, intSeq.distinct.length.toLong)) - case StringType => - ColumnStat(InternalRow(0L, stringSeq.map(_.length).sum / stringSeq.length.toDouble, - stringSeq.map(_.length).max.toInt, stringSeq.distinct.length.toLong)) - case BooleanType => - ColumnStat(InternalRow(0L, booleanSeq.count(_.equals(true)).toLong, - booleanSeq.count(_.equals(false)).toLong)) - } - (f, colStat) + val df: DataFrame = data.toDF("c1", "c2", "c3", "c4") + val expectedColStatsSeq: Seq[(StructField, ColumnStat)] = df.schema.map { f => + val colStat = f.dataType match { + case IntegerType => + ColumnStat(InternalRow(0L, intSeq.max, intSeq.min, intSeq.distinct.length.toLong)) + case StringType => + ColumnStat(InternalRow(0L, stringSeq.map(_.length).sum / stringSeq.length.toDouble, + stringSeq.map(_.length).max.toInt, stringSeq.distinct.length.toLong)) + case BinaryType => + ColumnStat(InternalRow(0L, binarySeq.map(_.length).sum / binarySeq.length.toDouble, + binarySeq.map(_.length).max.toInt)) + case BooleanType => + ColumnStat(InternalRow(0L, booleanSeq.count(_.equals(true)).toLong, + booleanSeq.count(_.equals(false)).toLong)) } + (f, colStat) + } + (df, expectedColStatsSeq) + } + + private def checkColStats( + tableName: String, + isDataSourceTable: Boolean, + expectedColStatsSeq: Seq[(StructField, ColumnStat)]): Unit = { + val readback = spark.table(tableName) + val stats = readback.queryExecution.analyzed.collect { + case rel: MetastoreRelation => + assert(!isDataSourceTable, "Expected a Hive serde table, but got a data source table") + rel.catalogTable.stats.get + case rel: LogicalRelation => + assert(isDataSourceTable, "Expected a data source table, but got a Hive serde table") + rel.catalogTable.get.stats.get + } + assert(stats.length == 1) + val columnStats = stats.head.colStats + assert(columnStats.size == expectedColStatsSeq.length) + expectedColStatsSeq.foreach { case (field, expectedColStat) => + StatisticsTest.checkColStat( + dataType = field.dataType, + colStat = columnStats(field.name), + expectedColStat = expectedColStat, + rsd = spark.sessionState.conf.ndvMaxError) + } + } - sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS c1, c2, c3") - val readback = spark.table(tableName) - val relations = readback.queryExecution.analyzed.collect { case rel: LogicalRelation => - val columnStats = rel.catalogTable.get.stats.get.colStats - expectedColStatsSeq.foreach { case (field, expectedColStat) => - assert(columnStats.contains(field.name)) - val colStat = columnStats(field.name) + test("generate and load column-level stats for data source table") { + val dsTable = "dsTable" + withTable(dsTable) { + testDataFrame.write.format("parquet").saveAsTable(dsTable) + sql(s"ANALYZE TABLE $dsTable COMPUTE STATISTICS FOR COLUMNS c1, c2, c3, c4") + checkColStats(dsTable, isDataSourceTable = true, expectedColStatsSeq) + } + } + + test("generate and load column-level stats for hive serde table") { + val hTable = "hTable" + val tmp = "tmp" + withTable(hTable, tmp) { + testDataFrame.write.format("parquet").saveAsTable(tmp) + sql(s"CREATE TABLE $hTable (c1 int, c2 string, c3 binary, c4 boolean) STORED AS TEXTFILE") + sql(s"INSERT INTO $hTable SELECT * FROM $tmp") + sql(s"ANALYZE TABLE $hTable COMPUTE STATISTICS FOR COLUMNS c1, c2, c3, c4") + checkColStats(hTable, isDataSourceTable = false, expectedColStatsSeq) + } + } + + // When caseSensitive is on, for columns with only case difference, they are different columns + // and we should generate column stats for all of them. + private def checkCaseSensitiveColStats(columnName: String): Unit = { + val tableName = "tbl" + withTable(tableName) { + val column1 = columnName.toLowerCase + val column2 = columnName.toUpperCase + withSQLConf("spark.sql.caseSensitive" -> "true") { + sql(s"CREATE TABLE $tableName (`$column1` int, `$column2` double) USING PARQUET") + sql(s"INSERT INTO $tableName SELECT 1, 3.0") + sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS `$column1`, `$column2`") + val readback = spark.table(tableName) + val relations = readback.queryExecution.analyzed.collect { case rel: LogicalRelation => + val columnStats = rel.catalogTable.get.stats.get.colStats + assert(columnStats.size == 2) + StatisticsTest.checkColStat( + dataType = IntegerType, + colStat = columnStats(column1), + expectedColStat = ColumnStat(InternalRow(0L, 1, 1, 1L)), + rsd = spark.sessionState.conf.ndvMaxError) StatisticsTest.checkColStat( - dataType = field.dataType, - colStat = colStat, - expectedColStat = expectedColStat, + dataType = DoubleType, + colStat = columnStats(column2), + expectedColStat = ColumnStat(InternalRow(0L, 3.0d, 3.0d, 1L)), rsd = spark.sessionState.conf.ndvMaxError) + rel } - rel + assert(relations.size == 1) } - assert(relations.size == 1) } } + test("check column statistics for case sensitive column names") { + checkCaseSensitiveColStats(columnName = "c1") + } + + test("check column statistics for case sensitive non-ascii column names") { + // scalastyle:off + // non ascii characters are not allowed in the source code, so we disable the scalastyle. + checkCaseSensitiveColStats(columnName = "列c") + // scalastyle:on + } + test("estimates the size of a test MetastoreRelation") { val df = sql("""SELECT * FROM src""") val sizes = df.queryExecution.analyzed.collect { case mr: MetastoreRelation => -- cgit v1.2.3