path: root/sql/hive/src/test
author    Reynold Xin <rxin@databricks.com>    2016-11-23 20:48:41 +0800
committer Wenchen Fan <wenchen@databricks.com> 2016-11-23 20:48:41 +0800
commit    70ad07a9d20586ae182c4e60ed97bdddbcbceff3 (patch)
tree      14666ca06583b5ee8fc6ee09b0434aa824c2efde /sql/hive/src/test
parent    9785ed40d7fe4e1fcd440e55706519c6e5f8d6b1 (diff)
[SPARK-18522][SQL] Explicit contract for column stats serialization
## What changes were proposed in this pull request?

The current implementation of column stats uses the base64 encoding of the internal UnsafeRow format to persist statistics (in table properties in the Hive metastore). This is an internal format that is not stable across different versions of Spark and should NOT be used for persistence. In addition, it would be better if the statistics stored in the catalog were human readable.

This pull request introduces the following changes:

1. Created a single ColumnStat class for all data types. All data types track the same set of statistics.
2. Updated the implementation for stats collection to get rid of the dependency on internal data structures (e.g. InternalRow, or storing DateType as an int32). For example, previously dates were stored as a single integer, but are now stored as java.sql.Date. When we implement the next steps of CBO, we can add code to convert those back into internal types again.
3. Documented clearly what JVM data types are used to store what data.
4. Defined a simple Map[String, String] interface for serializing and deserializing column stats into/from the catalog (see the sketch below).
5. Rearranged the method/function structure so it is clearer what the supported data types are, and moved stats generation into the ColumnStat class so it is easy to find.

## How was this patch tested?

Removed most of the original test cases created for column statistics and added three very simple ones to cover all the cases. The three test cases validate:

1. Roundtrip serialization works.
2. Behavior when analyzing a non-existent column or a column with an unsupported data type.
3. Results of stats collection for all valid data types.

Also moved parser-related tests into a parser test suite and added an explicit serialization test for the Hive external catalog.

Author: Reynold Xin <rxin@databricks.com>

Closes #15959 from rxin/SPARK-18522.
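The heart of the change is the Map[String, String] contract: each column's statistics are flattened into string-valued table properties keyed by `spark.sql.statistics.colStats.<column>.<field>`, which is exactly the shape asserted in the updated StatisticsSuite test in the diff below. The following is a minimal, hedged sketch of such a round-trippable contract; the key prefix and field names mirror the test's assertions, while the class name `SimpleColumnStat` and its `toMap`/`fromMap` helpers are illustrative stand-ins, not Spark's actual ColumnStat API.

```scala
// A hedged sketch, not Spark's implementation: the key prefix and field names mirror the
// properties asserted in StatisticsSuite; class and helper names here are hypothetical.
case class SimpleColumnStat(
    distinctCount: BigInt,
    min: Option[String],   // external, human-readable form (e.g. "2016-05-08", "1.0", "true")
    max: Option[String],
    nullCount: BigInt,
    avgLen: Long,
    maxLen: Long) {

  /** Flatten into string key/value pairs suitable for catalog (table) properties. */
  def toMap(colName: String): Map[String, String] = {
    val prefix = s"spark.sql.statistics.colStats.$colName."
    Map(
      prefix + "version" -> "1",
      prefix + "distinctCount" -> distinctCount.toString,
      prefix + "nullCount" -> nullCount.toString,
      prefix + "avgLen" -> avgLen.toString,
      prefix + "maxLen" -> maxLen.toString) ++
      min.map(v => prefix + "min" -> v) ++
      max.map(v => prefix + "max" -> v)
  }
}

object SimpleColumnStat {
  /** Rebuild a column's stats from catalog properties previously written by toMap. */
  def fromMap(colName: String, props: Map[String, String]): Option[SimpleColumnStat] = {
    val prefix = s"spark.sql.statistics.colStats.$colName."
    def get(field: String): Option[String] = props.get(prefix + field)
    for {
      distinct <- get("distinctCount")
      nulls    <- get("nullCount")
      avgLen   <- get("avgLen")
      maxLen   <- get("maxLen")
    } yield SimpleColumnStat(
      BigInt(distinct), get("min"), get("max"), BigInt(nulls), avgLen.toLong, maxLen.toLong)
  }
}
```

A round-trip check such as `SimpleColumnStat.fromMap("cint", stat.toMap("cint")).contains(stat)` is essentially what the new "roundtrip serialization" test exercises, while the Hive test in the diff below verifies the persisted property map entry by entry. Storing min/max as their external string forms keeps the persisted properties human readable and independent of Spark's internal encodings, which is the stated motivation of the patch.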
Diffstat (limited to 'sql/hive/src/test')
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala | 299
1 file changed, 113 insertions, 186 deletions
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 4f5ebc3d83..5ae202fdc9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -22,56 +22,16 @@ import java.io.{File, PrintWriter}
import scala.reflect.ClassTag
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
-import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Statistics}
-import org.apache.spark.sql.execution.command.{AnalyzeTableCommand, DDLUtils}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.plans.logical.Statistics
+import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types._
-class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
-
- test("parse analyze commands") {
- def assertAnalyzeCommand(analyzeCommand: String, c: Class[_]) {
- val parsed = spark.sessionState.sqlParser.parsePlan(analyzeCommand)
- val operators = parsed.collect {
- case a: AnalyzeTableCommand => a
- case o => o
- }
-
- assert(operators.size === 1)
- if (operators(0).getClass() != c) {
- fail(
- s"""$analyzeCommand expected command: $c, but got ${operators(0)}
- |parsed command:
- |$parsed
- """.stripMargin)
- }
- }
-
- assertAnalyzeCommand(
- "ANALYZE TABLE Table1 COMPUTE STATISTICS",
- classOf[AnalyzeTableCommand])
- assertAnalyzeCommand(
- "ANALYZE TABLE Table1 PARTITION(ds='2008-04-09', hr=11) COMPUTE STATISTICS",
- classOf[AnalyzeTableCommand])
- assertAnalyzeCommand(
- "ANALYZE TABLE Table1 PARTITION(ds='2008-04-09', hr=11) COMPUTE STATISTICS noscan",
- classOf[AnalyzeTableCommand])
- assertAnalyzeCommand(
- "ANALYZE TABLE Table1 PARTITION(ds, hr) COMPUTE STATISTICS",
- classOf[AnalyzeTableCommand])
- assertAnalyzeCommand(
- "ANALYZE TABLE Table1 PARTITION(ds, hr) COMPUTE STATISTICS noscan",
- classOf[AnalyzeTableCommand])
-
- assertAnalyzeCommand(
- "ANALYZE TABLE Table1 COMPUTE STATISTICS nOscAn",
- classOf[AnalyzeTableCommand])
- }
+class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleton {
test("MetastoreRelations fallback to HDFS for size estimation") {
val enableFallBackToHdfsForStats = spark.sessionState.conf.fallBackToHdfsForStatsEnabled
@@ -310,6 +270,110 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils
}
}
+ test("verify serialized column stats after analyzing columns") {
+ import testImplicits._
+
+ val tableName = "column_stats_test2"
+ // (data.head.productArity - 1) because the last column does not support stats collection.
+ assert(stats.size == data.head.productArity - 1)
+ val df = data.toDF(stats.keys.toSeq :+ "carray" : _*)
+
+ withTable(tableName) {
+ df.write.saveAsTable(tableName)
+
+ // Collect statistics
+ sql(s"analyze table $tableName compute STATISTICS FOR COLUMNS " + stats.keys.mkString(", "))
+
+ // Validate statistics
+ val hiveClient = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+ val table = hiveClient.getTable("default", tableName)
+
+ val props = table.properties.filterKeys(_.startsWith("spark.sql.statistics.colStats"))
+ assert(props == Map(
+ "spark.sql.statistics.colStats.cbinary.avgLen" -> "3",
+ "spark.sql.statistics.colStats.cbinary.distinctCount" -> "2",
+ "spark.sql.statistics.colStats.cbinary.maxLen" -> "3",
+ "spark.sql.statistics.colStats.cbinary.nullCount" -> "1",
+ "spark.sql.statistics.colStats.cbinary.version" -> "1",
+ "spark.sql.statistics.colStats.cbool.avgLen" -> "1",
+ "spark.sql.statistics.colStats.cbool.distinctCount" -> "2",
+ "spark.sql.statistics.colStats.cbool.max" -> "true",
+ "spark.sql.statistics.colStats.cbool.maxLen" -> "1",
+ "spark.sql.statistics.colStats.cbool.min" -> "false",
+ "spark.sql.statistics.colStats.cbool.nullCount" -> "1",
+ "spark.sql.statistics.colStats.cbool.version" -> "1",
+ "spark.sql.statistics.colStats.cbyte.avgLen" -> "1",
+ "spark.sql.statistics.colStats.cbyte.distinctCount" -> "2",
+ "spark.sql.statistics.colStats.cbyte.max" -> "2",
+ "spark.sql.statistics.colStats.cbyte.maxLen" -> "1",
+ "spark.sql.statistics.colStats.cbyte.min" -> "1",
+ "spark.sql.statistics.colStats.cbyte.nullCount" -> "1",
+ "spark.sql.statistics.colStats.cbyte.version" -> "1",
+ "spark.sql.statistics.colStats.cdate.avgLen" -> "4",
+ "spark.sql.statistics.colStats.cdate.distinctCount" -> "2",
+ "spark.sql.statistics.colStats.cdate.max" -> "2016-05-09",
+ "spark.sql.statistics.colStats.cdate.maxLen" -> "4",
+ "spark.sql.statistics.colStats.cdate.min" -> "2016-05-08",
+ "spark.sql.statistics.colStats.cdate.nullCount" -> "1",
+ "spark.sql.statistics.colStats.cdate.version" -> "1",
+ "spark.sql.statistics.colStats.cdecimal.avgLen" -> "16",
+ "spark.sql.statistics.colStats.cdecimal.distinctCount" -> "2",
+ "spark.sql.statistics.colStats.cdecimal.max" -> "8.000000000000000000",
+ "spark.sql.statistics.colStats.cdecimal.maxLen" -> "16",
+ "spark.sql.statistics.colStats.cdecimal.min" -> "1.000000000000000000",
+ "spark.sql.statistics.colStats.cdecimal.nullCount" -> "1",
+ "spark.sql.statistics.colStats.cdecimal.version" -> "1",
+ "spark.sql.statistics.colStats.cdouble.avgLen" -> "8",
+ "spark.sql.statistics.colStats.cdouble.distinctCount" -> "2",
+ "spark.sql.statistics.colStats.cdouble.max" -> "6.0",
+ "spark.sql.statistics.colStats.cdouble.maxLen" -> "8",
+ "spark.sql.statistics.colStats.cdouble.min" -> "1.0",
+ "spark.sql.statistics.colStats.cdouble.nullCount" -> "1",
+ "spark.sql.statistics.colStats.cdouble.version" -> "1",
+ "spark.sql.statistics.colStats.cfloat.avgLen" -> "4",
+ "spark.sql.statistics.colStats.cfloat.distinctCount" -> "2",
+ "spark.sql.statistics.colStats.cfloat.max" -> "7.0",
+ "spark.sql.statistics.colStats.cfloat.maxLen" -> "4",
+ "spark.sql.statistics.colStats.cfloat.min" -> "1.0",
+ "spark.sql.statistics.colStats.cfloat.nullCount" -> "1",
+ "spark.sql.statistics.colStats.cfloat.version" -> "1",
+ "spark.sql.statistics.colStats.cint.avgLen" -> "4",
+ "spark.sql.statistics.colStats.cint.distinctCount" -> "2",
+ "spark.sql.statistics.colStats.cint.max" -> "4",
+ "spark.sql.statistics.colStats.cint.maxLen" -> "4",
+ "spark.sql.statistics.colStats.cint.min" -> "1",
+ "spark.sql.statistics.colStats.cint.nullCount" -> "1",
+ "spark.sql.statistics.colStats.cint.version" -> "1",
+ "spark.sql.statistics.colStats.clong.avgLen" -> "8",
+ "spark.sql.statistics.colStats.clong.distinctCount" -> "2",
+ "spark.sql.statistics.colStats.clong.max" -> "5",
+ "spark.sql.statistics.colStats.clong.maxLen" -> "8",
+ "spark.sql.statistics.colStats.clong.min" -> "1",
+ "spark.sql.statistics.colStats.clong.nullCount" -> "1",
+ "spark.sql.statistics.colStats.clong.version" -> "1",
+ "spark.sql.statistics.colStats.cshort.avgLen" -> "2",
+ "spark.sql.statistics.colStats.cshort.distinctCount" -> "2",
+ "spark.sql.statistics.colStats.cshort.max" -> "3",
+ "spark.sql.statistics.colStats.cshort.maxLen" -> "2",
+ "spark.sql.statistics.colStats.cshort.min" -> "1",
+ "spark.sql.statistics.colStats.cshort.nullCount" -> "1",
+ "spark.sql.statistics.colStats.cshort.version" -> "1",
+ "spark.sql.statistics.colStats.cstring.avgLen" -> "3",
+ "spark.sql.statistics.colStats.cstring.distinctCount" -> "2",
+ "spark.sql.statistics.colStats.cstring.maxLen" -> "3",
+ "spark.sql.statistics.colStats.cstring.nullCount" -> "1",
+ "spark.sql.statistics.colStats.cstring.version" -> "1",
+ "spark.sql.statistics.colStats.ctimestamp.avgLen" -> "8",
+ "spark.sql.statistics.colStats.ctimestamp.distinctCount" -> "2",
+ "spark.sql.statistics.colStats.ctimestamp.max" -> "2016-05-09 00:00:02.0",
+ "spark.sql.statistics.colStats.ctimestamp.maxLen" -> "8",
+ "spark.sql.statistics.colStats.ctimestamp.min" -> "2016-05-08 00:00:01.0",
+ "spark.sql.statistics.colStats.ctimestamp.nullCount" -> "1",
+ "spark.sql.statistics.colStats.ctimestamp.version" -> "1"
+ ))
+ }
+ }
+
private def testUpdatingTableStats(tableDescription: String, createTableCmd: String): Unit = {
test("test table-level statistics for " + tableDescription) {
val parquetTable = "parquetTable"
@@ -319,7 +383,8 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils
TableIdentifier(parquetTable))
assert(DDLUtils.isDatasourceTable(catalogTable))
- sql(s"INSERT INTO TABLE $parquetTable SELECT * FROM src")
+ // Add a filter to avoid creating too many partitions
+ sql(s"INSERT INTO TABLE $parquetTable SELECT * FROM src WHERE key < 10")
checkTableStats(
parquetTable, isDataSourceTable = true, hasSizeInBytes = false, expectedRowCounts = None)
@@ -328,7 +393,7 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils
val fetchedStats1 = checkTableStats(
parquetTable, isDataSourceTable = true, hasSizeInBytes = true, expectedRowCounts = None)
- sql(s"INSERT INTO TABLE $parquetTable SELECT * FROM src")
+ sql(s"INSERT INTO TABLE $parquetTable SELECT * FROM src WHERE key < 10")
sql(s"ANALYZE TABLE $parquetTable COMPUTE STATISTICS noscan")
val fetchedStats2 = checkTableStats(
parquetTable, isDataSourceTable = true, hasSizeInBytes = true, expectedRowCounts = None)
@@ -340,7 +405,7 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils
parquetTable,
isDataSourceTable = true,
hasSizeInBytes = true,
- expectedRowCounts = Some(1000))
+ expectedRowCounts = Some(20))
assert(fetchedStats3.get.sizeInBytes == fetchedStats2.get.sizeInBytes)
}
}
@@ -369,6 +434,7 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils
}
}
+ /** Used to test refreshing cached metadata once table stats are updated. */
private def getStatsBeforeAfterUpdate(isAnalyzeColumns: Boolean): (Statistics, Statistics) = {
val tableName = "tbl"
var statsBeforeUpdate: Statistics = null
@@ -411,145 +477,6 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils
assert(statsAfterUpdate.rowCount == Some(2))
}
- test("test refreshing column stats of cached data source table by `ANALYZE TABLE` statement") {
- val (statsBeforeUpdate, statsAfterUpdate) = getStatsBeforeAfterUpdate(isAnalyzeColumns = true)
-
- assert(statsBeforeUpdate.sizeInBytes > 0)
- assert(statsBeforeUpdate.rowCount == Some(1))
- StatisticsTest.checkColStat(
- dataType = IntegerType,
- colStat = statsBeforeUpdate.colStats("key"),
- expectedColStat = ColumnStat(InternalRow(0L, 1, 1, 1L)),
- rsd = spark.sessionState.conf.ndvMaxError)
-
- assert(statsAfterUpdate.sizeInBytes > statsBeforeUpdate.sizeInBytes)
- assert(statsAfterUpdate.rowCount == Some(2))
- StatisticsTest.checkColStat(
- dataType = IntegerType,
- colStat = statsAfterUpdate.colStats("key"),
- expectedColStat = ColumnStat(InternalRow(0L, 2, 1, 2L)),
- rsd = spark.sessionState.conf.ndvMaxError)
- }
-
- private lazy val (testDataFrame, expectedColStatsSeq) = {
- import testImplicits._
-
- val intSeq = Seq(1, 2)
- val stringSeq = Seq("a", "bb")
- val binarySeq = Seq("a", "bb").map(_.getBytes)
- val booleanSeq = Seq(true, false)
- val data = intSeq.indices.map { i =>
- (intSeq(i), stringSeq(i), binarySeq(i), booleanSeq(i))
- }
- val df: DataFrame = data.toDF("c1", "c2", "c3", "c4")
- val expectedColStatsSeq: Seq[(StructField, ColumnStat)] = df.schema.map { f =>
- val colStat = f.dataType match {
- case IntegerType =>
- ColumnStat(InternalRow(0L, intSeq.max, intSeq.min, intSeq.distinct.length.toLong))
- case StringType =>
- ColumnStat(InternalRow(0L, stringSeq.map(_.length).sum / stringSeq.length.toDouble,
- stringSeq.map(_.length).max.toInt, stringSeq.distinct.length.toLong))
- case BinaryType =>
- ColumnStat(InternalRow(0L, binarySeq.map(_.length).sum / binarySeq.length.toDouble,
- binarySeq.map(_.length).max.toInt))
- case BooleanType =>
- ColumnStat(InternalRow(0L, booleanSeq.count(_.equals(true)).toLong,
- booleanSeq.count(_.equals(false)).toLong))
- }
- (f, colStat)
- }
- (df, expectedColStatsSeq)
- }
-
- private def checkColStats(
- tableName: String,
- isDataSourceTable: Boolean,
- expectedColStatsSeq: Seq[(StructField, ColumnStat)]): Unit = {
- val readback = spark.table(tableName)
- val stats = readback.queryExecution.analyzed.collect {
- case rel: MetastoreRelation =>
- assert(!isDataSourceTable, "Expected a Hive serde table, but got a data source table")
- rel.catalogTable.stats.get
- case rel: LogicalRelation =>
- assert(isDataSourceTable, "Expected a data source table, but got a Hive serde table")
- rel.catalogTable.get.stats.get
- }
- assert(stats.length == 1)
- val columnStats = stats.head.colStats
- assert(columnStats.size == expectedColStatsSeq.length)
- expectedColStatsSeq.foreach { case (field, expectedColStat) =>
- StatisticsTest.checkColStat(
- dataType = field.dataType,
- colStat = columnStats(field.name),
- expectedColStat = expectedColStat,
- rsd = spark.sessionState.conf.ndvMaxError)
- }
- }
-
- test("generate and load column-level stats for data source table") {
- val dsTable = "dsTable"
- withTable(dsTable) {
- testDataFrame.write.format("parquet").saveAsTable(dsTable)
- sql(s"ANALYZE TABLE $dsTable COMPUTE STATISTICS FOR COLUMNS c1, c2, c3, c4")
- checkColStats(dsTable, isDataSourceTable = true, expectedColStatsSeq)
- }
- }
-
- test("generate and load column-level stats for hive serde table") {
- val hTable = "hTable"
- val tmp = "tmp"
- withTable(hTable, tmp) {
- testDataFrame.write.format("parquet").saveAsTable(tmp)
- sql(s"CREATE TABLE $hTable (c1 int, c2 string, c3 binary, c4 boolean) STORED AS TEXTFILE")
- sql(s"INSERT INTO $hTable SELECT * FROM $tmp")
- sql(s"ANALYZE TABLE $hTable COMPUTE STATISTICS FOR COLUMNS c1, c2, c3, c4")
- checkColStats(hTable, isDataSourceTable = false, expectedColStatsSeq)
- }
- }
-
- // When caseSensitive is on, for columns with only case difference, they are different columns
- // and we should generate column stats for all of them.
- private def checkCaseSensitiveColStats(columnName: String): Unit = {
- val tableName = "tbl"
- withTable(tableName) {
- val column1 = columnName.toLowerCase
- val column2 = columnName.toUpperCase
- withSQLConf("spark.sql.caseSensitive" -> "true") {
- sql(s"CREATE TABLE $tableName (`$column1` int, `$column2` double) USING PARQUET")
- sql(s"INSERT INTO $tableName SELECT 1, 3.0")
- sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS `$column1`, `$column2`")
- val readback = spark.table(tableName)
- val relations = readback.queryExecution.analyzed.collect { case rel: LogicalRelation =>
- val columnStats = rel.catalogTable.get.stats.get.colStats
- assert(columnStats.size == 2)
- StatisticsTest.checkColStat(
- dataType = IntegerType,
- colStat = columnStats(column1),
- expectedColStat = ColumnStat(InternalRow(0L, 1, 1, 1L)),
- rsd = spark.sessionState.conf.ndvMaxError)
- StatisticsTest.checkColStat(
- dataType = DoubleType,
- colStat = columnStats(column2),
- expectedColStat = ColumnStat(InternalRow(0L, 3.0d, 3.0d, 1L)),
- rsd = spark.sessionState.conf.ndvMaxError)
- rel
- }
- assert(relations.size == 1)
- }
- }
- }
-
- test("check column statistics for case sensitive column names") {
- checkCaseSensitiveColStats(columnName = "c1")
- }
-
- test("check column statistics for case sensitive non-ascii column names") {
- // scalastyle:off
- // non ascii characters are not allowed in the source code, so we disable the scalastyle.
- checkCaseSensitiveColStats(columnName = "列c")
- // scalastyle:on
- }
-
test("estimates the size of a test MetastoreRelation") {
val df = sql("""SELECT * FROM src""")
val sizes = df.queryExecution.analyzed.collect { case mr: MetastoreRelation =>