Diffstat (limited to 'sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala                       | 95
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala  | 30
-rwxr-xr-x  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala | 68
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/Range.scala            | 70
4 files changed, 128 insertions(+), 135 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
index f24b240956..3d4efef953 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
@@ -25,6 +25,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -74,11 +75,10 @@ case class Statistics(
* Statistics collected for a column.
*
* 1. Supported data types are defined in `ColumnStat.supportsType`.
- * 2. The JVM data type stored in min/max is the external data type (used in Row) for the
- * corresponding Catalyst data type. For example, for DateType we store java.sql.Date, and for
- * TimestampType we store java.sql.Timestamp.
- * 3. For integral types, they are all upcasted to longs, i.e. shorts are stored as longs.
- * 4. There is no guarantee that the statistics collected are accurate. Approximation algorithms
+ * 2. The JVM data type stored in min/max is the internal data type for the corresponding
+ * Catalyst data type. For example, the internal type of DateType is Int, and the internal
+ * type of TimestampType is Long.
+ * 3. There is no guarantee that the statistics collected are accurate. Approximation algorithms
* (sketches) might have been used, and the data collected can also be stale.
*
* @param distinctCount number of distinct values
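As a reference for item 2 above, a minimal sketch of the internal vs. external representations (not part of this patch; assumes spark-catalyst on the classpath and uses the same DateTimeUtils conversions the patch relies on):

import java.sql.{Date, Timestamp}

import org.apache.spark.sql.catalyst.util.DateTimeUtils

object InternalTypeDemo extends App {
  // DateType is stored internally as an Int counting days since the epoch.
  val days: Int = DateTimeUtils.fromJavaDate(Date.valueOf("2017-01-01"))

  // TimestampType is stored internally as a Long counting microseconds since the epoch.
  val micros: Long = DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("2017-01-01 00:00:00"))

  // Both conversions round-trip back to the external java.sql values.
  assert(DateTimeUtils.toJavaDate(days) == Date.valueOf("2017-01-01"))
  assert(DateTimeUtils.toJavaTimestamp(micros) == Timestamp.valueOf("2017-01-01 00:00:00"))
}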
@@ -104,22 +104,43 @@ case class ColumnStat(
/**
* Returns a map from string to string that can be used to serialize the column stats.
* The key is the name of the field (e.g. "distinctCount" or "min"), and the value is the string
- * representation for the value. The deserialization side is defined in [[ColumnStat.fromMap]].
+ * representation for the value. The min/max values are converted to the external data type. For
+ * example, for DateType we store java.sql.Date, and for TimestampType we store
+ * java.sql.Timestamp. The deserialization side is defined in [[ColumnStat.fromMap]].
*
* As part of the protocol, the returned map always contains a key called "version".
* In the case min/max values are null (None), they won't appear in the map.
*/
- def toMap: Map[String, String] = {
+ def toMap(colName: String, dataType: DataType): Map[String, String] = {
val map = new scala.collection.mutable.HashMap[String, String]
map.put(ColumnStat.KEY_VERSION, "1")
map.put(ColumnStat.KEY_DISTINCT_COUNT, distinctCount.toString)
map.put(ColumnStat.KEY_NULL_COUNT, nullCount.toString)
map.put(ColumnStat.KEY_AVG_LEN, avgLen.toString)
map.put(ColumnStat.KEY_MAX_LEN, maxLen.toString)
- min.foreach { v => map.put(ColumnStat.KEY_MIN_VALUE, v.toString) }
- max.foreach { v => map.put(ColumnStat.KEY_MAX_VALUE, v.toString) }
+ min.foreach { v => map.put(ColumnStat.KEY_MIN_VALUE, toExternalString(v, colName, dataType)) }
+ max.foreach { v => map.put(ColumnStat.KEY_MAX_VALUE, toExternalString(v, colName, dataType)) }
map.toMap
}
+
+ /**
+ * Converts the given value from Catalyst data type to string representation of external
+ * data type.
+ */
+ private def toExternalString(v: Any, colName: String, dataType: DataType): String = {
+ val externalValue = dataType match {
+ case DateType => DateTimeUtils.toJavaDate(v.asInstanceOf[Int])
+ case TimestampType => DateTimeUtils.toJavaTimestamp(v.asInstanceOf[Long])
+ case BooleanType | _: IntegralType | FloatType | DoubleType => v
+ case _: DecimalType => v.asInstanceOf[Decimal].toJavaBigDecimal
+ // This version of Spark does not use min/max for binary/string types so we ignore it.
+ case _ =>
+ throw new AnalysisException("Column statistics serialization is not supported for " +
+ s"column $colName of data type: $dataType.")
+ }
+ externalValue.toString
+ }
+
}
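A hypothetical usage sketch of the new toMap (the column name and values are made up): min/max hold internal day counts, but the serialized strings come out in the external java.sql.Date form via toExternalString:

import java.sql.Date

import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types.DateType

val stat = ColumnStat(
  distinctCount = 2,
  min = Some(DateTimeUtils.fromJavaDate(Date.valueOf("2017-01-01"))),  // internal Int
  max = Some(DateTimeUtils.fromJavaDate(Date.valueOf("2017-03-01"))),  // internal Int
  nullCount = 0,
  avgLen = 4,
  maxLen = 4)

val m = stat.toMap(colName = "birthday", dataType = DateType)
// m("min") == "2017-01-01" and m("max") == "2017-03-01", not the raw day counts.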
@@ -150,28 +171,15 @@ object ColumnStat extends Logging {
* Creates a [[ColumnStat]] object from the given map. This is used to deserialize column stats
* from some external storage. The serialization side is defined in [[ColumnStat.toMap]].
*/
- def fromMap(table: String, field: StructField, map: Map[String, String])
- : Option[ColumnStat] = {
- val str2val: (String => Any) = field.dataType match {
- case _: IntegralType => _.toLong
- case _: DecimalType => new java.math.BigDecimal(_)
- case DoubleType | FloatType => _.toDouble
- case BooleanType => _.toBoolean
- case DateType => java.sql.Date.valueOf
- case TimestampType => java.sql.Timestamp.valueOf
- // This version of Spark does not use min/max for binary/string types so we ignore it.
- case BinaryType | StringType => _ => null
- case _ =>
- throw new AnalysisException("Column statistics deserialization is not supported for " +
- s"column ${field.name} of data type: ${field.dataType}.")
- }
-
+ def fromMap(table: String, field: StructField, map: Map[String, String]): Option[ColumnStat] = {
try {
Some(ColumnStat(
distinctCount = BigInt(map(KEY_DISTINCT_COUNT).toLong),
// Note that flatMap(Option.apply) turns Option(null) into None.
- min = map.get(KEY_MIN_VALUE).map(str2val).flatMap(Option.apply),
- max = map.get(KEY_MAX_VALUE).map(str2val).flatMap(Option.apply),
+ min = map.get(KEY_MIN_VALUE)
+ .map(fromExternalString(_, field.name, field.dataType)).flatMap(Option.apply),
+ max = map.get(KEY_MAX_VALUE)
+ .map(fromExternalString(_, field.name, field.dataType)).flatMap(Option.apply),
nullCount = BigInt(map(KEY_NULL_COUNT).toLong),
avgLen = map.getOrElse(KEY_AVG_LEN, field.dataType.defaultSize.toString).toLong,
maxLen = map.getOrElse(KEY_MAX_LEN, field.dataType.defaultSize.toString).toLong
@@ -184,6 +192,30 @@ object ColumnStat extends Logging {
}
/**
+ * Converts from string representation of external data type to the corresponding Catalyst data
+ * type.
+ */
+ private def fromExternalString(s: String, name: String, dataType: DataType): Any = {
+ dataType match {
+ case BooleanType => s.toBoolean
+ case DateType => DateTimeUtils.fromJavaDate(java.sql.Date.valueOf(s))
+ case TimestampType => DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf(s))
+ case ByteType => s.toByte
+ case ShortType => s.toShort
+ case IntegerType => s.toInt
+ case LongType => s.toLong
+ case FloatType => s.toFloat
+ case DoubleType => s.toDouble
+ case _: DecimalType => Decimal(s)
+ // This version of Spark does not use min/max for binary/string types so we ignore it.
+ case BinaryType | StringType => null
+ case _ =>
+ throw new AnalysisException("Column statistics deserialization is not supported for " +
+ s"column $name of data type: $dataType.")
+ }
+ }
+
+ /**
* Constructs an expression to compute column statistics for a given column.
*
* The expression should create a single struct column with the following schema:
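And the reverse direction, a sketch of deserializing such a map (the literal key names are assumed to match the KEY_* constants, e.g. "min"/"max"; fromExternalString turns the external strings back into internal values):

import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
import org.apache.spark.sql.types.{DateType, StructField}

val stored = Map(
  "version" -> "1",
  "distinctCount" -> "2",
  "nullCount" -> "0",
  "avgLen" -> "4",
  "maxLen" -> "4",
  "min" -> "2017-01-01",
  "max" -> "2017-03-01")

// restored.get.min is Some(<Int day count>) again, not a java.sql.Date.
val restored = ColumnStat.fromMap("some_table", StructField("birthday", DateType), stored)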
@@ -232,11 +264,14 @@ object ColumnStat extends Logging {
}
/** Convert a struct for column stats (defined in statExprs) into [[ColumnStat]]. */
- def rowToColumnStat(row: Row): ColumnStat = {
+ def rowToColumnStat(row: Row, attr: Attribute): ColumnStat = {
ColumnStat(
distinctCount = BigInt(row.getLong(0)),
- min = Option(row.get(1)), // for string/binary min/max, get should return null
- max = Option(row.get(2)),
+ // for string/binary min/max, get should return null
+ min = Option(row.get(1))
+ .map(v => fromExternalString(v.toString, attr.name, attr.dataType)).flatMap(Option.apply),
+ max = Option(row.get(2))
+ .map(v => fromExternalString(v.toString, attr.name, attr.dataType)).flatMap(Option.apply),
nullCount = BigInt(row.getLong(3)),
avgLen = row.getLong(4),
maxLen = row.getLong(5)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
index 5577233ffa..f1aff62cb6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
@@ -22,7 +22,7 @@ import scala.math.BigDecimal.RoundingMode
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan, Statistics}
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{DataType, StringType}
+import org.apache.spark.sql.types._
object EstimationUtils {
@@ -75,4 +75,32 @@ object EstimationUtils {
// (simple computation of statistics returns product of children).
if (outputRowCount > 0) outputRowCount * sizePerRow else 1
}
+
+ /**
+ * For simplicity we use Decimal to unify operations over data types whose min/max values can
+ * be represented as numbers, e.g. Boolean can be represented as 0 (false) or 1 (true).
+ * The two methods below define the conversion contract.
+ */
+ def toDecimal(value: Any, dataType: DataType): Decimal = {
+ dataType match {
+ case _: NumericType | DateType | TimestampType => Decimal(value.toString)
+ case BooleanType => if (value.asInstanceOf[Boolean]) Decimal(1) else Decimal(0)
+ }
+ }
+
+ def fromDecimal(dec: Decimal, dataType: DataType): Any = {
+ dataType match {
+ case BooleanType => dec.toLong == 1
+ case DateType => dec.toInt
+ case TimestampType => dec.toLong
+ case ByteType => dec.toByte
+ case ShortType => dec.toShort
+ case IntegerType => dec.toInt
+ case LongType => dec.toLong
+ case FloatType => dec.toFloat
+ case DoubleType => dec.toDouble
+ case _: DecimalType => dec
+ }
+ }
+
}
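A quick sketch of this contract in action (values made up; day count 17167 corresponds to 2017-01-01):

import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
import org.apache.spark.sql.types.{BooleanType, DateType, Decimal}

// Booleans map to 0/1 and back.
val b: Decimal = EstimationUtils.toDecimal(true, BooleanType)   // Decimal(1)
assert(EstimationUtils.fromDecimal(b, BooleanType) == true)

// Dates travel as their internal Int day count.
val d: Decimal = EstimationUtils.toDecimal(17167, DateType)
assert(EstimationUtils.fromDecimal(d, DateType) == 17167)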
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
index 7bd8e65112..4b6b3b14d9 100755
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
@@ -25,7 +25,6 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral}
import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Filter, LeafNode, Statistics}
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -302,30 +301,6 @@ case class FilterEstimation(plan: Filter, catalystConf: SQLConf) extends Logging
}
/**
- * For a SQL data type, its internal data type may be different from its external type.
- * For DateType, its internal type is Int, and its external data type is Java Date type.
- * The min/max values in ColumnStat are saved in their corresponding external type.
- *
- * @param attrDataType the column data type
- * @param litValue the literal value
- * @return a BigDecimal value
- */
- def convertBoundValue(attrDataType: DataType, litValue: Any): Option[Any] = {
- attrDataType match {
- case DateType =>
- Some(DateTimeUtils.toJavaDate(litValue.toString.toInt))
- case TimestampType =>
- Some(DateTimeUtils.toJavaTimestamp(litValue.toString.toLong))
- case _: DecimalType =>
- Some(litValue.asInstanceOf[Decimal].toJavaBigDecimal)
- case StringType | BinaryType =>
- None
- case _ =>
- Some(litValue)
- }
- }
-
- /**
* Returns a percentage of rows meeting an equality (=) expression.
* This method evaluates the equality predicate for all data types.
*
@@ -356,12 +331,16 @@ case class FilterEstimation(plan: Filter, catalystConf: SQLConf) extends Logging
val statsRange = Range(colStat.min, colStat.max, attr.dataType)
if (statsRange.contains(literal)) {
if (update) {
- // We update ColumnStat structure after apply this equality predicate.
- // Set distinctCount to 1. Set nullCount to 0.
- // Need to save new min/max using the external type value of the literal
- val newValue = convertBoundValue(attr.dataType, literal.value)
- val newStats = colStat.copy(distinctCount = 1, min = newValue,
- max = newValue, nullCount = 0)
+ // We update the ColumnStat structure after applying this equality predicate:
+ // set distinctCount to 1, nullCount to 0, and min/max values (if they exist) to the
+ // literal value.
+ val newStats = attr.dataType match {
+ case StringType | BinaryType =>
+ colStat.copy(distinctCount = 1, nullCount = 0)
+ case _ =>
+ colStat.copy(distinctCount = 1, min = Some(literal.value),
+ max = Some(literal.value), nullCount = 0)
+ }
colStatsMap(attr) = newStats
}
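A worked illustration of this update with made-up numbers, applying `col = 42` to a column whose stats span [1, 100]:

import org.apache.spark.sql.catalyst.plans.logical.ColumnStat

val before = ColumnStat(distinctCount = 10, min = Some(1), max = Some(100),
  nullCount = 5, avgLen = 4, maxLen = 4)

// 42 lies within [1, 100], so the stats collapse onto the literal. min/max now
// hold the literal's internal value directly; no external conversion is needed.
val after = before.copy(distinctCount = 1, min = Some(42), max = Some(42), nullCount = 0)

For string/binary columns only distinctCount and nullCount change, since min/max are not tracked for those types.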
@@ -430,18 +409,14 @@ case class FilterEstimation(plan: Filter, catalystConf: SQLConf) extends Logging
return Some(0.0)
}
- // Need to save new min/max using the external type value of the literal
- val newMax = convertBoundValue(
- attr.dataType, validQuerySet.maxBy(v => BigDecimal(v.toString)))
- val newMin = convertBoundValue(
- attr.dataType, validQuerySet.minBy(v => BigDecimal(v.toString)))
-
+ val newMax = validQuerySet.maxBy(EstimationUtils.toDecimal(_, dataType))
+ val newMin = validQuerySet.minBy(EstimationUtils.toDecimal(_, dataType))
// newNdv should not be greater than the old ndv. For example, column has only 2 values
// 1 and 6. The predicate column IN (1, 2, 3, 4, 5). validQuerySet.size is 5.
newNdv = ndv.min(BigInt(validQuerySet.size))
if (update) {
- val newStats = colStat.copy(distinctCount = newNdv, min = newMin,
- max = newMax, nullCount = 0)
+ val newStats = colStat.copy(distinctCount = newNdv, min = Some(newMin),
+ max = Some(newMax), nullCount = 0)
colStatsMap(attr) = newStats
}
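A sketch of the new endpoint selection (assumed inputs): maxBy/minBy order the IN-list literals by their Decimal image, which works because Decimal is Ordered:

import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
import org.apache.spark.sql.types.IntegerType

val validQuerySet: Set[Any] = Set(1, 4, 2, 5, 3)   // literals surviving the range check
val newMax = validQuerySet.maxBy(EstimationUtils.toDecimal(_, IntegerType))   // 5
val newMin = validQuerySet.minBy(EstimationUtils.toDecimal(_, IntegerType))   // 1

// The ndv cap from the comment above: a column with only 2 distinct values
// cannot gain distinct values from a 5-element IN list.
val newNdv = BigInt(2).min(BigInt(validQuerySet.size))   // 2, not 5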
@@ -478,8 +453,8 @@ case class FilterEstimation(plan: Filter, catalystConf: SQLConf) extends Logging
val colStat = colStatsMap(attr)
val statsRange = Range(colStat.min, colStat.max, attr.dataType).asInstanceOf[NumericRange]
- val max = BigDecimal(statsRange.max)
- val min = BigDecimal(statsRange.min)
+ val max = statsRange.max.toBigDecimal
+ val min = statsRange.min.toBigDecimal
val ndv = BigDecimal(colStat.distinctCount)
// determine the overlapping degree between predicate range and column's range
@@ -540,8 +515,7 @@ case class FilterEstimation(plan: Filter, catalystConf: SQLConf) extends Logging
}
if (update) {
- // Need to save new min/max using the external type value of the literal
- val newValue = convertBoundValue(attr.dataType, literal.value)
+ val newValue = Some(literal.value)
var newMax = colStat.max
var newMin = colStat.min
var newNdv = (ndv * percent).setScale(0, RoundingMode.HALF_UP).toBigInt()
@@ -606,14 +580,14 @@ case class FilterEstimation(plan: Filter, catalystConf: SQLConf) extends Logging
val colStatLeft = colStatsMap(attrLeft)
val statsRangeLeft = Range(colStatLeft.min, colStatLeft.max, attrLeft.dataType)
.asInstanceOf[NumericRange]
- val maxLeft = BigDecimal(statsRangeLeft.max)
- val minLeft = BigDecimal(statsRangeLeft.min)
+ val maxLeft = statsRangeLeft.max
+ val minLeft = statsRangeLeft.min
val colStatRight = colStatsMap(attrRight)
val statsRangeRight = Range(colStatRight.min, colStatRight.max, attrRight.dataType)
.asInstanceOf[NumericRange]
- val maxRight = BigDecimal(statsRangeRight.max)
- val minRight = BigDecimal(statsRangeRight.min)
+ val maxRight = statsRangeRight.max
+ val minRight = statsRangeRight.min
// determine the overlapping degree between predicate range and column's range
val allNotNull = (colStatLeft.nullCount == 0) && (colStatRight.nullCount == 0)
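With NumericRange endpoints now held as Decimal, a sketch (assumed values) of how the two column ranges compare without the BigDecimal re-wrapping:

import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.NumericRange
import org.apache.spark.sql.types.Decimal

val leftRange = NumericRange(Decimal(1), Decimal(50))
val rightRange = NumericRange(Decimal(30), Decimal(100))

// For a predicate like `leftCol < rightCol`, the estimate is surely-false only
// when left.min >= right.max; here Decimal(1) < Decimal(100), so the ranges
// overlap and a fractional selectivity is derived from the overlapping degree.
val surelyFalse = leftRange.min >= rightRange.max   // false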
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/Range.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/Range.scala
index 3d13967cb6..4ac5ba5689 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/Range.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/Range.scala
@@ -17,12 +17,8 @@
package org.apache.spark.sql.catalyst.plans.logical.statsEstimation
-import java.math.{BigDecimal => JDecimal}
-import java.sql.{Date, Timestamp}
-
import org.apache.spark.sql.catalyst.expressions.Literal
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.types.{BooleanType, DateType, TimestampType, _}
+import org.apache.spark.sql.types._
/** Value range of a column. */
@@ -31,13 +27,10 @@ trait Range {
}
/** For simplicity we use decimal to unify operations of numeric ranges. */
-case class NumericRange(min: JDecimal, max: JDecimal) extends Range {
+case class NumericRange(min: Decimal, max: Decimal) extends Range {
override def contains(l: Literal): Boolean = {
- val decimal = l.dataType match {
- case BooleanType => if (l.value.asInstanceOf[Boolean]) new JDecimal(1) else new JDecimal(0)
- case _ => new JDecimal(l.value.toString)
- }
- min.compareTo(decimal) <= 0 && max.compareTo(decimal) >= 0
+ val lit = EstimationUtils.toDecimal(l.value, l.dataType)
+ min <= lit && max >= lit
}
}
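Usage sketch of the unified contains path (assumed values): booleans and numerics go through the same toDecimal comparison:

import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.NumericRange
import org.apache.spark.sql.types.Decimal

val range = NumericRange(Decimal(0), Decimal(1))
range.contains(Literal(true))   // true:  toDecimal(true, BooleanType) == Decimal(1)
range.contains(Literal(7))      // false: Decimal(7) falls outside [0, 1]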
@@ -58,7 +51,10 @@ object Range {
def apply(min: Option[Any], max: Option[Any], dataType: DataType): Range = dataType match {
case StringType | BinaryType => new DefaultRange()
case _ if min.isEmpty || max.isEmpty => new NullRange()
- case _ => toNumericRange(min.get, max.get, dataType)
+ case _ =>
+ NumericRange(
+ min = EstimationUtils.toDecimal(min.get, dataType),
+ max = EstimationUtils.toDecimal(max.get, dataType))
}
def isIntersected(r1: Range, r2: Range): Boolean = (r1, r2) match {
@@ -82,51 +78,11 @@ object Range {
// binary/string types don't support intersecting.
(None, None)
case (n1: NumericRange, n2: NumericRange) =>
- val newRange = NumericRange(n1.min.max(n2.min), n1.max.min(n2.max))
- val (newMin, newMax) = fromNumericRange(newRange, dt)
- (Some(newMin), Some(newMax))
+ // Choose the maximum of two min values, and the minimum of two max values.
+ val newMin = if (n1.min <= n2.min) n2.min else n1.min
+ val newMax = if (n1.max <= n2.max) n1.max else n2.max
+ (Some(EstimationUtils.fromDecimal(newMin, dt)),
+ Some(EstimationUtils.fromDecimal(newMax, dt)))
}
}
-
- /**
- * For simplicity we use decimal to unify operations of numeric types, the two methods below
- * are the contract of conversion.
- */
- private def toNumericRange(min: Any, max: Any, dataType: DataType): NumericRange = {
- dataType match {
- case _: NumericType =>
- NumericRange(new JDecimal(min.toString), new JDecimal(max.toString))
- case BooleanType =>
- val min1 = if (min.asInstanceOf[Boolean]) 1 else 0
- val max1 = if (max.asInstanceOf[Boolean]) 1 else 0
- NumericRange(new JDecimal(min1), new JDecimal(max1))
- case DateType =>
- val min1 = DateTimeUtils.fromJavaDate(min.asInstanceOf[Date])
- val max1 = DateTimeUtils.fromJavaDate(max.asInstanceOf[Date])
- NumericRange(new JDecimal(min1), new JDecimal(max1))
- case TimestampType =>
- val min1 = DateTimeUtils.fromJavaTimestamp(min.asInstanceOf[Timestamp])
- val max1 = DateTimeUtils.fromJavaTimestamp(max.asInstanceOf[Timestamp])
- NumericRange(new JDecimal(min1), new JDecimal(max1))
- }
- }
-
- private def fromNumericRange(n: NumericRange, dataType: DataType): (Any, Any) = {
- dataType match {
- case _: IntegralType =>
- (n.min.longValue(), n.max.longValue())
- case FloatType | DoubleType =>
- (n.min.doubleValue(), n.max.doubleValue())
- case _: DecimalType =>
- (n.min, n.max)
- case BooleanType =>
- (n.min.longValue() == 1, n.max.longValue() == 1)
- case DateType =>
- (DateTimeUtils.toJavaDate(n.min.intValue()), DateTimeUtils.toJavaDate(n.max.intValue()))
- case TimestampType =>
- (DateTimeUtils.toJavaTimestamp(n.min.longValue()),
- DateTimeUtils.toJavaTimestamp(n.max.longValue()))
- }
- }
-
}
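Finally, a sketch of the intersection logic above (values made up; the enclosing method's signature is not shown in this hunk, so `intersect(r1: Range, r2: Range, dt: DataType)` is assumed):

import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.{NumericRange, Range}
import org.apache.spark.sql.types.{Decimal, IntegerType}

val r1 = NumericRange(Decimal(1), Decimal(50))
val r2 = NumericRange(Decimal(30), Decimal(100))

// Larger of the two mins, smaller of the two maxes: [30, 50]. fromDecimal then
// maps the Decimal endpoints back to the column's internal type (Int here).
val (newMin, newMax) = Range.intersect(r1, r2, IntegerType)
// newMin == Some(30), newMax == Some(50)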