author    Reynold Xin <rxin@databricks.com>    2016-11-23 20:48:41 +0800
committer Wenchen Fan <wenchen@databricks.com> 2016-11-23 20:48:41 +0800
commit    70ad07a9d20586ae182c4e60ed97bdddbcbceff3 (patch)
tree      14666ca06583b5ee8fc6ee09b0434aa824c2efde /sql/catalyst
parent    9785ed40d7fe4e1fcd440e55706519c6e5f8d6b1 (diff)
[SPARK-18522][SQL] Explicit contract for column stats serialization
## What changes were proposed in this pull request?

The current implementation of column stats uses the base64 encoding of the internal UnsafeRow format to persist statistics (in table properties in the Hive metastore). This is an internal format that is not stable across different versions of Spark and should NOT be used for persistence. In addition, it would be better if statistics stored in the catalog were human readable.

This pull request introduces the following changes:

1. Created a single ColumnStat class for all data types. All data types track the same set of statistics.
2. Updated the implementation for stats collection to get rid of the dependency on internal data structures (e.g. InternalRow, or storing DateType as an int32). For example, previously dates were stored as a single integer, but are now stored as java.sql.Date. When we implement the next steps of CBO, we can add code to convert those back into internal types again.
3. Documented clearly which JVM data types are used to store which data.
4. Defined a simple Map[String, String] interface for serializing and deserializing column stats into/from the catalog (see the illustrative example after this description).
5. Rearranged the method/function structure so it is clearer what the supported data types are, and moved how stats are generated into the ColumnStat class so they are easy to find.

## How was this patch tested?

Removed most of the original test cases created for column statistics, and added three very simple ones to cover all the cases. The three test cases validate:

1. Roundtrip serialization works.
2. Behavior when analyzing a non-existent column or a column of an unsupported data type.
3. Results of stats collection for all valid data types.

Also moved parser related tests into a parser test suite and added an explicit serialization test for the Hive external catalog.

Author: Reynold Xin <rxin@databricks.com>

Closes #15959 from rxin/SPARK-18522.
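For illustration only (this example is not part of the patch): given the keys defined below in `ColumnStat.toMap`, the serialized statistics for a hypothetical INT column with 100 distinct values, no nulls, min 1 and max 100 would be a string map like the following. The `version` key is always present; `min`/`max` are omitted when the corresponding values are `None`.

```scala
// Hypothetical output of ColumnStat.toMap for an INT column (values assumed,
// not taken from a real table). Integral min/max are upcast to longs and
// rendered via toString.
val serialized: Map[String, String] = Map(
  "version"       -> "1",
  "distinctCount" -> "100",
  "nullCount"     -> "0",
  "avgLen"        -> "4",   // defaultSize of IntegerType; constant for fixed-length types
  "maxLen"        -> "4",
  "min"           -> "1",
  "max"           -> "100")
```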
Diffstat (limited to 'sql/catalyst')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala | 212
1 file changed, 165 insertions(+), 47 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
index f3e2147b8f..79865609cb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
@@ -17,12 +17,15 @@
package org.apache.spark.sql.catalyst.plans.logical
-import org.apache.commons.codec.binary.Base64
+import scala.util.control.NonFatal
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.types._
+
/**
* Estimates of various statistics. The default estimation logic simply lazily multiplies the
* corresponding statistic produced by the children. To override this behavior, override
@@ -58,60 +61,175 @@ case class Statistics(
}
}
+
/**
- * Statistics for a column.
+ * Statistics collected for a column.
+ *
+ * 1. Supported data types are defined in `ColumnStat.supportsType`.
+ * 2. The JVM data type stored in min/max is the external data type (used in Row) for the
+ * corresponding Catalyst data type. For example, for DateType we store java.sql.Date, and for
+ * TimestampType we store java.sql.Timestamp.
+ * 3. For integral types, they are all upcasted to longs, i.e. shorts are stored as longs.
+ * 4. There is no guarantee that the statistics collected are accurate. Approximation algorithms
+ * (sketches) might have been used, and the data collected can also be stale.
+ *
+ * @param distinctCount number of distinct values
+ * @param min minimum value
+ * @param max maximum value
+ * @param nullCount number of nulls
+ * @param avgLen average length of the values. For fixed-length types, this should be a constant.
+ * @param maxLen maximum length of the values. For fixed-length types, this should be a constant.
*/
-case class ColumnStat(statRow: InternalRow) {
+case class ColumnStat(
+ distinctCount: BigInt,
+ min: Option[Any],
+ max: Option[Any],
+ nullCount: BigInt,
+ avgLen: Long,
+ maxLen: Long) {
- def forNumeric[T <: AtomicType](dataType: T): NumericColumnStat[T] = {
- NumericColumnStat(statRow, dataType)
- }
- def forString: StringColumnStat = StringColumnStat(statRow)
- def forBinary: BinaryColumnStat = BinaryColumnStat(statRow)
- def forBoolean: BooleanColumnStat = BooleanColumnStat(statRow)
+ // We currently don't store min/max for binary/string type. This can change in the future and
+ // then we need to remove this require.
+ require(min.isEmpty || (!min.get.isInstanceOf[Array[Byte]] && !min.get.isInstanceOf[String]))
+ require(max.isEmpty || (!max.get.isInstanceOf[Array[Byte]] && !max.get.isInstanceOf[String]))
- override def toString: String = {
- // use Base64 for encoding
- Base64.encodeBase64String(statRow.asInstanceOf[UnsafeRow].getBytes)
+ /**
+ * Returns a map from string to string that can be used to serialize the column stats.
+ * The key is the name of the field (e.g. "distinctCount" or "min"), and the value is the string
+ * representation for the value. The deserialization side is defined in [[ColumnStat.fromMap]].
+ *
+ * As part of the protocol, the returned map always contains a key called "version".
+ * In the case min/max values are null (None), they won't appear in the map.
+ */
+ def toMap: Map[String, String] = {
+ val map = new scala.collection.mutable.HashMap[String, String]
+ map.put(ColumnStat.KEY_VERSION, "1")
+ map.put(ColumnStat.KEY_DISTINCT_COUNT, distinctCount.toString)
+ map.put(ColumnStat.KEY_NULL_COUNT, nullCount.toString)
+ map.put(ColumnStat.KEY_AVG_LEN, avgLen.toString)
+ map.put(ColumnStat.KEY_MAX_LEN, maxLen.toString)
+ min.foreach { v => map.put(ColumnStat.KEY_MIN_VALUE, v.toString) }
+ max.foreach { v => map.put(ColumnStat.KEY_MAX_VALUE, v.toString) }
+ map.toMap
}
}
-object ColumnStat {
- def apply(numFields: Int, str: String): ColumnStat = {
- // use Base64 for decoding
- val bytes = Base64.decodeBase64(str)
- val unsafeRow = new UnsafeRow(numFields)
- unsafeRow.pointTo(bytes, bytes.length)
- ColumnStat(unsafeRow)
+
+object ColumnStat extends Logging {
+
+ // List of string keys used to serialize ColumnStat
+ val KEY_VERSION = "version"
+ private val KEY_DISTINCT_COUNT = "distinctCount"
+ private val KEY_MIN_VALUE = "min"
+ private val KEY_MAX_VALUE = "max"
+ private val KEY_NULL_COUNT = "nullCount"
+ private val KEY_AVG_LEN = "avgLen"
+ private val KEY_MAX_LEN = "maxLen"
+
+ /** Returns true iff we support gathering column statistics on columns of the given type. */
+ def supportsType(dataType: DataType): Boolean = dataType match {
+ case _: IntegralType => true
+ case _: DecimalType => true
+ case DoubleType | FloatType => true
+ case BooleanType => true
+ case DateType => true
+ case TimestampType => true
+ case BinaryType | StringType => true
+ case _ => false
}
-}
-case class NumericColumnStat[T <: AtomicType](statRow: InternalRow, dataType: T) {
- // The indices here must be consistent with `ColumnStatStruct.numericColumnStat`.
- val numNulls: Long = statRow.getLong(0)
- val max: T#InternalType = statRow.get(1, dataType).asInstanceOf[T#InternalType]
- val min: T#InternalType = statRow.get(2, dataType).asInstanceOf[T#InternalType]
- val ndv: Long = statRow.getLong(3)
-}
+ /**
+ * Creates a [[ColumnStat]] object from the given map. This is used to deserialize column stats
+ * from some external storage. The serialization side is defined in [[ColumnStat.toMap]].
+ */
+ def fromMap(table: String, field: StructField, map: Map[String, String])
+ : Option[ColumnStat] = {
+ val str2val: (String => Any) = field.dataType match {
+ case _: IntegralType => _.toLong
+ case _: DecimalType => new java.math.BigDecimal(_)
+ case DoubleType | FloatType => _.toDouble
+ case BooleanType => _.toBoolean
+ case DateType => java.sql.Date.valueOf
+ case TimestampType => java.sql.Timestamp.valueOf
+ // This version of Spark does not use min/max for binary/string types so we ignore it.
+ case BinaryType | StringType => _ => null
+ case _ =>
+ throw new AnalysisException("Column statistics deserialization is not supported for " +
+ s"column ${field.name} of data type: ${field.dataType}.")
+ }
-case class StringColumnStat(statRow: InternalRow) {
- // The indices here must be consistent with `ColumnStatStruct.stringColumnStat`.
- val numNulls: Long = statRow.getLong(0)
- val avgColLen: Double = statRow.getDouble(1)
- val maxColLen: Long = statRow.getInt(2)
- val ndv: Long = statRow.getLong(3)
-}
+ try {
+ Some(ColumnStat(
+ distinctCount = BigInt(map(KEY_DISTINCT_COUNT).toLong),
+ // Note that flatMap(Option.apply) turns Option(null) into None.
+ min = map.get(KEY_MIN_VALUE).map(str2val).flatMap(Option.apply),
+ max = map.get(KEY_MAX_VALUE).map(str2val).flatMap(Option.apply),
+ nullCount = BigInt(map(KEY_NULL_COUNT).toLong),
+ avgLen = map.getOrElse(KEY_AVG_LEN, field.dataType.defaultSize.toString).toLong,
+ maxLen = map.getOrElse(KEY_MAX_LEN, field.dataType.defaultSize.toString).toLong
+ ))
+ } catch {
+ case NonFatal(e) =>
+ logWarning(s"Failed to parse column statistics for column ${field.name} in table $table", e)
+ None
+ }
+ }
-case class BinaryColumnStat(statRow: InternalRow) {
- // The indices here must be consistent with `ColumnStatStruct.binaryColumnStat`.
- val numNulls: Long = statRow.getLong(0)
- val avgColLen: Double = statRow.getDouble(1)
- val maxColLen: Long = statRow.getInt(2)
-}
+ /**
+ * Constructs an expression to compute column statistics for a given column.
+ *
+ * The expression should create a single struct column with the following schema:
+ * distinctCount: Long, min: T, max: T, nullCount: Long, avgLen: Long, maxLen: Long
+ *
+ * Together with [[rowToColumnStat]], this function is used to create [[ColumnStat]] and
+ * as a result should stay in sync with it.
+ */
+ def statExprs(col: Attribute, relativeSD: Double): CreateNamedStruct = {
+ def struct(exprs: Expression*): CreateNamedStruct = CreateStruct(exprs.map { expr =>
+ expr.transformUp { case af: AggregateFunction => af.toAggregateExpression() }
+ })
+ val one = Literal(1, LongType)
+
+ // the approximate ndv (num distinct value) should never be larger than the number of rows
+ val numNonNulls = if (col.nullable) Count(col) else Count(one)
+ val ndv = Least(Seq(HyperLogLogPlusPlus(col, relativeSD), numNonNulls))
+ val numNulls = Subtract(Count(one), numNonNulls)
+
+ def fixedLenTypeStruct(castType: DataType) = {
+ // For fixed width types, avg size should be the same as max size.
+ val avgSize = Literal(col.dataType.defaultSize, LongType)
+ struct(ndv, Cast(Min(col), castType), Cast(Max(col), castType), numNulls, avgSize, avgSize)
+ }
+
+ col.dataType match {
+ case _: IntegralType => fixedLenTypeStruct(LongType)
+ case _: DecimalType => fixedLenTypeStruct(col.dataType)
+ case DoubleType | FloatType => fixedLenTypeStruct(DoubleType)
+ case BooleanType => fixedLenTypeStruct(col.dataType)
+ case DateType => fixedLenTypeStruct(col.dataType)
+ case TimestampType => fixedLenTypeStruct(col.dataType)
+ case BinaryType | StringType =>
+ // For string and binary type, we don't store min/max.
+ val nullLit = Literal(null, col.dataType)
+ struct(
+ ndv, nullLit, nullLit, numNulls,
+ Ceil(Average(Length(col))), Cast(Max(Length(col)), LongType))
+ case _ =>
+ throw new AnalysisException("Analyzing column statistics is not supported for column " +
+ s"${col.name} of data type: ${col.dataType}.")
+ }
+ }
+
+ /** Convert a struct for column stats (defined in statExprs) into [[ColumnStat]]. */
+ def rowToColumnStat(row: Row): ColumnStat = {
+ ColumnStat(
+ distinctCount = BigInt(row.getLong(0)),
+ min = Option(row.get(1)), // for string/binary min/max, get should return null
+ max = Option(row.get(2)),
+ nullCount = BigInt(row.getLong(3)),
+ avgLen = row.getLong(4),
+ maxLen = row.getLong(5)
+ )
+ }
-case class BooleanColumnStat(statRow: InternalRow) {
- // The indices here must be consistent with `ColumnStatStruct.booleanColumnStat`.
- val numNulls: Long = statRow.getLong(0)
- val numTrues: Long = statRow.getLong(1)
- val numFalses: Long = statRow.getLong(2)
}
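Below is a minimal round-trip sketch built only from the API added in this patch (`ColumnStat`, `toMap`, and `ColumnStat.fromMap`); the table name, column name, and statistic values are hypothetical, and it assumes a Spark build containing this change is on the classpath.

```scala
import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
import org.apache.spark.sql.types.{IntegerType, StructField}

object ColumnStatRoundTrip {
  def main(args: Array[String]): Unit = {
    // Hypothetical stats for an INT column; integral min/max are stored as longs
    // per the ColumnStat class documentation above.
    val original = ColumnStat(
      distinctCount = BigInt(100),
      min = Some(1L),
      max = Some(100L),
      nullCount = BigInt(0),
      avgLen = 4L,
      maxLen = 4L)

    // Serialize into the catalog-friendly string map; it always carries a "version" key.
    val asMap: Map[String, String] = original.toMap

    // Deserialize; fromMap returns None (and logs a warning) if the map cannot be parsed.
    val restored: Option[ColumnStat] =
      ColumnStat.fromMap("my_table", StructField("col", IntegerType), asMap)

    assert(restored.contains(original))
  }
}
```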