author     Zhenhua Wang <wzh_zju@163.com>        2016-10-03 10:12:02 -0700
committer  Reynold Xin <rxin@databricks.com>     2016-10-03 10:12:02 -0700
commit     7bf92127643570e4eb3610fa3ffd36839eba2718 (patch)
tree       14386f49f956e97b50a8d6b2bbf0f776eab4dd39 /sql/hive
parent     a27033c0bbaae8f31db9b91693947ed71738ed11 (diff)
[SPARK-17073][SQL] generate column-level statistics
## What changes were proposed in this pull request?

Generate basic column statistics for all the atomic types:
- numeric types: max, min, number of nulls, ndv (number of distinct values)
- date/timestamp types: they are represented as numbers internally, so they have the same stats as numeric types
- string: average length, max length, number of nulls, ndv
- binary: average length, max length, number of nulls
- boolean: number of nulls, number of trues, number of falses

Also support storing and loading these statistics.

One thing to notice: we support analyzing columns independently, e.g.:
- sql1: `ANALYZE TABLE src COMPUTE STATISTICS FOR COLUMNS key;`
- sql2: `ANALYZE TABLE src COMPUTE STATISTICS FOR COLUMNS value;`

When running sql2 to collect column stats for `value`, we do not remove the stats of column `key` collected by sql1. As a result, **users need to guarantee consistency** between sql1 and sql2. If the table has changed before sql2, users should re-analyze column `key` together with column `value`:
`ANALYZE TABLE src COMPUTE STATISTICS FOR COLUMNS key, value;`

## How was this patch tested?

Added unit tests.

Author: Zhenhua Wang <wzh_zju@163.com>

Closes #15090 from wzhfy/colStats.
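The ANALYZE commands above can be exercised as in the following minimal sketch, assuming a Hive-enabled SparkSession and an existing table `src(key STRING, value STRING)` as in the examples in this description:

```scala
// Minimal usage sketch of the commands described above; `src`, `key`, and `value`
// follow the examples in the PR description and are assumptions, not fixed names.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

// Columns can be analyzed independently...
spark.sql("ANALYZE TABLE src COMPUTE STATISTICS FOR COLUMNS key")
spark.sql("ANALYZE TABLE src COMPUTE STATISTICS FOR COLUMNS value")

// ...but if the table changed between the two commands, re-analyze the columns
// together so the stored statistics stay mutually consistent.
spark.sql("ANALYZE TABLE src COMPUTE STATISTICS FOR COLUMNS key, value")
```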
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala          28
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala             119
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala        1
3 files changed, 113 insertions, 35 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index d35a681b67..261cc6feff 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -32,8 +32,8 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.plans.logical.Statistics
-import org.apache.spark.sql.execution.command.DDLUtils
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Statistics}
+import org.apache.spark.sql.execution.command.{ColumnStatStruct, DDLUtils}
import org.apache.spark.sql.execution.datasources.CaseInsensitiveMap
import org.apache.spark.sql.hive.client.HiveClient
import org.apache.spark.sql.internal.HiveSerDe
@@ -401,7 +401,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
var statsProperties: Map[String, String] =
Map(STATISTICS_TOTAL_SIZE -> stats.sizeInBytes.toString())
if (stats.rowCount.isDefined) {
- statsProperties += (STATISTICS_NUM_ROWS -> stats.rowCount.get.toString())
+ statsProperties += STATISTICS_NUM_ROWS -> stats.rowCount.get.toString()
+ }
+ stats.colStats.foreach { case (colName, colStat) =>
+ statsProperties += (STATISTICS_COL_STATS_PREFIX + colName) -> colStat.toString
}
tableDefinition.copy(properties = tableDefinition.properties ++ statsProperties)
} else {
@@ -473,15 +476,21 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
}
}
// construct Spark's statistics from information in Hive metastore
- if (catalogTable.properties.contains(STATISTICS_TOTAL_SIZE)) {
- val totalSize = BigInt(catalogTable.properties.get(STATISTICS_TOTAL_SIZE).get)
- // TODO: we will compute "estimatedSize" when we have column stats:
- // average size of row * number of rows
+ val statsProps = catalogTable.properties.filterKeys(_.startsWith(STATISTICS_PREFIX))
+ if (statsProps.nonEmpty) {
+ val colStatsProps = statsProps.filterKeys(_.startsWith(STATISTICS_COL_STATS_PREFIX))
+ .map { case (k, v) => (k.drop(STATISTICS_COL_STATS_PREFIX.length), v) }
+ val colStats: Map[String, ColumnStat] = catalogTable.schema.collect {
+ case f if colStatsProps.contains(f.name) =>
+ val numFields = ColumnStatStruct.numStatFields(f.dataType)
+ (f.name, ColumnStat(numFields, colStatsProps(f.name)))
+ }.toMap
catalogTable.copy(
properties = removeStatsProperties(catalogTable),
stats = Some(Statistics(
- sizeInBytes = totalSize,
- rowCount = catalogTable.properties.get(STATISTICS_NUM_ROWS).map(BigInt(_)))))
+ sizeInBytes = BigInt(catalogTable.properties(STATISTICS_TOTAL_SIZE)),
+ rowCount = catalogTable.properties.get(STATISTICS_NUM_ROWS).map(BigInt(_)),
+ colStats = colStats)))
} else {
catalogTable
}
@@ -693,6 +702,7 @@ object HiveExternalCatalog {
val STATISTICS_PREFIX = "spark.sql.statistics."
val STATISTICS_TOTAL_SIZE = STATISTICS_PREFIX + "totalSize"
val STATISTICS_NUM_ROWS = STATISTICS_PREFIX + "numRows"
+ val STATISTICS_COL_STATS_PREFIX = STATISTICS_PREFIX + "colStats."
def removeStatsProperties(metadata: CatalogTable): Map[String, String] = {
metadata.properties.filterNot { case (key, _) => key.startsWith(STATISTICS_PREFIX) }
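As a side note on the catalog change above: after `ANALYZE TABLE`, the statistics are stored as ordinary table properties keyed by the constants added in this diff (`STATISTICS_TOTAL_SIZE`, `STATISTICS_NUM_ROWS`, `STATISTICS_COL_STATS_PREFIX`). The following is a minimal sketch of that layout and of the prefix-stripping read path; the property values are placeholders, since the serialized form of `ColumnStat` is internal to Spark.

```scala
// Sketch only: placeholder values, not Spark's actual serialization of ColumnStat.
val statsProperties: Map[String, String] = Map(
  "spark.sql.statistics.totalSize"      -> "11624",                   // STATISTICS_TOTAL_SIZE
  "spark.sql.statistics.numRows"        -> "500",                     // STATISTICS_NUM_ROWS
  "spark.sql.statistics.colStats.key"   -> "<serialized ColumnStat>", // STATISTICS_COL_STATS_PREFIX + column name
  "spark.sql.statistics.colStats.value" -> "<serialized ColumnStat>"
)

// Read path, mirroring the hunk above: keep only the column-stats keys, then strip
// the prefix to recover the column names.
val colStatsPrefix = "spark.sql.statistics.colStats."
val colStatsProps: Map[String, String] = statsProperties
  .filter { case (k, _) => k.startsWith(colStatsPrefix) }
  .map { case (k, v) => (k.drop(colStatsPrefix.length), v) }
// colStatsProps == Map("key" -> "<serialized ColumnStat>", "value" -> "<serialized ColumnStat>")
```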
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 9956706929..99dd080683 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -21,16 +21,16 @@ import java.io.{File, PrintWriter}
import scala.reflect.ClassTag
-import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.plans.logical.Statistics
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row, StatisticsTest}
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Statistics}
import org.apache.spark.sql.execution.command.{AnalyzeTableCommand, DDLUtils}
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SQLTestUtils
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types._
class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
@@ -171,7 +171,27 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils
TableIdentifier("tempTable"), ignoreIfNotExists = true, purge = false)
}
- private def checkStats(
+ test("analyzing views is not supported") {
+ def assertAnalyzeUnsupported(analyzeCommand: String): Unit = {
+ val err = intercept[AnalysisException] {
+ sql(analyzeCommand)
+ }
+ assert(err.message.contains("ANALYZE TABLE is not supported"))
+ }
+
+ val tableName = "tbl"
+ withTable(tableName) {
+ spark.range(10).write.saveAsTable(tableName)
+ val viewName = "view"
+ withView(viewName) {
+ sql(s"CREATE VIEW $viewName AS SELECT * FROM $tableName")
+ assertAnalyzeUnsupported(s"ANALYZE TABLE $viewName COMPUTE STATISTICS")
+ assertAnalyzeUnsupported(s"ANALYZE TABLE $viewName COMPUTE STATISTICS FOR COLUMNS id")
+ }
+ }
+ }
+
+ private def checkTableStats(
stats: Option[Statistics],
hasSizeInBytes: Boolean,
expectedRowCounts: Option[Int]): Unit = {
@@ -184,7 +204,7 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils
}
}
- private def checkStats(
+ private def checkTableStats(
tableName: String,
isDataSourceTable: Boolean,
hasSizeInBytes: Boolean,
@@ -192,12 +212,12 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils
val df = sql(s"SELECT * FROM $tableName")
val stats = df.queryExecution.analyzed.collect {
case rel: MetastoreRelation =>
- checkStats(rel.catalogTable.stats, hasSizeInBytes, expectedRowCounts)
- assert(!isDataSourceTable, "Expected a data source table, but got a Hive serde table")
+ checkTableStats(rel.catalogTable.stats, hasSizeInBytes, expectedRowCounts)
+ assert(!isDataSourceTable, "Expected a Hive serde table, but got a data source table")
rel.catalogTable.stats
case rel: LogicalRelation =>
- checkStats(rel.catalogTable.get.stats, hasSizeInBytes, expectedRowCounts)
- assert(isDataSourceTable, "Expected a Hive serde table, but got a data source table")
+ checkTableStats(rel.catalogTable.get.stats, hasSizeInBytes, expectedRowCounts)
+ assert(isDataSourceTable, "Expected a data source table, but got a Hive serde table")
rel.catalogTable.get.stats
}
assert(stats.size == 1)
@@ -210,13 +230,13 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils
// Currently Spark's statistics are self-contained, we don't have statistics until we use
// the `ANALYZE TABLE` command.
sql(s"CREATE TABLE $textTable (key STRING, value STRING) STORED AS TEXTFILE")
- checkStats(
+ checkTableStats(
textTable,
isDataSourceTable = false,
hasSizeInBytes = false,
expectedRowCounts = None)
sql(s"INSERT INTO TABLE $textTable SELECT * FROM src")
- checkStats(
+ checkTableStats(
textTable,
isDataSourceTable = false,
hasSizeInBytes = false,
@@ -224,12 +244,12 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils
// noscan won't count the number of rows
sql(s"ANALYZE TABLE $textTable COMPUTE STATISTICS noscan")
- val fetchedStats1 = checkStats(
+ val fetchedStats1 = checkTableStats(
textTable, isDataSourceTable = false, hasSizeInBytes = true, expectedRowCounts = None)
// without noscan, we count the number of rows
sql(s"ANALYZE TABLE $textTable COMPUTE STATISTICS")
- val fetchedStats2 = checkStats(
+ val fetchedStats2 = checkTableStats(
textTable, isDataSourceTable = false, hasSizeInBytes = true, expectedRowCounts = Some(500))
assert(fetchedStats1.get.sizeInBytes == fetchedStats2.get.sizeInBytes)
}
@@ -241,19 +261,19 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils
sql(s"CREATE TABLE $textTable (key STRING, value STRING) STORED AS TEXTFILE")
sql(s"INSERT INTO TABLE $textTable SELECT * FROM src")
sql(s"ANALYZE TABLE $textTable COMPUTE STATISTICS")
- val fetchedStats1 = checkStats(
+ val fetchedStats1 = checkTableStats(
textTable, isDataSourceTable = false, hasSizeInBytes = true, expectedRowCounts = Some(500))
sql(s"ANALYZE TABLE $textTable COMPUTE STATISTICS noscan")
// when the total size is not changed, the old row count is kept
- val fetchedStats2 = checkStats(
+ val fetchedStats2 = checkTableStats(
textTable, isDataSourceTable = false, hasSizeInBytes = true, expectedRowCounts = Some(500))
assert(fetchedStats1 == fetchedStats2)
sql(s"INSERT INTO TABLE $textTable SELECT * FROM src")
sql(s"ANALYZE TABLE $textTable COMPUTE STATISTICS noscan")
// update total size and remove the old and invalid row count
- val fetchedStats3 = checkStats(
+ val fetchedStats3 = checkTableStats(
textTable, isDataSourceTable = false, hasSizeInBytes = true, expectedRowCounts = None)
assert(fetchedStats3.get.sizeInBytes > fetchedStats2.get.sizeInBytes)
}
@@ -271,20 +291,20 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils
// the default value for `spark.sql.hive.convertMetastoreParquet` is true, here we just set it
// for robustness
withSQLConf("spark.sql.hive.convertMetastoreParquet" -> "true") {
- checkStats(
+ checkTableStats(
parquetTable, isDataSourceTable = true, hasSizeInBytes = false, expectedRowCounts = None)
sql(s"ANALYZE TABLE $parquetTable COMPUTE STATISTICS")
- checkStats(
+ checkTableStats(
parquetTable,
isDataSourceTable = true,
hasSizeInBytes = true,
expectedRowCounts = Some(500))
}
withSQLConf("spark.sql.hive.convertMetastoreOrc" -> "true") {
- checkStats(
+ checkTableStats(
orcTable, isDataSourceTable = true, hasSizeInBytes = false, expectedRowCounts = None)
sql(s"ANALYZE TABLE $orcTable COMPUTE STATISTICS")
- checkStats(
+ checkTableStats(
orcTable, isDataSourceTable = true, hasSizeInBytes = true, expectedRowCounts = Some(500))
}
}
@@ -298,23 +318,23 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils
assert(DDLUtils.isDatasourceTable(catalogTable))
sql(s"INSERT INTO TABLE $parquetTable SELECT * FROM src")
- checkStats(
+ checkTableStats(
parquetTable, isDataSourceTable = true, hasSizeInBytes = false, expectedRowCounts = None)
// noscan won't count the number of rows
sql(s"ANALYZE TABLE $parquetTable COMPUTE STATISTICS noscan")
- val fetchedStats1 = checkStats(
+ val fetchedStats1 = checkTableStats(
parquetTable, isDataSourceTable = true, hasSizeInBytes = true, expectedRowCounts = None)
sql(s"INSERT INTO TABLE $parquetTable SELECT * FROM src")
sql(s"ANALYZE TABLE $parquetTable COMPUTE STATISTICS noscan")
- val fetchedStats2 = checkStats(
+ val fetchedStats2 = checkTableStats(
parquetTable, isDataSourceTable = true, hasSizeInBytes = true, expectedRowCounts = None)
assert(fetchedStats2.get.sizeInBytes > fetchedStats1.get.sizeInBytes)
// without noscan, we count the number of rows
sql(s"ANALYZE TABLE $parquetTable COMPUTE STATISTICS")
- val fetchedStats3 = checkStats(
+ val fetchedStats3 = checkTableStats(
parquetTable,
isDataSourceTable = true,
hasSizeInBytes = true,
@@ -330,7 +350,7 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils
val dfNoCols = spark.createDataFrame(rddNoCols, StructType(Seq.empty))
dfNoCols.write.format("json").saveAsTable(table_no_cols)
sql(s"ANALYZE TABLE $table_no_cols COMPUTE STATISTICS")
- checkStats(
+ checkTableStats(
table_no_cols,
isDataSourceTable = true,
hasSizeInBytes = true,
@@ -338,6 +358,53 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils
}
}
+ test("generate column-level statistics and load them from hive metastore") {
+ import testImplicits._
+
+ val intSeq = Seq(1, 2)
+ val stringSeq = Seq("a", "bb")
+ val booleanSeq = Seq(true, false)
+
+ val data = intSeq.indices.map { i =>
+ (intSeq(i), stringSeq(i), booleanSeq(i))
+ }
+ val tableName = "table"
+ withTable(tableName) {
+ val df = data.toDF("c1", "c2", "c3")
+ df.write.format("parquet").saveAsTable(tableName)
+ val expectedColStatsSeq = df.schema.map { f =>
+ val colStat = f.dataType match {
+ case IntegerType =>
+ ColumnStat(InternalRow(0L, intSeq.max, intSeq.min, intSeq.distinct.length.toLong))
+ case StringType =>
+ ColumnStat(InternalRow(0L, stringSeq.map(_.length).sum / stringSeq.length.toDouble,
+ stringSeq.map(_.length).max.toLong, stringSeq.distinct.length.toLong))
+ case BooleanType =>
+ ColumnStat(InternalRow(0L, booleanSeq.count(_.equals(true)).toLong,
+ booleanSeq.count(_.equals(false)).toLong))
+ }
+ (f, colStat)
+ }
+
+ sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS c1, c2, c3")
+ val readback = spark.table(tableName)
+ val relations = readback.queryExecution.analyzed.collect { case rel: LogicalRelation =>
+ val columnStats = rel.catalogTable.get.stats.get.colStats
+ expectedColStatsSeq.foreach { case (field, expectedColStat) =>
+ assert(columnStats.contains(field.name))
+ val colStat = columnStats(field.name)
+ StatisticsTest.checkColStat(
+ dataType = field.dataType,
+ colStat = colStat,
+ expectedColStat = expectedColStat,
+ rsd = spark.sessionState.conf.ndvMaxError)
+ }
+ rel
+ }
+ assert(relations.size == 1)
+ }
+ }
+
test("estimates the size of a test MetastoreRelation") {
val df = sql("""SELECT * FROM src""")
val sizes = df.queryExecution.analyzed.collect { case mr: MetastoreRelation =>
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
index a215c70da0..f5c605fe5e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
@@ -123,6 +123,7 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
assertNoSuchTable(s"SHOW CREATE TABLE $viewName")
assertNoSuchTable(s"SHOW PARTITIONS $viewName")
assertNoSuchTable(s"ANALYZE TABLE $viewName COMPUTE STATISTICS")
+ assertNoSuchTable(s"ANALYZE TABLE $viewName COMPUTE STATISTICS FOR COLUMNS id")
}
}