author    Reynold Xin <rxin@databricks.com>        2016-11-23 20:48:41 +0800
committer Wenchen Fan <wenchen@databricks.com>     2016-11-23 20:48:41 +0800
commit    70ad07a9d20586ae182c4e60ed97bdddbcbceff3 (patch)
tree      14666ca06583b5ee8fc6ee09b0434aa824c2efde /sql/core
parent    9785ed40d7fe4e1fcd440e55706519c6e5f8d6b1 (diff)
[SPARK-18522][SQL] Explicit contract for column stats serialization
## What changes were proposed in this pull request?

The current implementation of column stats uses the base64 encoding of the internal UnsafeRow format to persist statistics (in table properties in the Hive metastore). This is an internal format that is not stable across different versions of Spark and should NOT be used for persistence. In addition, it would be better if the statistics stored in the catalog were human readable.

This pull request introduces the following changes:

1. Created a single ColumnStat class for all data types. All data types track the same set of statistics.
2. Updated the stats collection implementation to remove the dependency on internal data structures (e.g. InternalRow, or storing DateType as an int32). For example, dates were previously stored as a single integer but are now stored as java.sql.Date. When we implement the next steps of CBO, we can add code to convert those back into internal types again.
3. Documented clearly which JVM data types are used to store which data.
4. Defined a simple Map[String, String] interface for serializing and deserializing column stats into/from the catalog.
5. Rearranged the method/function structure so it is clearer which data types are supported, and moved stats generation into the ColumnStat class so it is easy to find.

## How was this patch tested?

Removed most of the original test cases created for column statistics, and added three very simple ones to cover all the cases. The three test cases validate:

1. Round-trip serialization works.
2. Behavior when analyzing a non-existent column or a column of an unsupported data type.
3. Stats collection results for all valid data types.

Also moved parser-related tests into a parser test suite and added an explicit serialization test for the Hive external catalog.

Author: Reynold Xin <rxin@databricks.com>

Closes #15959 from rxin/SPARK-18522.
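The key idea in (4) is that each column's statistics flatten into plain string key/value pairs that survive as catalog table properties. Below is a minimal, self-contained sketch of that contract — not the actual Spark API: the class name SimpleColumnStat and the property keys are illustrative assumptions, while the real round trip (ColumnStat.toMap / ColumnStat.fromMap) is exercised by the StatisticsCollectionSuite test added in this patch.

// Illustrative sketch only: mirrors the Map[String, String] contract described above.
// SimpleColumnStat and its property keys are assumptions, not Spark's actual class or keys.
case class SimpleColumnStat(
    distinctCount: Long,
    min: Option[String],
    max: Option[String],
    nullCount: Long,
    avgLen: Long,
    maxLen: Long) {

  // Serialize into human-readable key/value pairs suitable for catalog table properties.
  def toMap: Map[String, String] = Map(
    "distinctCount" -> distinctCount.toString,
    "nullCount" -> nullCount.toString,
    "avgLen" -> avgLen.toString,
    "maxLen" -> maxLen.toString) ++
    min.map("min" -> _) ++
    max.map("max" -> _)
}

object SimpleColumnStat {
  // Deserialize from the catalog representation; None if the properties are malformed.
  def fromMap(props: Map[String, String]): Option[SimpleColumnStat] =
    try {
      Some(SimpleColumnStat(
        distinctCount = props("distinctCount").toLong,
        min = props.get("min"),
        max = props.get("max"),
        nullCount = props("nullCount").toLong,
        avgLen = props("avgLen").toLong,
        maxLen = props("maxLen").toLong))
    } catch {
      case _: NoSuchElementException | _: NumberFormatException => None
    }
}

A round trip then looks like SimpleColumnStat.fromMap(stat.toMap) == Some(stat), which is exactly the property the new round-trip test asserts for the real ColumnStat.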
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala | 105
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala | 218
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/StatisticsColumnSuite.scala | 334
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala | 92
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/StatisticsTest.scala | 130
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala | 26
6 files changed, 257 insertions(+), 648 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
index 7fc57d09e9..9dffe3614a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
@@ -24,9 +24,8 @@ import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, ColumnStat, LogicalPlan, Statistics}
+import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.types._
/**
@@ -62,7 +61,7 @@ case class AnalyzeColumnCommand(
// Compute stats for each column
val (rowCount, newColStats) =
- AnalyzeColumnCommand.computeColStats(sparkSession, relation, columnNames)
+ AnalyzeColumnCommand.computeColumnStats(sparkSession, tableIdent.table, relation, columnNames)
// We also update table-level stats in order to keep them consistent with column-level stats.
val statistics = Statistics(
@@ -88,8 +87,9 @@ object AnalyzeColumnCommand extends Logging {
*
* This is visible for testing.
*/
- def computeColStats(
+ def computeColumnStats(
sparkSession: SparkSession,
+ tableName: String,
relation: LogicalPlan,
columnNames: Seq[String]): (Long, Map[String, ColumnStat]) = {
@@ -97,102 +97,33 @@ object AnalyzeColumnCommand extends Logging {
val resolver = sparkSession.sessionState.conf.resolver
val attributesToAnalyze = AttributeSet(columnNames.map { col =>
val exprOption = relation.output.find(attr => resolver(attr.name, col))
- exprOption.getOrElse(throw new AnalysisException(s"Invalid column name: $col."))
+ exprOption.getOrElse(throw new AnalysisException(s"Column $col does not exist."))
}).toSeq
+ // Make sure the column types are supported for stats gathering.
+ attributesToAnalyze.foreach { attr =>
+ if (!ColumnStat.supportsType(attr.dataType)) {
+ throw new AnalysisException(
+ s"Column ${attr.name} in table $tableName is of type ${attr.dataType}, " +
+ "and Spark does not support statistics collection on this column type.")
+ }
+ }
+
// Collect statistics per column.
// The first element in the result will be the overall row count, the following elements
// will be structs containing all column stats.
// The layout of each struct follows the layout of the ColumnStats.
val ndvMaxErr = sparkSession.sessionState.conf.ndvMaxError
val expressions = Count(Literal(1)).toAggregateExpression() +:
- attributesToAnalyze.map(AnalyzeColumnCommand.createColumnStatStruct(_, ndvMaxErr))
+ attributesToAnalyze.map(ColumnStat.statExprs(_, ndvMaxErr))
+
val namedExpressions = expressions.map(e => Alias(e, e.toString)())
- val statsRow = Dataset.ofRows(sparkSession, Aggregate(Nil, namedExpressions, relation))
- .queryExecution.toRdd.collect().head
+ val statsRow = Dataset.ofRows(sparkSession, Aggregate(Nil, namedExpressions, relation)).head()
- // unwrap the result
- // TODO: Get rid of numFields by using the public Dataset API.
val rowCount = statsRow.getLong(0)
val columnStats = attributesToAnalyze.zipWithIndex.map { case (expr, i) =>
- val numFields = AnalyzeColumnCommand.numStatFields(expr.dataType)
- (expr.name, ColumnStat(statsRow.getStruct(i + 1, numFields)))
+ (expr.name, ColumnStat.rowToColumnStat(statsRow.getStruct(i + 1)))
}.toMap
(rowCount, columnStats)
}
-
- private val zero = Literal(0, LongType)
- private val one = Literal(1, LongType)
-
- private def numNulls(e: Expression): Expression = {
- if (e.nullable) Sum(If(IsNull(e), one, zero)) else zero
- }
- private def max(e: Expression): Expression = Max(e)
- private def min(e: Expression): Expression = Min(e)
- private def ndv(e: Expression, relativeSD: Double): Expression = {
- // the approximate ndv should never be larger than the number of rows
- Least(Seq(HyperLogLogPlusPlus(e, relativeSD), Count(one)))
- }
- private def avgLength(e: Expression): Expression = Average(Length(e))
- private def maxLength(e: Expression): Expression = Max(Length(e))
- private def numTrues(e: Expression): Expression = Sum(If(e, one, zero))
- private def numFalses(e: Expression): Expression = Sum(If(Not(e), one, zero))
-
- /**
- * Creates a struct that groups the sequence of expressions together. This is used to create
- * one top level struct per column.
- */
- private def createStruct(exprs: Seq[Expression]): CreateNamedStruct = {
- CreateStruct(exprs.map { expr: Expression =>
- expr.transformUp {
- case af: AggregateFunction => af.toAggregateExpression()
- }
- })
- }
-
- private def numericColumnStat(e: Expression, relativeSD: Double): Seq[Expression] = {
- Seq(numNulls(e), max(e), min(e), ndv(e, relativeSD))
- }
-
- private def stringColumnStat(e: Expression, relativeSD: Double): Seq[Expression] = {
- Seq(numNulls(e), avgLength(e), maxLength(e), ndv(e, relativeSD))
- }
-
- private def binaryColumnStat(e: Expression): Seq[Expression] = {
- Seq(numNulls(e), avgLength(e), maxLength(e))
- }
-
- private def booleanColumnStat(e: Expression): Seq[Expression] = {
- Seq(numNulls(e), numTrues(e), numFalses(e))
- }
-
- // TODO(rxin): Get rid of this function.
- def numStatFields(dataType: DataType): Int = {
- dataType match {
- case BinaryType | BooleanType => 3
- case _ => 4
- }
- }
-
- /**
- * Creates a struct expression that contains the statistics to collect for a column.
- *
- * @param attr column to collect statistics
- * @param relativeSD relative error for approximate number of distinct values.
- */
- def createColumnStatStruct(attr: Attribute, relativeSD: Double): CreateNamedStruct = {
- attr.dataType match {
- case _: NumericType | TimestampType | DateType =>
- createStruct(numericColumnStat(attr, relativeSD))
- case StringType =>
- createStruct(stringColumnStat(attr, relativeSD))
- case BinaryType =>
- createStruct(binaryColumnStat(attr))
- case BooleanType =>
- createStruct(booleanColumnStat(attr))
- case otherType =>
- throw new AnalysisException("Analyzing columns is not supported for column " +
- s"${attr.name} of data type: ${attr.dataType}.")
- }
- }
}
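For orientation, here is a hedged end-to-end usage sketch of the rewritten command, mirroring the new tests that follow. It assumes an existing SparkSession named spark; sessionState and getTableMetadata are internal APIs that the test suites can reach because they live in the org.apache.spark.sql package.

// Usage sketch, mirroring StatisticsCollectionSuite below; not a standalone program.
import org.apache.spark.sql.catalyst.TableIdentifier

spark.sql("CREATE TABLE stats_demo(i INT, s STRING) USING parquet")
spark.sql("INSERT INTO stats_demo SELECT 1, 'a'")
spark.sql("INSERT INTO stats_demo SELECT 2, 'b'")
spark.sql("ANALYZE TABLE stats_demo COMPUTE STATISTICS FOR COLUMNS i, s")

// The per-column stats are read back from the catalog, where they were persisted
// through the Map[String, String] contract rather than base64-encoded UnsafeRows.
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("stats_demo"))
table.stats.foreach(s => println(s.colStats))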
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
new file mode 100644
index 0000000000..1fcccd0610
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.{lang => jl}
+import java.sql.{Date, Timestamp}
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
+import org.apache.spark.sql.test.SQLTestData.ArrayData
+import org.apache.spark.sql.types._
+
+
+/**
+ * End-to-end suite testing statistics collection and use on both entire table and columns.
+ */
+class StatisticsCollectionSuite extends StatisticsCollectionTestBase with SharedSQLContext {
+ import testImplicits._
+
+ private def checkTableStats(tableName: String, expectedRowCount: Option[Int])
+ : Option[Statistics] = {
+ val df = spark.table(tableName)
+ val stats = df.queryExecution.analyzed.collect { case rel: LogicalRelation =>
+ assert(rel.catalogTable.get.stats.flatMap(_.rowCount) === expectedRowCount)
+ rel.catalogTable.get.stats
+ }
+ assert(stats.size == 1)
+ stats.head
+ }
+
+ test("estimates the size of a limit 0 on outer join") {
+ withTempView("test") {
+ Seq(("one", 1), ("two", 2), ("three", 3), ("four", 4)).toDF("k", "v")
+ .createOrReplaceTempView("test")
+ val df1 = spark.table("test")
+ val df2 = spark.table("test").limit(0)
+ val df = df1.join(df2, Seq("k"), "left")
+
+ val sizes = df.queryExecution.analyzed.collect { case g: Join =>
+ g.statistics.sizeInBytes
+ }
+
+ assert(sizes.size === 1, s"number of Join nodes is wrong:\n ${df.queryExecution}")
+ assert(sizes.head === BigInt(96),
+ s"expected exact size 96 for table 'test', got: ${sizes.head}")
+ }
+ }
+
+ test("analyze column command - unsupported types and invalid columns") {
+ val tableName = "column_stats_test1"
+ withTable(tableName) {
+ Seq(ArrayData(Seq(1, 2, 3), Seq(Seq(1, 2, 3)))).toDF().write.saveAsTable(tableName)
+
+ // Test unsupported data types
+ val err1 = intercept[AnalysisException] {
+ sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS data")
+ }
+ assert(err1.message.contains("does not support statistics collection"))
+
+ // Test invalid columns
+ val err2 = intercept[AnalysisException] {
+ sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS some_random_column")
+ }
+ assert(err2.message.contains("does not exist"))
+ }
+ }
+
+ test("test table-level statistics for data source table") {
+ val tableName = "tbl"
+ withTable(tableName) {
+ sql(s"CREATE TABLE $tableName(i INT, j STRING) USING parquet")
+ Seq(1 -> "a", 2 -> "b").toDF("i", "j").write.mode("overwrite").insertInto(tableName)
+
+ // noscan won't count the number of rows
+ sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS noscan")
+ checkTableStats(tableName, expectedRowCount = None)
+
+ // without noscan, we count the number of rows
+ sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS")
+ checkTableStats(tableName, expectedRowCount = Some(2))
+ }
+ }
+
+ test("SPARK-15392: DataFrame created from RDD should not be broadcasted") {
+ val rdd = sparkContext.range(1, 100).map(i => Row(i, i))
+ val df = spark.createDataFrame(rdd, new StructType().add("a", LongType).add("b", LongType))
+ assert(df.queryExecution.analyzed.statistics.sizeInBytes >
+ spark.sessionState.conf.autoBroadcastJoinThreshold)
+ assert(df.selectExpr("a").queryExecution.analyzed.statistics.sizeInBytes >
+ spark.sessionState.conf.autoBroadcastJoinThreshold)
+ }
+
+ test("estimates the size of limit") {
+ withTempView("test") {
+ Seq(("one", 1), ("two", 2), ("three", 3), ("four", 4)).toDF("k", "v")
+ .createOrReplaceTempView("test")
+ Seq((0, 1), (1, 24), (2, 48)).foreach { case (limit, expected) =>
+ val df = sql(s"""SELECT * FROM test limit $limit""")
+
+ val sizesGlobalLimit = df.queryExecution.analyzed.collect { case g: GlobalLimit =>
+ g.statistics.sizeInBytes
+ }
+ assert(sizesGlobalLimit.size === 1, s"Size wrong for:\n ${df.queryExecution}")
+ assert(sizesGlobalLimit.head === BigInt(expected),
+ s"expected exact size $expected for table 'test', got: ${sizesGlobalLimit.head}")
+
+ val sizesLocalLimit = df.queryExecution.analyzed.collect { case l: LocalLimit =>
+ l.statistics.sizeInBytes
+ }
+ assert(sizesLocalLimit.size === 1, s"Size wrong for:\n ${df.queryExecution}")
+ assert(sizesLocalLimit.head === BigInt(expected),
+ s"expected exact size $expected for table 'test', got: ${sizesLocalLimit.head}")
+ }
+ }
+ }
+
+}
+
+
+/**
+ * The base for test cases that we want to include in both the hive module (for verifying behavior
+ * when using the Hive external catalog) as well as in the sql/core module.
+ */
+abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils {
+ import testImplicits._
+
+ private val dec1 = new java.math.BigDecimal("1.000000000000000000")
+ private val dec2 = new java.math.BigDecimal("8.000000000000000000")
+ private val d1 = Date.valueOf("2016-05-08")
+ private val d2 = Date.valueOf("2016-05-09")
+ private val t1 = Timestamp.valueOf("2016-05-08 00:00:01")
+ private val t2 = Timestamp.valueOf("2016-05-09 00:00:02")
+
+ /**
+ * Define a very simple 3 row table used for testing column serialization.
+ * Note: last column is seq[int] which doesn't support stats collection.
+ */
+ protected val data = Seq[
+ (jl.Boolean, jl.Byte, jl.Short, jl.Integer, jl.Long,
+ jl.Double, jl.Float, java.math.BigDecimal,
+ String, Array[Byte], Date, Timestamp,
+ Seq[Int])](
+ (false, 1.toByte, 1.toShort, 1, 1L, 1.0, 1.0f, dec1, "s1", "b1".getBytes, d1, t1, null),
+ (true, 2.toByte, 3.toShort, 4, 5L, 6.0, 7.0f, dec2, "ss9", "bb0".getBytes, d2, t2, null),
+ (null, null, null, null, null, null, null, null, null, null, null, null, null)
+ )
+
+ /** A mapping from column to the stats collected. */
+ protected val stats = mutable.LinkedHashMap(
+ "cbool" -> ColumnStat(2, Some(false), Some(true), 1, 1, 1),
+ "cbyte" -> ColumnStat(2, Some(1L), Some(2L), 1, 1, 1),
+ "cshort" -> ColumnStat(2, Some(1L), Some(3L), 1, 2, 2),
+ "cint" -> ColumnStat(2, Some(1L), Some(4L), 1, 4, 4),
+ "clong" -> ColumnStat(2, Some(1L), Some(5L), 1, 8, 8),
+ "cdouble" -> ColumnStat(2, Some(1.0), Some(6.0), 1, 8, 8),
+ "cfloat" -> ColumnStat(2, Some(1.0), Some(7.0), 1, 4, 4),
+ "cdecimal" -> ColumnStat(2, Some(dec1), Some(dec2), 1, 16, 16),
+ "cstring" -> ColumnStat(2, None, None, 1, 3, 3),
+ "cbinary" -> ColumnStat(2, None, None, 1, 3, 3),
+ "cdate" -> ColumnStat(2, Some(d1), Some(d2), 1, 4, 4),
+ "ctimestamp" -> ColumnStat(2, Some(t1), Some(t2), 1, 8, 8)
+ )
+
+ test("column stats round trip serialization") {
+ // Make sure we serialize and then deserialize and we will get the result data
+ val df = data.toDF(stats.keys.toSeq :+ "carray" : _*)
+ stats.zip(df.schema).foreach { case ((k, v), field) =>
+ withClue(s"column $k with type ${field.dataType}") {
+ val roundtrip = ColumnStat.fromMap("table_is_foo", field, v.toMap)
+ assert(roundtrip == Some(v))
+ }
+ }
+ }
+
+ test("analyze column command - result verification") {
+ val tableName = "column_stats_test2"
+ // (data.head.productArity - 1) because the last column does not support stats collection.
+ assert(stats.size == data.head.productArity - 1)
+ val df = data.toDF(stats.keys.toSeq :+ "carray" : _*)
+
+ withTable(tableName) {
+ df.write.saveAsTable(tableName)
+
+ // Collect statistics
+ sql(s"analyze table $tableName compute STATISTICS FOR COLUMNS " + stats.keys.mkString(", "))
+
+ // Validate statistics
+ val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
+ assert(table.stats.isDefined)
+ assert(table.stats.get.colStats.size == stats.size)
+
+ stats.foreach { case (k, v) =>
+ withClue(s"column $k") {
+ assert(table.stats.get.colStats(k) == v)
+ }
+ }
+ }
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsColumnSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsColumnSuite.scala
deleted file mode 100644
index e866ac2cb3..0000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsColumnSuite.scala
+++ /dev/null
@@ -1,334 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import java.sql.{Date, Timestamp}
-
-import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
-import org.apache.spark.sql.catalyst.parser.ParseException
-import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.execution.command.AnalyzeColumnCommand
-import org.apache.spark.sql.test.SQLTestData.ArrayData
-import org.apache.spark.sql.types._
-
-class StatisticsColumnSuite extends StatisticsTest {
- import testImplicits._
-
- test("parse analyze column commands") {
- val tableName = "tbl"
-
- // we need to specify column names
- intercept[ParseException] {
- sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS")
- }
-
- val analyzeSql = s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS key, value"
- val parsed = spark.sessionState.sqlParser.parsePlan(analyzeSql)
- val expected = AnalyzeColumnCommand(TableIdentifier(tableName), Seq("key", "value"))
- comparePlans(parsed, expected)
- }
-
- test("analyzing columns of non-atomic types is not supported") {
- val tableName = "tbl"
- withTable(tableName) {
- Seq(ArrayData(Seq(1, 2, 3), Seq(Seq(1, 2, 3)))).toDF().write.saveAsTable(tableName)
- val err = intercept[AnalysisException] {
- sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS data")
- }
- assert(err.message.contains("Analyzing columns is not supported"))
- }
- }
-
- test("check correctness of columns") {
- val table = "tbl"
- val colName1 = "abc"
- val colName2 = "x.yz"
- withTable(table) {
- sql(s"CREATE TABLE $table ($colName1 int, `$colName2` string) USING PARQUET")
-
- val invalidColError = intercept[AnalysisException] {
- sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS key")
- }
- assert(invalidColError.message == "Invalid column name: key.")
-
- withSQLConf("spark.sql.caseSensitive" -> "true") {
- val invalidErr = intercept[AnalysisException] {
- sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS ${colName1.toUpperCase}")
- }
- assert(invalidErr.message == s"Invalid column name: ${colName1.toUpperCase}.")
- }
-
- withSQLConf("spark.sql.caseSensitive" -> "false") {
- val columnsToAnalyze = Seq(colName2.toUpperCase, colName1, colName2)
- val tableIdent = TableIdentifier(table, Some("default"))
- val relation = spark.sessionState.catalog.lookupRelation(tableIdent)
- val (_, columnStats) =
- AnalyzeColumnCommand.computeColStats(spark, relation, columnsToAnalyze)
- assert(columnStats.contains(colName1))
- assert(columnStats.contains(colName2))
- // check deduplication
- assert(columnStats.size == 2)
- assert(!columnStats.contains(colName2.toUpperCase))
- }
- }
- }
-
- private def getNonNullValues[T](values: Seq[Option[T]]): Seq[T] = {
- values.filter(_.isDefined).map(_.get)
- }
-
- test("column-level statistics for integral type columns") {
- val values = (0 to 5).map { i =>
- if (i % 2 == 0) None else Some(i)
- }
- val data = values.map { i =>
- (i.map(_.toByte), i.map(_.toShort), i.map(_.toInt), i.map(_.toLong))
- }
-
- val df = data.toDF("c1", "c2", "c3", "c4")
- val nonNullValues = getNonNullValues[Int](values)
- val expectedColStatsSeq = df.schema.map { f =>
- val colStat = ColumnStat(InternalRow(
- values.count(_.isEmpty).toLong,
- nonNullValues.max,
- nonNullValues.min,
- nonNullValues.distinct.length.toLong))
- (f, colStat)
- }
- checkColStats(df, expectedColStatsSeq)
- }
-
- test("column-level statistics for fractional type columns") {
- val values: Seq[Option[Decimal]] = (0 to 5).map { i =>
- if (i == 0) None else Some(Decimal(i + i * 0.01))
- }
- val data = values.map { i =>
- (i.map(_.toFloat), i.map(_.toDouble), i)
- }
-
- val df = data.toDF("c1", "c2", "c3")
- val nonNullValues = getNonNullValues[Decimal](values)
- val numNulls = values.count(_.isEmpty).toLong
- val ndv = nonNullValues.distinct.length.toLong
- val expectedColStatsSeq = df.schema.map { f =>
- val colStat = f.dataType match {
- case floatType: FloatType =>
- ColumnStat(InternalRow(numNulls, nonNullValues.max.toFloat, nonNullValues.min.toFloat,
- ndv))
- case doubleType: DoubleType =>
- ColumnStat(InternalRow(numNulls, nonNullValues.max.toDouble, nonNullValues.min.toDouble,
- ndv))
- case decimalType: DecimalType =>
- ColumnStat(InternalRow(numNulls, nonNullValues.max, nonNullValues.min, ndv))
- }
- (f, colStat)
- }
- checkColStats(df, expectedColStatsSeq)
- }
-
- test("column-level statistics for string column") {
- val values = Seq(None, Some("a"), Some("bbbb"), Some("cccc"), Some(""))
- val df = values.toDF("c1")
- val nonNullValues = getNonNullValues[String](values)
- val expectedColStatsSeq = df.schema.map { f =>
- val colStat = ColumnStat(InternalRow(
- values.count(_.isEmpty).toLong,
- nonNullValues.map(_.length).sum / nonNullValues.length.toDouble,
- nonNullValues.map(_.length).max.toInt,
- nonNullValues.distinct.length.toLong))
- (f, colStat)
- }
- checkColStats(df, expectedColStatsSeq)
- }
-
- test("column-level statistics for binary column") {
- val values = Seq(None, Some("a"), Some("bbbb"), Some("cccc"), Some("")).map(_.map(_.getBytes))
- val df = values.toDF("c1")
- val nonNullValues = getNonNullValues[Array[Byte]](values)
- val expectedColStatsSeq = df.schema.map { f =>
- val colStat = ColumnStat(InternalRow(
- values.count(_.isEmpty).toLong,
- nonNullValues.map(_.length).sum / nonNullValues.length.toDouble,
- nonNullValues.map(_.length).max.toInt))
- (f, colStat)
- }
- checkColStats(df, expectedColStatsSeq)
- }
-
- test("column-level statistics for boolean column") {
- val values = Seq(None, Some(true), Some(false), Some(true))
- val df = values.toDF("c1")
- val nonNullValues = getNonNullValues[Boolean](values)
- val expectedColStatsSeq = df.schema.map { f =>
- val colStat = ColumnStat(InternalRow(
- values.count(_.isEmpty).toLong,
- nonNullValues.count(_.equals(true)).toLong,
- nonNullValues.count(_.equals(false)).toLong))
- (f, colStat)
- }
- checkColStats(df, expectedColStatsSeq)
- }
-
- test("column-level statistics for date column") {
- val values = Seq(None, Some("1970-01-01"), Some("1970-02-02")).map(_.map(Date.valueOf))
- val df = values.toDF("c1")
- val nonNullValues = getNonNullValues[Date](values)
- val expectedColStatsSeq = df.schema.map { f =>
- val colStat = ColumnStat(InternalRow(
- values.count(_.isEmpty).toLong,
- // Internally, DateType is represented as the number of days from 1970-01-01.
- nonNullValues.map(DateTimeUtils.fromJavaDate).max,
- nonNullValues.map(DateTimeUtils.fromJavaDate).min,
- nonNullValues.distinct.length.toLong))
- (f, colStat)
- }
- checkColStats(df, expectedColStatsSeq)
- }
-
- test("column-level statistics for timestamp column") {
- val values = Seq(None, Some("1970-01-01 00:00:00"), Some("1970-01-01 00:00:05")).map { i =>
- i.map(Timestamp.valueOf)
- }
- val df = values.toDF("c1")
- val nonNullValues = getNonNullValues[Timestamp](values)
- val expectedColStatsSeq = df.schema.map { f =>
- val colStat = ColumnStat(InternalRow(
- values.count(_.isEmpty).toLong,
- // Internally, TimestampType is represented as the number of days from 1970-01-01
- nonNullValues.map(DateTimeUtils.fromJavaTimestamp).max,
- nonNullValues.map(DateTimeUtils.fromJavaTimestamp).min,
- nonNullValues.distinct.length.toLong))
- (f, colStat)
- }
- checkColStats(df, expectedColStatsSeq)
- }
-
- test("column-level statistics for null columns") {
- val values = Seq(None, None)
- val data = values.map { i =>
- (i.map(_.toString), i.map(_.toString.toInt))
- }
- val df = data.toDF("c1", "c2")
- val expectedColStatsSeq = df.schema.map { f =>
- (f, ColumnStat(InternalRow(values.count(_.isEmpty).toLong, null, null, 0L)))
- }
- checkColStats(df, expectedColStatsSeq)
- }
-
- test("column-level statistics for columns with different types") {
- val intSeq = Seq(1, 2)
- val doubleSeq = Seq(1.01d, 2.02d)
- val stringSeq = Seq("a", "bb")
- val binarySeq = Seq("a", "bb").map(_.getBytes)
- val booleanSeq = Seq(true, false)
- val dateSeq = Seq("1970-01-01", "1970-02-02").map(Date.valueOf)
- val timestampSeq = Seq("1970-01-01 00:00:00", "1970-01-01 00:00:05").map(Timestamp.valueOf)
- val longSeq = Seq(5L, 4L)
-
- val data = intSeq.indices.map { i =>
- (intSeq(i), doubleSeq(i), stringSeq(i), binarySeq(i), booleanSeq(i), dateSeq(i),
- timestampSeq(i), longSeq(i))
- }
- val df = data.toDF("c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8")
- val expectedColStatsSeq = df.schema.map { f =>
- val colStat = f.dataType match {
- case IntegerType =>
- ColumnStat(InternalRow(0L, intSeq.max, intSeq.min, intSeq.distinct.length.toLong))
- case DoubleType =>
- ColumnStat(InternalRow(0L, doubleSeq.max, doubleSeq.min,
- doubleSeq.distinct.length.toLong))
- case StringType =>
- ColumnStat(InternalRow(0L, stringSeq.map(_.length).sum / stringSeq.length.toDouble,
- stringSeq.map(_.length).max.toInt, stringSeq.distinct.length.toLong))
- case BinaryType =>
- ColumnStat(InternalRow(0L, binarySeq.map(_.length).sum / binarySeq.length.toDouble,
- binarySeq.map(_.length).max.toInt))
- case BooleanType =>
- ColumnStat(InternalRow(0L, booleanSeq.count(_.equals(true)).toLong,
- booleanSeq.count(_.equals(false)).toLong))
- case DateType =>
- ColumnStat(InternalRow(0L, dateSeq.map(DateTimeUtils.fromJavaDate).max,
- dateSeq.map(DateTimeUtils.fromJavaDate).min, dateSeq.distinct.length.toLong))
- case TimestampType =>
- ColumnStat(InternalRow(0L, timestampSeq.map(DateTimeUtils.fromJavaTimestamp).max,
- timestampSeq.map(DateTimeUtils.fromJavaTimestamp).min,
- timestampSeq.distinct.length.toLong))
- case LongType =>
- ColumnStat(InternalRow(0L, longSeq.max, longSeq.min, longSeq.distinct.length.toLong))
- }
- (f, colStat)
- }
- checkColStats(df, expectedColStatsSeq)
- }
-
- test("update table-level stats while collecting column-level stats") {
- val table = "tbl"
- withTable(table) {
- sql(s"CREATE TABLE $table (c1 int) USING PARQUET")
- sql(s"INSERT INTO $table SELECT 1")
- sql(s"ANALYZE TABLE $table COMPUTE STATISTICS")
- checkTableStats(tableName = table, expectedRowCount = Some(1))
-
- // update table-level stats between analyze table and analyze column commands
- sql(s"INSERT INTO $table SELECT 1")
- sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS c1")
- val fetchedStats = checkTableStats(tableName = table, expectedRowCount = Some(2))
-
- val colStat = fetchedStats.get.colStats("c1")
- StatisticsTest.checkColStat(
- dataType = IntegerType,
- colStat = colStat,
- expectedColStat = ColumnStat(InternalRow(0L, 1, 1, 1L)),
- rsd = spark.sessionState.conf.ndvMaxError)
- }
- }
-
- test("analyze column stats independently") {
- val table = "tbl"
- withTable(table) {
- sql(s"CREATE TABLE $table (c1 int, c2 long) USING PARQUET")
- sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS c1")
- val fetchedStats1 = checkTableStats(tableName = table, expectedRowCount = Some(0))
- assert(fetchedStats1.get.colStats.size == 1)
- val expected1 = ColumnStat(InternalRow(0L, null, null, 0L))
- val rsd = spark.sessionState.conf.ndvMaxError
- StatisticsTest.checkColStat(
- dataType = IntegerType,
- colStat = fetchedStats1.get.colStats("c1"),
- expectedColStat = expected1,
- rsd = rsd)
-
- sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS c2")
- val fetchedStats2 = checkTableStats(tableName = table, expectedRowCount = Some(0))
- // column c1 is kept in the stats
- assert(fetchedStats2.get.colStats.size == 2)
- StatisticsTest.checkColStat(
- dataType = IntegerType,
- colStat = fetchedStats2.get.colStats("c1"),
- expectedColStat = expected1,
- rsd = rsd)
- val expected2 = ColumnStat(InternalRow(0L, null, null, 0L))
- StatisticsTest.checkColStat(
- dataType = LongType,
- colStat = fetchedStats2.get.colStats("c2"),
- expectedColStat = expected2,
- rsd = rsd)
- }
- }
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala
deleted file mode 100644
index 8cf42e9248..0000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, Join, LocalLimit}
-import org.apache.spark.sql.types._
-
-class StatisticsSuite extends StatisticsTest {
- import testImplicits._
-
- test("SPARK-15392: DataFrame created from RDD should not be broadcasted") {
- val rdd = sparkContext.range(1, 100).map(i => Row(i, i))
- val df = spark.createDataFrame(rdd, new StructType().add("a", LongType).add("b", LongType))
- assert(df.queryExecution.analyzed.statistics.sizeInBytes >
- spark.sessionState.conf.autoBroadcastJoinThreshold)
- assert(df.selectExpr("a").queryExecution.analyzed.statistics.sizeInBytes >
- spark.sessionState.conf.autoBroadcastJoinThreshold)
- }
-
- test("estimates the size of limit") {
- withTempView("test") {
- Seq(("one", 1), ("two", 2), ("three", 3), ("four", 4)).toDF("k", "v")
- .createOrReplaceTempView("test")
- Seq((0, 1), (1, 24), (2, 48)).foreach { case (limit, expected) =>
- val df = sql(s"""SELECT * FROM test limit $limit""")
-
- val sizesGlobalLimit = df.queryExecution.analyzed.collect { case g: GlobalLimit =>
- g.statistics.sizeInBytes
- }
- assert(sizesGlobalLimit.size === 1, s"Size wrong for:\n ${df.queryExecution}")
- assert(sizesGlobalLimit.head === BigInt(expected),
- s"expected exact size $expected for table 'test', got: ${sizesGlobalLimit.head}")
-
- val sizesLocalLimit = df.queryExecution.analyzed.collect { case l: LocalLimit =>
- l.statistics.sizeInBytes
- }
- assert(sizesLocalLimit.size === 1, s"Size wrong for:\n ${df.queryExecution}")
- assert(sizesLocalLimit.head === BigInt(expected),
- s"expected exact size $expected for table 'test', got: ${sizesLocalLimit.head}")
- }
- }
- }
-
- test("estimates the size of a limit 0 on outer join") {
- withTempView("test") {
- Seq(("one", 1), ("two", 2), ("three", 3), ("four", 4)).toDF("k", "v")
- .createOrReplaceTempView("test")
- val df1 = spark.table("test")
- val df2 = spark.table("test").limit(0)
- val df = df1.join(df2, Seq("k"), "left")
-
- val sizes = df.queryExecution.analyzed.collect { case g: Join =>
- g.statistics.sizeInBytes
- }
-
- assert(sizes.size === 1, s"number of Join nodes is wrong:\n ${df.queryExecution}")
- assert(sizes.head === BigInt(96),
- s"expected exact size 96 for table 'test', got: ${sizes.head}")
- }
- }
-
- test("test table-level statistics for data source table created in InMemoryCatalog") {
- val tableName = "tbl"
- withTable(tableName) {
- sql(s"CREATE TABLE $tableName(i INT, j STRING) USING parquet")
- Seq(1 -> "a", 2 -> "b").toDF("i", "j").write.mode("overwrite").insertInto(tableName)
-
- // noscan won't count the number of rows
- sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS noscan")
- checkTableStats(tableName, expectedRowCount = None)
-
- // without noscan, we count the number of rows
- sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS")
- checkTableStats(tableName, expectedRowCount = Some(2))
- }
- }
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsTest.scala
deleted file mode 100644
index 915ee0d31b..0000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsTest.scala
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Statistics}
-import org.apache.spark.sql.execution.command.AnalyzeColumnCommand
-import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.sql.types._
-
-
-trait StatisticsTest extends QueryTest with SharedSQLContext {
-
- def checkColStats(
- df: DataFrame,
- expectedColStatsSeq: Seq[(StructField, ColumnStat)]): Unit = {
- val table = "tbl"
- withTable(table) {
- df.write.format("json").saveAsTable(table)
- val columns = expectedColStatsSeq.map(_._1)
- val tableIdent = TableIdentifier(table, Some("default"))
- val relation = spark.sessionState.catalog.lookupRelation(tableIdent)
- val (_, columnStats) =
- AnalyzeColumnCommand.computeColStats(spark, relation, columns.map(_.name))
- expectedColStatsSeq.foreach { case (field, expectedColStat) =>
- assert(columnStats.contains(field.name))
- val colStat = columnStats(field.name)
- StatisticsTest.checkColStat(
- dataType = field.dataType,
- colStat = colStat,
- expectedColStat = expectedColStat,
- rsd = spark.sessionState.conf.ndvMaxError)
-
- // check if we get the same colStat after encoding and decoding
- val encodedCS = colStat.toString
- val numFields = AnalyzeColumnCommand.numStatFields(field.dataType)
- val decodedCS = ColumnStat(numFields, encodedCS)
- StatisticsTest.checkColStat(
- dataType = field.dataType,
- colStat = decodedCS,
- expectedColStat = expectedColStat,
- rsd = spark.sessionState.conf.ndvMaxError)
- }
- }
- }
-
- def checkTableStats(tableName: String, expectedRowCount: Option[Int]): Option[Statistics] = {
- val df = spark.table(tableName)
- val stats = df.queryExecution.analyzed.collect { case rel: LogicalRelation =>
- assert(rel.catalogTable.get.stats.flatMap(_.rowCount) === expectedRowCount)
- rel.catalogTable.get.stats
- }
- assert(stats.size == 1)
- stats.head
- }
-}
-
-object StatisticsTest {
- def checkColStat(
- dataType: DataType,
- colStat: ColumnStat,
- expectedColStat: ColumnStat,
- rsd: Double): Unit = {
- dataType match {
- case StringType =>
- val cs = colStat.forString
- val expectedCS = expectedColStat.forString
- assert(cs.numNulls == expectedCS.numNulls)
- assert(cs.avgColLen == expectedCS.avgColLen)
- assert(cs.maxColLen == expectedCS.maxColLen)
- checkNdv(ndv = cs.ndv, expectedNdv = expectedCS.ndv, rsd = rsd)
- case BinaryType =>
- val cs = colStat.forBinary
- val expectedCS = expectedColStat.forBinary
- assert(cs.numNulls == expectedCS.numNulls)
- assert(cs.avgColLen == expectedCS.avgColLen)
- assert(cs.maxColLen == expectedCS.maxColLen)
- case BooleanType =>
- val cs = colStat.forBoolean
- val expectedCS = expectedColStat.forBoolean
- assert(cs.numNulls == expectedCS.numNulls)
- assert(cs.numTrues == expectedCS.numTrues)
- assert(cs.numFalses == expectedCS.numFalses)
- case atomicType: AtomicType =>
- checkNumericColStats(
- dataType = atomicType, colStat = colStat, expectedColStat = expectedColStat, rsd = rsd)
- }
- }
-
- private def checkNumericColStats(
- dataType: AtomicType,
- colStat: ColumnStat,
- expectedColStat: ColumnStat,
- rsd: Double): Unit = {
- val cs = colStat.forNumeric(dataType)
- val expectedCS = expectedColStat.forNumeric(dataType)
- assert(cs.numNulls == expectedCS.numNulls)
- assert(cs.max == expectedCS.max)
- assert(cs.min == expectedCS.min)
- checkNdv(ndv = cs.ndv, expectedNdv = expectedCS.ndv, rsd = rsd)
- }
-
- private def checkNdv(ndv: Long, expectedNdv: Long, rsd: Double): Unit = {
- // ndv is an approximate value, so we make sure we have the value, and it should be
- // within 3*SD's of the given rsd.
- if (expectedNdv == 0) {
- assert(ndv == 0)
- } else if (expectedNdv > 0) {
- assert(ndv > 0)
- val error = math.abs((ndv / expectedNdv.toDouble) - 1.0d)
- assert(error <= rsd * 3.0d, "Error should be within 3 std. errors.")
- }
- }
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
index 797fe9ffa8..b070138be0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
@@ -23,9 +23,8 @@ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat,
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.command.{AnalyzeTableCommand, DescribeFunctionCommand,
- DescribeTableCommand, ShowFunctionsCommand}
-import org.apache.spark.sql.execution.datasources.{CreateTable, CreateTempViewUsing}
+import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.execution.datasources.CreateTable
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType}
@@ -221,12 +220,22 @@ class SparkSqlParserSuite extends PlanTest {
intercept("explain describe tables x", "Unsupported SQL statement")
}
- test("SPARK-18106 analyze table") {
+ test("analyze table statistics") {
assertEqual("analyze table t compute statistics",
AnalyzeTableCommand(TableIdentifier("t"), noscan = false))
assertEqual("analyze table t compute statistics noscan",
AnalyzeTableCommand(TableIdentifier("t"), noscan = true))
- assertEqual("analyze table t partition (a) compute statistics noscan",
+ assertEqual("analyze table t partition (a) compute statistics nOscAn",
+ AnalyzeTableCommand(TableIdentifier("t"), noscan = true))
+
+ // Partitions specified - we currently parse them but don't do anything with it
+ assertEqual("ANALYZE TABLE t PARTITION(ds='2008-04-09', hr=11) COMPUTE STATISTICS",
+ AnalyzeTableCommand(TableIdentifier("t"), noscan = false))
+ assertEqual("ANALYZE TABLE t PARTITION(ds='2008-04-09', hr=11) COMPUTE STATISTICS noscan",
+ AnalyzeTableCommand(TableIdentifier("t"), noscan = true))
+ assertEqual("ANALYZE TABLE t PARTITION(ds, hr) COMPUTE STATISTICS",
+ AnalyzeTableCommand(TableIdentifier("t"), noscan = false))
+ assertEqual("ANALYZE TABLE t PARTITION(ds, hr) COMPUTE STATISTICS noscan",
AnalyzeTableCommand(TableIdentifier("t"), noscan = true))
intercept("analyze table t compute statistics xxxx",
@@ -234,4 +243,11 @@ class SparkSqlParserSuite extends PlanTest {
intercept("analyze table t partition (a) compute statistics xxxx",
"Expected `NOSCAN` instead of `xxxx`")
}
+
+ test("analyze table column statistics") {
+ intercept("ANALYZE TABLE t COMPUTE STATISTICS FOR COLUMNS", "")
+
+ assertEqual("ANALYZE TABLE t COMPUTE STATISTICS FOR COLUMNS key, value",
+ AnalyzeColumnCommand(TableIdentifier("t"), Seq("key", "value")))
+ }
}
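As a final note, the parser-level contract the last test checks can be reproduced directly. This is a sketch using the internal sqlParser (as the removed StatisticsColumnSuite used to do), assuming an existing SparkSession named spark.

// Sketch: the FOR COLUMNS form parses to AnalyzeColumnCommand.
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.AnalyzeColumnCommand

val parsed = spark.sessionState.sqlParser.parsePlan(
  "ANALYZE TABLE t COMPUTE STATISTICS FOR COLUMNS key, value")
assert(parsed == AnalyzeColumnCommand(TableIdentifier("t"), Seq("key", "value")))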