Diffstat (limited to 'sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala | 95
1 file changed, 65 insertions, 30 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
index f24b240956..3d4efef953 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
@@ -25,6 +25,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -74,11 +75,10 @@ case class Statistics(
* Statistics collected for a column.
*
* 1. Supported data types are defined in `ColumnStat.supportsType`.
- * 2. The JVM data type stored in min/max is the external data type (used in Row) for the
- * corresponding Catalyst data type. For example, for DateType we store java.sql.Date, and for
- * TimestampType we store java.sql.Timestamp.
- * 3. For integral types, they are all upcasted to longs, i.e. shorts are stored as longs.
- * 4. There is no guarantee that the statistics collected are accurate. Approximation algorithms
+ * 2. The JVM data type stored in min/max is the internal data type for the corresponding
+ * Catalyst data type. For example, the internal type of DateType is Int, and the internal
+ * type of TimestampType is Long.
+ * 3. There is no guarantee that the statistics collected are accurate. Approximation algorithms
* (sketches) might have been used, and the data collected can also be stale.
*
* @param distinctCount number of distinct values
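
To make the internal/external split in the revised comment concrete, here is a minimal standalone sketch; it assumes only that the catalyst module (which provides the DateTimeUtils helpers this patch imports) is on the classpath, and the object name InternalTypesDemo is invented for illustration.

    import java.sql.{Date, Timestamp}
    import org.apache.spark.sql.catalyst.util.DateTimeUtils

    object InternalTypesDemo extends App {
      // DateType's internal form is a day count since 1970-01-01, stored as an Int.
      val days: Int = DateTimeUtils.fromJavaDate(Date.valueOf("1970-01-11"))
      assert(DateTimeUtils.toJavaDate(days) == Date.valueOf("1970-01-11"))

      // TimestampType's internal form is a microsecond count since the epoch, stored as a Long.
      val micros: Long = DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("1970-01-01 00:00:01"))
      assert(DateTimeUtils.toJavaTimestamp(micros) == Timestamp.valueOf("1970-01-01 00:00:01"))
    }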
@@ -104,22 +104,43 @@ case class ColumnStat(
/**
* Returns a map from string to string that can be used to serialize the column stats.
* The key is the name of the field (e.g. "distinctCount" or "min"), and the value is the string
- * representation for the value. The deserialization side is defined in [[ColumnStat.fromMap]].
+ * representation for the value. min/max values are converted to the external data type. For
+ * example, for DateType we store java.sql.Date, and for TimestampType we store
+ * java.sql.Timestamp. The deserialization side is defined in [[ColumnStat.fromMap]].
*
* As part of the protocol, the returned map always contains a key called "version".
* In the case min/max values are null (None), they won't appear in the map.
*/
- def toMap: Map[String, String] = {
+ def toMap(colName: String, dataType: DataType): Map[String, String] = {
val map = new scala.collection.mutable.HashMap[String, String]
map.put(ColumnStat.KEY_VERSION, "1")
map.put(ColumnStat.KEY_DISTINCT_COUNT, distinctCount.toString)
map.put(ColumnStat.KEY_NULL_COUNT, nullCount.toString)
map.put(ColumnStat.KEY_AVG_LEN, avgLen.toString)
map.put(ColumnStat.KEY_MAX_LEN, maxLen.toString)
- min.foreach { v => map.put(ColumnStat.KEY_MIN_VALUE, v.toString) }
- max.foreach { v => map.put(ColumnStat.KEY_MAX_VALUE, v.toString) }
+ min.foreach { v => map.put(ColumnStat.KEY_MIN_VALUE, toExternalString(v, colName, dataType)) }
+ max.foreach { v => map.put(ColumnStat.KEY_MAX_VALUE, toExternalString(v, colName, dataType)) }
map.toMap
}
+
+ /**
+ * Converts the given Catalyst-internal value to the string representation of its external
+ * data type.
+ */
+ private def toExternalString(v: Any, colName: String, dataType: DataType): String = {
+ val externalValue = dataType match {
+ case DateType => DateTimeUtils.toJavaDate(v.asInstanceOf[Int])
+ case TimestampType => DateTimeUtils.toJavaTimestamp(v.asInstanceOf[Long])
+ case BooleanType | _: IntegralType | FloatType | DoubleType => v
+ case _: DecimalType => v.asInstanceOf[Decimal].toJavaBigDecimal
+ // This version of Spark does not use min/max for binary/string types so we ignore it.
+ case _ =>
+ throw new AnalysisException("Column statistics serialization is not supported for " +
+ s"column $colName of data type: $dataType.")
+ }
+ externalValue.toString
+ }
+
}
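
A hedged usage sketch of the new toMap(colName, dataType) signature, assuming the patched ColumnStat above is on the classpath (ToMapDemo is an invented name): min/max held internally as Int day counts come out in their external java.sql.Date string form.

    import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
    import org.apache.spark.sql.catalyst.util.DateTimeUtils
    import org.apache.spark.sql.types.DateType

    object ToMapDemo extends App {
      val stat = ColumnStat(
        distinctCount = BigInt(2),
        min = Some(DateTimeUtils.fromJavaDate(java.sql.Date.valueOf("2017-01-01"))),
        max = Some(DateTimeUtils.fromJavaDate(java.sql.Date.valueOf("2017-12-31"))),
        nullCount = BigInt(0),
        avgLen = 4L,
        maxLen = 4L)

      // Prints a map with min -> 2017-01-01 and max -> 2017-12-31 (external form),
      // alongside the version, distinctCount, nullCount, avgLen and maxLen entries.
      println(stat.toMap(colName = "d", dataType = DateType))
    }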
@@ -150,28 +171,15 @@ object ColumnStat extends Logging {
* Creates a [[ColumnStat]] object from the given map. This is used to deserialize column stats
* from some external storage. The serialization side is defined in [[ColumnStat.toMap]].
*/
- def fromMap(table: String, field: StructField, map: Map[String, String])
- : Option[ColumnStat] = {
- val str2val: (String => Any) = field.dataType match {
- case _: IntegralType => _.toLong
- case _: DecimalType => new java.math.BigDecimal(_)
- case DoubleType | FloatType => _.toDouble
- case BooleanType => _.toBoolean
- case DateType => java.sql.Date.valueOf
- case TimestampType => java.sql.Timestamp.valueOf
- // This version of Spark does not use min/max for binary/string types so we ignore it.
- case BinaryType | StringType => _ => null
- case _ =>
- throw new AnalysisException("Column statistics deserialization is not supported for " +
- s"column ${field.name} of data type: ${field.dataType}.")
- }
-
+ def fromMap(table: String, field: StructField, map: Map[String, String]): Option[ColumnStat] = {
try {
Some(ColumnStat(
distinctCount = BigInt(map(KEY_DISTINCT_COUNT).toLong),
// Note that flatMap(Option.apply) turns Option(null) into None.
- min = map.get(KEY_MIN_VALUE).map(str2val).flatMap(Option.apply),
- max = map.get(KEY_MAX_VALUE).map(str2val).flatMap(Option.apply),
+ min = map.get(KEY_MIN_VALUE)
+ .map(fromExternalString(_, field.name, field.dataType)).flatMap(Option.apply),
+ max = map.get(KEY_MAX_VALUE)
+ .map(fromExternalString(_, field.name, field.dataType)).flatMap(Option.apply),
nullCount = BigInt(map(KEY_NULL_COUNT).toLong),
avgLen = map.getOrElse(KEY_AVG_LEN, field.dataType.defaultSize.toString).toLong,
maxLen = map.getOrElse(KEY_MAX_LEN, field.dataType.defaultSize.toString).toLong
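
The deserialization side can then be exercised with a hand-built map. "version", "distinctCount" and "min" are the key names the doc comment cites; the remaining camel-cased spellings are assumed to match the KEY_* constants, and FromMapDemo is an invented name.

    import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
    import org.apache.spark.sql.types.{DateType, StructField}

    object FromMapDemo extends App {
      val serialized = Map(
        "version" -> "1", "distinctCount" -> "2", "nullCount" -> "0",
        "avgLen" -> "4", "maxLen" -> "4",
        "min" -> "2017-01-01", "max" -> "2017-12-31")

      // min/max are parsed back into internal Int day counts, not java.sql.Date objects.
      println(ColumnStat.fromMap("some_table", StructField("d", DateType), serialized))
    }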
@@ -184,6 +192,30 @@ object ColumnStat extends Logging {
}
/**
+ * Converts the string representation of an external-type value back to the corresponding
+ * Catalyst-internal value.
+ */
+ private def fromExternalString(s: String, name: String, dataType: DataType): Any = {
+ dataType match {
+ case BooleanType => s.toBoolean
+ case DateType => DateTimeUtils.fromJavaDate(java.sql.Date.valueOf(s))
+ case TimestampType => DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf(s))
+ case ByteType => s.toByte
+ case ShortType => s.toShort
+ case IntegerType => s.toInt
+ case LongType => s.toLong
+ case FloatType => s.toFloat
+ case DoubleType => s.toDouble
+ case _: DecimalType => Decimal(s)
+ // This version of Spark does not use min/max for binary/string types so we ignore it.
+ case BinaryType | StringType => null
+ case _ =>
+ throw new AnalysisException("Column statistics deserialization is not supported for " +
+ s"column $name of data type: $dataType.")
+ }
+ }
+
+ /**
* Constructs an expression to compute column statistics for a given column.
*
* The expression should create a single struct column with the following schema:
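
Note that item 3 of the old class comment (integral types upcast to Long) has no counterpart in the new text: fromExternalString above keeps each integral type at its own width. A standalone mirror of just those cases, where parseIntegral is a hypothetical helper that is not part of the patch:

    import org.apache.spark.sql.types._

    object IntegralWidthDemo extends App {
      def parseIntegral(s: String, dt: DataType): Any = dt match {
        case ByteType    => s.toByte   // stays a Byte; no upcast to Long
        case ShortType   => s.toShort
        case IntegerType => s.toInt
        case LongType    => s.toLong
      }
      println(parseIntegral("1", ShortType).getClass) // class java.lang.Short
    }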
@@ -232,11 +264,14 @@ object ColumnStat extends Logging {
}
/** Convert a struct for column stats (defined in statExprs) into [[ColumnStat]]. */
- def rowToColumnStat(row: Row): ColumnStat = {
+ def rowToColumnStat(row: Row, attr: Attribute): ColumnStat = {
ColumnStat(
distinctCount = BigInt(row.getLong(0)),
- min = Option(row.get(1)), // for string/binary min/max, get should return null
- max = Option(row.get(2)),
+ // for string/binary min/max, get should return null
+ min = Option(row.get(1))
+ .map(v => fromExternalString(v.toString, attr.name, attr.dataType)).flatMap(Option.apply),
+ max = Option(row.get(2))
+ .map(v => fromExternalString(v.toString, attr.name, attr.dataType)).flatMap(Option.apply),
nullCount = BigInt(row.getLong(3)),
avgLen = row.getLong(4),
maxLen = row.getLong(5)
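
Finally, a hypothetical end-to-end sketch of the new rowToColumnStat(row, attr) signature (RowToColumnStatDemo is an invented name): the Row mimics what the statExprs struct would yield for a DateType column, with external java.sql.Date values in slots 1 and 2 that fromExternalString turns back into internal day counts.

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.catalyst.expressions.AttributeReference
    import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
    import org.apache.spark.sql.types.DateType

    object RowToColumnStatDemo extends App {
      val attr = AttributeReference("d", DateType)()
      val row = Row(2L, java.sql.Date.valueOf("2017-01-01"),
        java.sql.Date.valueOf("2017-12-31"), 0L, 4L, 4L)

      // min/max land as internal Int day counts via fromExternalString.
      println(ColumnStat.rowToColumnStat(row, attr))
    }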