aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst
diff options
context:
space:
mode:
authorShuai Lin <linshuai2012@gmail.com>2017-02-24 10:24:01 -0800
committerWenchen Fan <wenchen@databricks.com>2017-02-24 10:24:01 -0800
commit05954f32e9bde56dc1f9a72028900d705185f6d7 (patch)
treef065aba9740bfa7da4620569e56a483bb7979c45 /sql/catalyst
parent3e40f6c3d6fc0bcd828d09031fa3994925394889 (diff)
downloadspark-05954f32e9bde56dc1f9a72028900d705185f6d7.tar.gz
spark-05954f32e9bde56dc1f9a72028900d705185f6d7.tar.bz2
spark-05954f32e9bde56dc1f9a72028900d705185f6d7.zip
[SPARK-17075][SQL] Follow up: fix file line ending and improve the tests
## What changes were proposed in this pull request? Fixed the line ending of `FilterEstimation.scala` (It's still using `\n\r`). Also improved the tests to cover the cases where the literals are on the left side of a binary operator. ## How was this patch tested? Existing unit tests. Author: Shuai Lin <linshuai2012@gmail.com> Closes #17051 from lins05/fix-cbo-filter-file-encoding.
Diffstat (limited to 'sql/catalyst')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala1022
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala23
2 files changed, 533 insertions, 512 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
index fcc607a610..37f29ba68a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
@@ -1,511 +1,511 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.plans.logical.statsEstimation
-
-import java.sql.{Date, Timestamp}
-
-import scala.collection.immutable.{HashSet, Map}
-import scala.collection.mutable
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.CatalystConf
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.types._
-
-case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Logging {
-
- /**
- * We use a mutable colStats because we need to update the corresponding ColumnStat
- * for a column after we apply a predicate condition. For example, column c has
- * [min, max] value as [0, 100]. In a range condition such as (c > 40 AND c <= 50),
- * we need to set the column's [min, max] value to [40, 100] after we evaluate the
- * first condition c > 40. We need to set the column's [min, max] value to [40, 50]
- * after we evaluate the second condition c <= 50.
- */
- private var mutableColStats: mutable.Map[ExprId, ColumnStat] = mutable.Map.empty
-
- /**
- * Returns an option of Statistics for a Filter logical plan node.
- * For a given compound expression condition, this method computes filter selectivity
- * (or the percentage of rows meeting the filter condition), which
- * is used to compute row count, size in bytes, and the updated statistics after a given
- * predicate is applied.
- *
- * @return Option[Statistics] When there is no statistics collected, it returns None.
- */
- def estimate: Option[Statistics] = {
- // We first copy child node's statistics and then modify it based on filter selectivity.
- val stats: Statistics = plan.child.stats(catalystConf)
- if (stats.rowCount.isEmpty) return None
-
- // save a mutable copy of colStats so that we can later change it recursively
- mutableColStats = mutable.Map(stats.attributeStats.map(kv => (kv._1.exprId, kv._2)).toSeq: _*)
-
- // estimate selectivity of this filter predicate
- val filterSelectivity: Double = calculateFilterSelectivity(plan.condition) match {
- case Some(percent) => percent
- // for not-supported condition, set filter selectivity to a conservative estimate 100%
- case None => 1.0
- }
-
- // attributeStats has mapping Attribute-to-ColumnStat.
- // mutableColStats has mapping ExprId-to-ColumnStat.
- // We use an ExprId-to-Attribute map to facilitate the mapping Attribute-to-ColumnStat
- val expridToAttrMap: Map[ExprId, Attribute] =
- stats.attributeStats.map(kv => (kv._1.exprId, kv._1))
- // copy mutableColStats contents to an immutable AttributeMap.
- val mutableAttributeStats: mutable.Map[Attribute, ColumnStat] =
- mutableColStats.map(kv => expridToAttrMap(kv._1) -> kv._2)
- val newColStats = AttributeMap(mutableAttributeStats.toSeq)
-
- val filteredRowCount: BigInt =
- EstimationUtils.ceil(BigDecimal(stats.rowCount.get) * filterSelectivity)
- val filteredSizeInBytes =
- EstimationUtils.getOutputSize(plan.output, filteredRowCount, newColStats)
-
- Some(stats.copy(sizeInBytes = filteredSizeInBytes, rowCount = Some(filteredRowCount),
- attributeStats = newColStats))
- }
-
- /**
- * Returns a percentage of rows meeting a compound condition in Filter node.
- * A compound condition is decomposed into multiple single conditions linked with AND, OR, NOT.
- * For logical AND conditions, we need to update stats after a condition estimation
- * so that the stats will be more accurate for subsequent estimation. This is needed for
- * range condition such as (c > 40 AND c <= 50)
- * For logical OR conditions, we do not update stats after a condition estimation.
- *
- * @param condition the compound logical expression
- * @param update a boolean flag to specify if we need to update ColumnStat of a column
- * for subsequent conditions
- * @return a double value to show the percentage of rows meeting a given condition.
- * It returns None if the condition is not supported.
- */
- def calculateFilterSelectivity(condition: Expression, update: Boolean = true): Option[Double] = {
-
- condition match {
- case And(cond1, cond2) =>
- (calculateFilterSelectivity(cond1, update), calculateFilterSelectivity(cond2, update))
- match {
- case (Some(p1), Some(p2)) => Some(p1 * p2)
- case (Some(p1), None) => Some(p1)
- case (None, Some(p2)) => Some(p2)
- case (None, None) => None
- }
-
- case Or(cond1, cond2) =>
- // For ease of debugging, we compute percent1 and percent2 in 2 statements.
- val percent1 = calculateFilterSelectivity(cond1, update = false)
- val percent2 = calculateFilterSelectivity(cond2, update = false)
- (percent1, percent2) match {
- case (Some(p1), Some(p2)) => Some(math.min(1.0, p1 + p2 - (p1 * p2)))
- case (Some(p1), None) => Some(1.0)
- case (None, Some(p2)) => Some(1.0)
- case (None, None) => None
- }
-
- case Not(cond) => calculateFilterSelectivity(cond, update = false) match {
- case Some(percent) => Some(1.0 - percent)
- // for not-supported condition, set filter selectivity to a conservative estimate 100%
- case None => None
- }
-
- case _ =>
- calculateSingleCondition(condition, update)
- }
- }
-
- /**
- * Returns a percentage of rows meeting a single condition in Filter node.
- * Currently we only support binary predicates where one side is a column,
- * and the other is a literal.
- *
- * @param condition a single logical expression
- * @param update a boolean flag to specify if we need to update ColumnStat of a column
- * for subsequent conditions
- * @return Option[Double] value to show the percentage of rows meeting a given condition.
- * It returns None if the condition is not supported.
- */
- def calculateSingleCondition(condition: Expression, update: Boolean): Option[Double] = {
- condition match {
- // For evaluateBinary method, we assume the literal on the right side of an operator.
- // So we will change the order if not.
-
- // EqualTo does not care about the order
- case op @ EqualTo(ar: AttributeReference, l: Literal) =>
- evaluateBinary(op, ar, l, update)
- case op @ EqualTo(l: Literal, ar: AttributeReference) =>
- evaluateBinary(op, ar, l, update)
-
- case op @ LessThan(ar: AttributeReference, l: Literal) =>
- evaluateBinary(op, ar, l, update)
- case op @ LessThan(l: Literal, ar: AttributeReference) =>
- evaluateBinary(GreaterThan(ar, l), ar, l, update)
-
- case op @ LessThanOrEqual(ar: AttributeReference, l: Literal) =>
- evaluateBinary(op, ar, l, update)
- case op @ LessThanOrEqual(l: Literal, ar: AttributeReference) =>
- evaluateBinary(GreaterThanOrEqual(ar, l), ar, l, update)
-
- case op @ GreaterThan(ar: AttributeReference, l: Literal) =>
- evaluateBinary(op, ar, l, update)
- case op @ GreaterThan(l: Literal, ar: AttributeReference) =>
- evaluateBinary(LessThan(ar, l), ar, l, update)
-
- case op @ GreaterThanOrEqual(ar: AttributeReference, l: Literal) =>
- evaluateBinary(op, ar, l, update)
- case op @ GreaterThanOrEqual(l: Literal, ar: AttributeReference) =>
- evaluateBinary(LessThanOrEqual(ar, l), ar, l, update)
-
- case In(ar: AttributeReference, expList)
- if expList.forall(e => e.isInstanceOf[Literal]) =>
- // Expression [In (value, seq[Literal])] will be replaced with optimized version
- // [InSet (value, HashSet[Literal])] in Optimizer, but only for list.size > 10.
- // Here we convert In into InSet anyway, because they share the same processing logic.
- val hSet = expList.map(e => e.eval())
- evaluateInSet(ar, HashSet() ++ hSet, update)
-
- case InSet(ar: AttributeReference, set) =>
- evaluateInSet(ar, set, update)
-
- case IsNull(ar: AttributeReference) =>
- evaluateIsNull(ar, isNull = true, update)
-
- case IsNotNull(ar: AttributeReference) =>
- evaluateIsNull(ar, isNull = false, update)
-
- case _ =>
- // TODO: it's difficult to support string operators without advanced statistics.
- // Hence, these string operators Like(_, _) | Contains(_, _) | StartsWith(_, _)
- // | EndsWith(_, _) are not supported yet
- logDebug("[CBO] Unsupported filter condition: " + condition)
- None
- }
- }
-
- /**
- * Returns a percentage of rows meeting "IS NULL" or "IS NOT NULL" condition.
- *
- * @param attrRef an AttributeReference (or a column)
- * @param isNull set to true for "IS NULL" condition. set to false for "IS NOT NULL" condition
- * @param update a boolean flag to specify if we need to update ColumnStat of a given column
- * for subsequent conditions
- * @return an optional double value to show the percentage of rows meeting a given condition
- * It returns None if no statistics collected for a given column.
- */
- def evaluateIsNull(
- attrRef: AttributeReference,
- isNull: Boolean,
- update: Boolean)
- : Option[Double] = {
- if (!mutableColStats.contains(attrRef.exprId)) {
- logDebug("[CBO] No statistics for " + attrRef)
- return None
- }
- val aColStat = mutableColStats(attrRef.exprId)
- val rowCountValue = plan.child.stats(catalystConf).rowCount.get
- val nullPercent: BigDecimal =
- if (rowCountValue == 0) 0.0
- else BigDecimal(aColStat.nullCount) / BigDecimal(rowCountValue)
-
- if (update) {
- val newStats =
- if (isNull) aColStat.copy(distinctCount = 0, min = None, max = None)
- else aColStat.copy(nullCount = 0)
-
- mutableColStats += (attrRef.exprId -> newStats)
- }
-
- val percent =
- if (isNull) {
- nullPercent.toDouble
- }
- else {
- /** ISNOTNULL(column) */
- 1.0 - nullPercent.toDouble
- }
-
- Some(percent)
- }
-
- /**
- * Returns a percentage of rows meeting a binary comparison expression.
- *
- * @param op a binary comparison operator such as =, <, <=, >, >=
- * @param attrRef an AttributeReference (or a column)
- * @param literal a literal value (or constant)
- * @param update a boolean flag to specify if we need to update ColumnStat of a given column
- * for subsequent conditions
- * @return an optional double value to show the percentage of rows meeting a given condition
- * It returns None if no statistics exists for a given column or wrong value.
- */
- def evaluateBinary(
- op: BinaryComparison,
- attrRef: AttributeReference,
- literal: Literal,
- update: Boolean)
- : Option[Double] = {
- if (!mutableColStats.contains(attrRef.exprId)) {
- logDebug("[CBO] No statistics for " + attrRef)
- return None
- }
-
- op match {
- case EqualTo(l, r) => evaluateEqualTo(attrRef, literal, update)
- case _ =>
- attrRef.dataType match {
- case _: NumericType | DateType | TimestampType =>
- evaluateBinaryForNumeric(op, attrRef, literal, update)
- case StringType | BinaryType =>
- // TODO: It is difficult to support other binary comparisons for String/Binary
- // type without min/max and advanced statistics like histogram.
- logDebug("[CBO] No range comparison statistics for String/Binary type " + attrRef)
- None
- }
- }
- }
-
- /**
- * For a SQL data type, its internal data type may be different from its external type.
- * For DateType, its internal type is Int, and its external data type is Java Date type.
- * The min/max values in ColumnStat are saved in their corresponding external type.
- *
- * @param attrDataType the column data type
- * @param litValue the literal value
- * @return a BigDecimal value
- */
- def convertBoundValue(attrDataType: DataType, litValue: Any): Option[Any] = {
- attrDataType match {
- case DateType =>
- Some(DateTimeUtils.toJavaDate(litValue.toString.toInt))
- case TimestampType =>
- Some(DateTimeUtils.toJavaTimestamp(litValue.toString.toLong))
- case StringType | BinaryType =>
- None
- case _ =>
- Some(litValue)
- }
- }
-
- /**
- * Returns a percentage of rows meeting an equality (=) expression.
- * This method evaluates the equality predicate for all data types.
- *
- * @param attrRef an AttributeReference (or a column)
- * @param literal a literal value (or constant)
- * @param update a boolean flag to specify if we need to update ColumnStat of a given column
- * for subsequent conditions
- * @return an optional double value to show the percentage of rows meeting a given condition
- */
- def evaluateEqualTo(
- attrRef: AttributeReference,
- literal: Literal,
- update: Boolean)
- : Option[Double] = {
-
- val aColStat = mutableColStats(attrRef.exprId)
- val ndv = aColStat.distinctCount
-
- // decide if the value is in [min, max] of the column.
- // We currently don't store min/max for binary/string type.
- // Hence, we assume it is in boundary for binary/string type.
- val statsRange = Range(aColStat.min, aColStat.max, attrRef.dataType)
- val inBoundary: Boolean = Range.rangeContainsLiteral(statsRange, literal)
-
- if (inBoundary) {
-
- if (update) {
- // We update ColumnStat structure after apply this equality predicate.
- // Set distinctCount to 1. Set nullCount to 0.
- // Need to save new min/max using the external type value of the literal
- val newValue = convertBoundValue(attrRef.dataType, literal.value)
- val newStats = aColStat.copy(distinctCount = 1, min = newValue,
- max = newValue, nullCount = 0)
- mutableColStats += (attrRef.exprId -> newStats)
- }
-
- Some(1.0 / ndv.toDouble)
- } else {
- Some(0.0)
- }
-
- }
-
- /**
- * Returns a percentage of rows meeting "IN" operator expression.
- * This method evaluates the equality predicate for all data types.
- *
- * @param attrRef an AttributeReference (or a column)
- * @param hSet a set of literal values
- * @param update a boolean flag to specify if we need to update ColumnStat of a given column
- * for subsequent conditions
- * @return an optional double value to show the percentage of rows meeting a given condition
- * It returns None if no statistics exists for a given column.
- */
-
- def evaluateInSet(
- attrRef: AttributeReference,
- hSet: Set[Any],
- update: Boolean)
- : Option[Double] = {
- if (!mutableColStats.contains(attrRef.exprId)) {
- logDebug("[CBO] No statistics for " + attrRef)
- return None
- }
-
- val aColStat = mutableColStats(attrRef.exprId)
- val ndv = aColStat.distinctCount
- val aType = attrRef.dataType
- var newNdv: Long = 0
-
- // use [min, max] to filter the original hSet
- aType match {
- case _: NumericType | DateType | TimestampType =>
- val statsRange =
- Range(aColStat.min, aColStat.max, aType).asInstanceOf[NumericRange]
-
- // To facilitate finding the min and max values in hSet, we map hSet values to BigDecimal.
- // Using hSetBigdec, we can find the min and max values quickly in the ordered hSetBigdec.
- val hSetBigdec = hSet.map(e => BigDecimal(e.toString))
- val validQuerySet = hSetBigdec.filter(e => e >= statsRange.min && e <= statsRange.max)
- // We use hSetBigdecToAnyMap to help us find the original hSet value.
- val hSetBigdecToAnyMap: Map[BigDecimal, Any] =
- hSet.map(e => BigDecimal(e.toString) -> e).toMap
-
- if (validQuerySet.isEmpty) {
- return Some(0.0)
- }
-
- // Need to save new min/max using the external type value of the literal
- val newMax = convertBoundValue(attrRef.dataType, hSetBigdecToAnyMap(validQuerySet.max))
- val newMin = convertBoundValue(attrRef.dataType, hSetBigdecToAnyMap(validQuerySet.min))
-
- // newNdv should not be greater than the old ndv. For example, column has only 2 values
- // 1 and 6. The predicate column IN (1, 2, 3, 4, 5). validQuerySet.size is 5.
- newNdv = math.min(validQuerySet.size.toLong, ndv.longValue())
- if (update) {
- val newStats = aColStat.copy(distinctCount = newNdv, min = newMin,
- max = newMax, nullCount = 0)
- mutableColStats += (attrRef.exprId -> newStats)
- }
-
- // We assume the whole set since there is no min/max information for String/Binary type
- case StringType | BinaryType =>
- newNdv = math.min(hSet.size.toLong, ndv.longValue())
- if (update) {
- val newStats = aColStat.copy(distinctCount = newNdv, nullCount = 0)
- mutableColStats += (attrRef.exprId -> newStats)
- }
- }
-
- // return the filter selectivity. Without advanced statistics such as histograms,
- // we have to assume uniform distribution.
- Some(math.min(1.0, newNdv.toDouble / ndv.toDouble))
- }
-
- /**
- * Returns a percentage of rows meeting a binary comparison expression.
- * This method evaluates expressions for Numeric columns only.
- *
- * @param op a binary comparison operator such as =, <, <=, >, >=
- * @param attrRef an AttributeReference (or a column)
- * @param literal a literal value (or constant)
- * @param update a boolean flag to specify if we need to update ColumnStat of a given column
- * for subsequent conditions
- * @return an optional double value to show the percentage of rows meeting a given condition
- */
- def evaluateBinaryForNumeric(
- op: BinaryComparison,
- attrRef: AttributeReference,
- literal: Literal,
- update: Boolean)
- : Option[Double] = {
-
- var percent = 1.0
- val aColStat = mutableColStats(attrRef.exprId)
- val ndv = aColStat.distinctCount
- val statsRange =
- Range(aColStat.min, aColStat.max, attrRef.dataType).asInstanceOf[NumericRange]
-
- // determine the overlapping degree between predicate range and column's range
- val literalValueBD = BigDecimal(literal.value.toString)
- val (noOverlap: Boolean, completeOverlap: Boolean) = op match {
- case _: LessThan =>
- (literalValueBD <= statsRange.min, literalValueBD > statsRange.max)
- case _: LessThanOrEqual =>
- (literalValueBD < statsRange.min, literalValueBD >= statsRange.max)
- case _: GreaterThan =>
- (literalValueBD >= statsRange.max, literalValueBD < statsRange.min)
- case _: GreaterThanOrEqual =>
- (literalValueBD > statsRange.max, literalValueBD <= statsRange.min)
- }
-
- if (noOverlap) {
- percent = 0.0
- } else if (completeOverlap) {
- percent = 1.0
- } else {
- // this is partial overlap case
- var newMax = aColStat.max
- var newMin = aColStat.min
- var newNdv = ndv
- val literalToDouble = literalValueBD.toDouble
- val maxToDouble = BigDecimal(statsRange.max).toDouble
- val minToDouble = BigDecimal(statsRange.min).toDouble
-
- // Without advanced statistics like histogram, we assume uniform data distribution.
- // We just prorate the adjusted range over the initial range to compute filter selectivity.
- // For ease of computation, we convert all relevant numeric values to Double.
- percent = op match {
- case _: LessThan =>
- (literalToDouble - minToDouble) / (maxToDouble - minToDouble)
- case _: LessThanOrEqual =>
- if (literalValueBD == BigDecimal(statsRange.min)) 1.0 / ndv.toDouble
- else (literalToDouble - minToDouble) / (maxToDouble - minToDouble)
- case _: GreaterThan =>
- (maxToDouble - literalToDouble) / (maxToDouble - minToDouble)
- case _: GreaterThanOrEqual =>
- if (literalValueBD == BigDecimal(statsRange.max)) 1.0 / ndv.toDouble
- else (maxToDouble - literalToDouble) / (maxToDouble - minToDouble)
- }
-
- // Need to save new min/max using the external type value of the literal
- val newValue = convertBoundValue(attrRef.dataType, literal.value)
-
- if (update) {
- op match {
- case _: GreaterThan => newMin = newValue
- case _: GreaterThanOrEqual => newMin = newValue
- case _: LessThan => newMax = newValue
- case _: LessThanOrEqual => newMax = newValue
- }
-
- newNdv = math.max(math.round(ndv.toDouble * percent), 1)
- val newStats = aColStat.copy(distinctCount = newNdv, min = newMin,
- max = newMax, nullCount = 0)
-
- mutableColStats += (attrRef.exprId -> newStats)
- }
- }
-
- Some(percent)
- }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical.statsEstimation
+
+import java.sql.{Date, Timestamp}
+
+import scala.collection.immutable.{HashSet, Map}
+import scala.collection.mutable
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.CatalystConf
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.types._
+
+case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Logging {
+
+ /**
+ * We use a mutable colStats because we need to update the corresponding ColumnStat
+ * for a column after we apply a predicate condition. For example, column c has
+ * [min, max] value as [0, 100]. In a range condition such as (c > 40 AND c <= 50),
+ * we need to set the column's [min, max] value to [40, 100] after we evaluate the
+ * first condition c > 40. We need to set the column's [min, max] value to [40, 50]
+ * after we evaluate the second condition c <= 50.
+ */
+ private var mutableColStats: mutable.Map[ExprId, ColumnStat] = mutable.Map.empty
+
+ /**
+ * Returns an option of Statistics for a Filter logical plan node.
+ * For a given compound expression condition, this method computes filter selectivity
+ * (or the percentage of rows meeting the filter condition), which
+ * is used to compute row count, size in bytes, and the updated statistics after a given
+ * predicate is applied.
+ *
+ * @return Option[Statistics] When there is no statistics collected, it returns None.
+ */
+ def estimate: Option[Statistics] = {
+ // We first copy child node's statistics and then modify it based on filter selectivity.
+ val stats: Statistics = plan.child.stats(catalystConf)
+ if (stats.rowCount.isEmpty) return None
+
+ // save a mutable copy of colStats so that we can later change it recursively
+ mutableColStats = mutable.Map(stats.attributeStats.map(kv => (kv._1.exprId, kv._2)).toSeq: _*)
+
+ // estimate selectivity of this filter predicate
+ val filterSelectivity: Double = calculateFilterSelectivity(plan.condition) match {
+ case Some(percent) => percent
+ // for not-supported condition, set filter selectivity to a conservative estimate 100%
+ case None => 1.0
+ }
+
+ // attributeStats has mapping Attribute-to-ColumnStat.
+ // mutableColStats has mapping ExprId-to-ColumnStat.
+ // We use an ExprId-to-Attribute map to facilitate the mapping Attribute-to-ColumnStat
+ val expridToAttrMap: Map[ExprId, Attribute] =
+ stats.attributeStats.map(kv => (kv._1.exprId, kv._1))
+ // copy mutableColStats contents to an immutable AttributeMap.
+ val mutableAttributeStats: mutable.Map[Attribute, ColumnStat] =
+ mutableColStats.map(kv => expridToAttrMap(kv._1) -> kv._2)
+ val newColStats = AttributeMap(mutableAttributeStats.toSeq)
+
+ val filteredRowCount: BigInt =
+ EstimationUtils.ceil(BigDecimal(stats.rowCount.get) * filterSelectivity)
+ val filteredSizeInBytes =
+ EstimationUtils.getOutputSize(plan.output, filteredRowCount, newColStats)
+
+ Some(stats.copy(sizeInBytes = filteredSizeInBytes, rowCount = Some(filteredRowCount),
+ attributeStats = newColStats))
+ }
+
+ /**
+ * Returns a percentage of rows meeting a compound condition in Filter node.
+ * A compound condition is decomposed into multiple single conditions linked with AND, OR, NOT.
+ * For logical AND conditions, we need to update stats after a condition estimation
+ * so that the stats will be more accurate for subsequent estimation. This is needed for
+ * range condition such as (c > 40 AND c <= 50)
+ * For logical OR conditions, we do not update stats after a condition estimation.
+ *
+ * @param condition the compound logical expression
+ * @param update a boolean flag to specify if we need to update ColumnStat of a column
+ * for subsequent conditions
+ * @return a double value to show the percentage of rows meeting a given condition.
+ * It returns None if the condition is not supported.
+ */
+ def calculateFilterSelectivity(condition: Expression, update: Boolean = true): Option[Double] = {
+
+ condition match {
+ case And(cond1, cond2) =>
+ (calculateFilterSelectivity(cond1, update), calculateFilterSelectivity(cond2, update))
+ match {
+ case (Some(p1), Some(p2)) => Some(p1 * p2)
+ case (Some(p1), None) => Some(p1)
+ case (None, Some(p2)) => Some(p2)
+ case (None, None) => None
+ }
+
+ case Or(cond1, cond2) =>
+ // For ease of debugging, we compute percent1 and percent2 in 2 statements.
+ val percent1 = calculateFilterSelectivity(cond1, update = false)
+ val percent2 = calculateFilterSelectivity(cond2, update = false)
+ (percent1, percent2) match {
+ case (Some(p1), Some(p2)) => Some(math.min(1.0, p1 + p2 - (p1 * p2)))
+ case (Some(p1), None) => Some(1.0)
+ case (None, Some(p2)) => Some(1.0)
+ case (None, None) => None
+ }
+
+ case Not(cond) => calculateFilterSelectivity(cond, update = false) match {
+ case Some(percent) => Some(1.0 - percent)
+ // for not-supported condition, set filter selectivity to a conservative estimate 100%
+ case None => None
+ }
+
+ case _ =>
+ calculateSingleCondition(condition, update)
+ }
+ }
+
+ /**
+ * Returns a percentage of rows meeting a single condition in Filter node.
+ * Currently we only support binary predicates where one side is a column,
+ * and the other is a literal.
+ *
+ * @param condition a single logical expression
+ * @param update a boolean flag to specify if we need to update ColumnStat of a column
+ * for subsequent conditions
+ * @return Option[Double] value to show the percentage of rows meeting a given condition.
+ * It returns None if the condition is not supported.
+ */
+ def calculateSingleCondition(condition: Expression, update: Boolean): Option[Double] = {
+ condition match {
+ // For evaluateBinary method, we assume the literal on the right side of an operator.
+ // So we will change the order if not.
+
+ // EqualTo does not care about the order
+ case op @ EqualTo(ar: AttributeReference, l: Literal) =>
+ evaluateBinary(op, ar, l, update)
+ case op @ EqualTo(l: Literal, ar: AttributeReference) =>
+ evaluateBinary(op, ar, l, update)
+
+ case op @ LessThan(ar: AttributeReference, l: Literal) =>
+ evaluateBinary(op, ar, l, update)
+ case op @ LessThan(l: Literal, ar: AttributeReference) =>
+ evaluateBinary(GreaterThan(ar, l), ar, l, update)
+
+ case op @ LessThanOrEqual(ar: AttributeReference, l: Literal) =>
+ evaluateBinary(op, ar, l, update)
+ case op @ LessThanOrEqual(l: Literal, ar: AttributeReference) =>
+ evaluateBinary(GreaterThanOrEqual(ar, l), ar, l, update)
+
+ case op @ GreaterThan(ar: AttributeReference, l: Literal) =>
+ evaluateBinary(op, ar, l, update)
+ case op @ GreaterThan(l: Literal, ar: AttributeReference) =>
+ evaluateBinary(LessThan(ar, l), ar, l, update)
+
+ case op @ GreaterThanOrEqual(ar: AttributeReference, l: Literal) =>
+ evaluateBinary(op, ar, l, update)
+ case op @ GreaterThanOrEqual(l: Literal, ar: AttributeReference) =>
+ evaluateBinary(LessThanOrEqual(ar, l), ar, l, update)
+
+ case In(ar: AttributeReference, expList)
+ if expList.forall(e => e.isInstanceOf[Literal]) =>
+ // Expression [In (value, seq[Literal])] will be replaced with optimized version
+ // [InSet (value, HashSet[Literal])] in Optimizer, but only for list.size > 10.
+ // Here we convert In into InSet anyway, because they share the same processing logic.
+ val hSet = expList.map(e => e.eval())
+ evaluateInSet(ar, HashSet() ++ hSet, update)
+
+ case InSet(ar: AttributeReference, set) =>
+ evaluateInSet(ar, set, update)
+
+ case IsNull(ar: AttributeReference) =>
+ evaluateIsNull(ar, isNull = true, update)
+
+ case IsNotNull(ar: AttributeReference) =>
+ evaluateIsNull(ar, isNull = false, update)
+
+ case _ =>
+ // TODO: it's difficult to support string operators without advanced statistics.
+ // Hence, these string operators Like(_, _) | Contains(_, _) | StartsWith(_, _)
+ // | EndsWith(_, _) are not supported yet
+ logDebug("[CBO] Unsupported filter condition: " + condition)
+ None
+ }
+ }
+
+ /**
+ * Returns a percentage of rows meeting "IS NULL" or "IS NOT NULL" condition.
+ *
+ * @param attrRef an AttributeReference (or a column)
+ * @param isNull set to true for "IS NULL" condition. set to false for "IS NOT NULL" condition
+ * @param update a boolean flag to specify if we need to update ColumnStat of a given column
+ * for subsequent conditions
+ * @return an optional double value to show the percentage of rows meeting a given condition
+ * It returns None if no statistics collected for a given column.
+ */
+ def evaluateIsNull(
+ attrRef: AttributeReference,
+ isNull: Boolean,
+ update: Boolean)
+ : Option[Double] = {
+ if (!mutableColStats.contains(attrRef.exprId)) {
+ logDebug("[CBO] No statistics for " + attrRef)
+ return None
+ }
+ val aColStat = mutableColStats(attrRef.exprId)
+ val rowCountValue = plan.child.stats(catalystConf).rowCount.get
+ val nullPercent: BigDecimal =
+ if (rowCountValue == 0) 0.0
+ else BigDecimal(aColStat.nullCount) / BigDecimal(rowCountValue)
+
+ if (update) {
+ val newStats =
+ if (isNull) aColStat.copy(distinctCount = 0, min = None, max = None)
+ else aColStat.copy(nullCount = 0)
+
+ mutableColStats += (attrRef.exprId -> newStats)
+ }
+
+ val percent =
+ if (isNull) {
+ nullPercent.toDouble
+ }
+ else {
+ /** ISNOTNULL(column) */
+ 1.0 - nullPercent.toDouble
+ }
+
+ Some(percent)
+ }
+
+ /**
+ * Returns a percentage of rows meeting a binary comparison expression.
+ *
+ * @param op a binary comparison operator such as =, <, <=, >, >=
+ * @param attrRef an AttributeReference (or a column)
+ * @param literal a literal value (or constant)
+ * @param update a boolean flag to specify if we need to update ColumnStat of a given column
+ * for subsequent conditions
+ * @return an optional double value to show the percentage of rows meeting a given condition
+ * It returns None if no statistics exist for a given column or wrong value.
+ */
+ def evaluateBinary(
+ op: BinaryComparison,
+ attrRef: AttributeReference,
+ literal: Literal,
+ update: Boolean)
+ : Option[Double] = {
+ // Bail out when no column statistics exist for this attribute.
+ if (!mutableColStats.contains(attrRef.exprId)) {
+ logDebug("[CBO] No statistics for " + attrRef)
+ return None
+ }
+
+ op match {
+ // Equality works for every supported data type, so it is dispatched separately.
+ case EqualTo(l, r) => evaluateEqualTo(attrRef, literal, update)
+ case _ =>
+ // Range comparisons (<, <=, >, >=) need min/max, which exist only for
+ // numeric-like column types.
+ attrRef.dataType match {
+ case _: NumericType | DateType | TimestampType =>
+ evaluateBinaryForNumeric(op, attrRef, literal, update)
+ case StringType | BinaryType =>
+ // TODO: It is difficult to support other binary comparisons for String/Binary
+ // type without min/max and advanced statistics like histogram.
+ logDebug("[CBO] No range comparison statistics for String/Binary type " + attrRef)
+ None
+ // NOTE(review): other data types (e.g. BooleanType) would hit a MatchError
+ // here — confirm upstream filtering never routes such types to this method.
+ }
+ }
+ }
+
+ /**
+ * For a SQL data type, its internal data type may be different from its external type.
+ * For DateType, its internal type is Int, and its external data type is Java Date type.
+ * The min/max values in ColumnStat are saved in their corresponding external type.
+ *
+ * @param attrDataType the column data type
+ * @param litValue the literal value
+ * @return an optional external-type value; None for String/Binary types, for which
+ * no min/max bound is stored
+ */
+ def convertBoundValue(attrDataType: DataType, litValue: Any): Option[Any] = {
+ attrDataType match {
+ case DateType =>
+ // Internal Int value -> external java.sql.Date (see header comment).
+ Some(DateTimeUtils.toJavaDate(litValue.toString.toInt))
+ case TimestampType =>
+ // Internal Long value -> external java.sql.Timestamp.
+ Some(DateTimeUtils.toJavaTimestamp(litValue.toString.toLong))
+ case StringType | BinaryType =>
+ // No min/max is kept for String/Binary columns.
+ None
+ case _ =>
+ // Remaining (numeric) types: internal and external representations coincide.
+ Some(litValue)
+ }
+ }
+
+ /**
+ * Returns a percentage of rows meeting an equality (=) expression.
+ * This method evaluates the equality predicate for all data types.
+ *
+ * @param attrRef an AttributeReference (or a column)
+ * @param literal a literal value (or constant)
+ * @param update a boolean flag to specify if we need to update ColumnStat of a given column
+ * for subsequent conditions
+ * @return an optional double value to show the percentage of rows meeting a given condition
+ */
+ def evaluateEqualTo(
+ attrRef: AttributeReference,
+ literal: Literal,
+ update: Boolean)
+ : Option[Double] = {
+
+ val aColStat = mutableColStats(attrRef.exprId)
+ val ndv = aColStat.distinctCount
+
+ // decide if the value is in [min, max] of the column.
+ // We currently don't store min/max for binary/string type.
+ // Hence, we assume it is in boundary for binary/string type.
+ val statsRange = Range(aColStat.min, aColStat.max, attrRef.dataType)
+ val inBoundary: Boolean = Range.rangeContainsLiteral(statsRange, literal)
+
+ if (inBoundary) {
+
+ if (update) {
+ // We update ColumnStat structure after apply this equality predicate.
+ // Set distinctCount to 1. Set nullCount to 0.
+ // Need to save new min/max using the external type value of the literal
+ val newValue = convertBoundValue(attrRef.dataType, literal.value)
+ val newStats = aColStat.copy(distinctCount = 1, min = newValue,
+ max = newValue, nullCount = 0)
+ mutableColStats += (attrRef.exprId -> newStats)
+ }
+
+ // Uniform-distribution assumption: each distinct value is equally likely.
+ // NOTE(review): divides by ndv — confirm ndv > 0 whenever a literal lies
+ // within the column's [min, max] boundary.
+ Some(1.0 / ndv.toDouble)
+ } else {
+ // Literal is outside the column's value range: no row can match.
+ Some(0.0)
+ }
+
+ }
+
+ /**
+ * Returns a percentage of rows meeting "IN" operator expression.
+ * This method evaluates the equality predicate for all data types.
+ *
+ * @param attrRef an AttributeReference (or a column)
+ * @param hSet a set of literal values
+ * @param update a boolean flag to specify if we need to update ColumnStat of a given column
+ * for subsequent conditions
+ * @return an optional double value to show the percentage of rows meeting a given condition
+ * It returns None if no statistics exists for a given column.
+ */
+
+ def evaluateInSet(
+ attrRef: AttributeReference,
+ hSet: Set[Any],
+ update: Boolean)
+ : Option[Double] = {
+ // Bail out when no column statistics exist for this attribute.
+ if (!mutableColStats.contains(attrRef.exprId)) {
+ logDebug("[CBO] No statistics for " + attrRef)
+ return None
+ }
+
+ val aColStat = mutableColStats(attrRef.exprId)
+ val ndv = aColStat.distinctCount
+ val aType = attrRef.dataType
+ var newNdv: Long = 0
+
+ // use [min, max] to filter the original hSet
+ aType match {
+ case _: NumericType | DateType | TimestampType =>
+ val statsRange =
+ Range(aColStat.min, aColStat.max, aType).asInstanceOf[NumericRange]
+
+ // To facilitate finding the min and max values in hSet, we map hSet values to BigDecimal.
+ // Using hSetBigdec, we can find the min and max values quickly in the ordered hSetBigdec.
+ val hSetBigdec = hSet.map(e => BigDecimal(e.toString))
+ val validQuerySet = hSetBigdec.filter(e => e >= statsRange.min && e <= statsRange.max)
+ // We use hSetBigdecToAnyMap to help us find the original hSet value.
+ val hSetBigdecToAnyMap: Map[BigDecimal, Any] =
+ hSet.map(e => BigDecimal(e.toString) -> e).toMap
+
+ // Nothing in the IN-list overlaps the column's range: selectivity is zero.
+ if (validQuerySet.isEmpty) {
+ return Some(0.0)
+ }
+
+ // Need to save new min/max using the external type value of the literal
+ val newMax = convertBoundValue(attrRef.dataType, hSetBigdecToAnyMap(validQuerySet.max))
+ val newMin = convertBoundValue(attrRef.dataType, hSetBigdecToAnyMap(validQuerySet.min))
+
+ // newNdv should not be greater than the old ndv. For example, column has only 2 values
+ // 1 and 6. The predicate column IN (1, 2, 3, 4, 5). validQuerySet.size is 5.
+ newNdv = math.min(validQuerySet.size.toLong, ndv.longValue())
+ if (update) {
+ val newStats = aColStat.copy(distinctCount = newNdv, min = newMin,
+ max = newMax, nullCount = 0)
+ mutableColStats += (attrRef.exprId -> newStats)
+ }
+
+ // We assume the whole set since there is no min/max information for String/Binary type
+ case StringType | BinaryType =>
+ newNdv = math.min(hSet.size.toLong, ndv.longValue())
+ if (update) {
+ val newStats = aColStat.copy(distinctCount = newNdv, nullCount = 0)
+ mutableColStats += (attrRef.exprId -> newStats)
+ }
+ // NOTE(review): any other data type falls through with a MatchError — confirm
+ // upstream only produces In/InSet over the types handled above.
+ }
+
+ // return the filter selectivity. Without advanced statistics such as histograms,
+ // we have to assume uniform distribution.
+ // NOTE(review): divides by ndv — confirm ndv > 0 whenever statistics are present.
+ Some(math.min(1.0, newNdv.toDouble / ndv.toDouble))
+ }
+
+ /**
+ * Returns a percentage of rows meeting a binary comparison expression.
+ * This method evaluates expressions for Numeric columns only.
+ *
+ * @param op a binary comparison operator such as =, <, <=, >, >=
+ * @param attrRef an AttributeReference (or a column)
+ * @param literal a literal value (or constant)
+ * @param update a boolean flag to specify if we need to update ColumnStat of a given column
+ * for subsequent conditions
+ * @return an optional double value to show the percentage of rows meeting a given condition
+ */
+ def evaluateBinaryForNumeric(
+ op: BinaryComparison,
+ attrRef: AttributeReference,
+ literal: Literal,
+ update: Boolean)
+ : Option[Double] = {
+
+ var percent = 1.0
+ val aColStat = mutableColStats(attrRef.exprId)
+ val ndv = aColStat.distinctCount
+ val statsRange =
+ Range(aColStat.min, aColStat.max, attrRef.dataType).asInstanceOf[NumericRange]
+
+ // determine the overlapping degree between predicate range and column's range
+ val literalValueBD = BigDecimal(literal.value.toString)
+ // noOverlap: the predicate excludes the entire [min, max] range (selectivity 0);
+ // completeOverlap: the predicate covers the entire range (selectivity 1).
+ // NOTE(review): EqualTo is routed to evaluateEqualTo by evaluateBinary, so only
+ // the four range operators are expected here; anything else is a MatchError.
+ val (noOverlap: Boolean, completeOverlap: Boolean) = op match {
+ case _: LessThan =>
+ (literalValueBD <= statsRange.min, literalValueBD > statsRange.max)
+ case _: LessThanOrEqual =>
+ (literalValueBD < statsRange.min, literalValueBD >= statsRange.max)
+ case _: GreaterThan =>
+ (literalValueBD >= statsRange.max, literalValueBD < statsRange.min)
+ case _: GreaterThanOrEqual =>
+ (literalValueBD > statsRange.max, literalValueBD <= statsRange.min)
+ }
+
+ if (noOverlap) {
+ percent = 0.0
+ } else if (completeOverlap) {
+ percent = 1.0
+ } else {
+ // this is partial overlap case
+ var newMax = aColStat.max
+ var newMin = aColStat.min
+ var newNdv = ndv
+ val literalToDouble = literalValueBD.toDouble
+ val maxToDouble = BigDecimal(statsRange.max).toDouble
+ val minToDouble = BigDecimal(statsRange.min).toDouble
+
+ // Without advanced statistics like histogram, we assume uniform data distribution.
+ // We just prorate the adjusted range over the initial range to compute filter selectivity.
+ // For ease of computation, we convert all relevant numeric values to Double.
+ percent = op match {
+ case _: LessThan =>
+ (literalToDouble - minToDouble) / (maxToDouble - minToDouble)
+ case _: LessThanOrEqual =>
+ // "<= min" keeps exactly the rows equal to min: treated like equality (1/ndv).
+ if (literalValueBD == BigDecimal(statsRange.min)) 1.0 / ndv.toDouble
+ else (literalToDouble - minToDouble) / (maxToDouble - minToDouble)
+ case _: GreaterThan =>
+ (maxToDouble - literalToDouble) / (maxToDouble - minToDouble)
+ case _: GreaterThanOrEqual =>
+ // ">= max" keeps exactly the rows equal to max: treated like equality (1/ndv).
+ if (literalValueBD == BigDecimal(statsRange.max)) 1.0 / ndv.toDouble
+ else (maxToDouble - literalToDouble) / (maxToDouble - minToDouble)
+ }
+
+ // Need to save new min/max using the external type value of the literal
+ val newValue = convertBoundValue(attrRef.dataType, literal.value)
+
+ if (update) {
+ // Tighten whichever bound the predicate cuts off.
+ op match {
+ case _: GreaterThan => newMin = newValue
+ case _: GreaterThanOrEqual => newMin = newValue
+ case _: LessThan => newMax = newValue
+ case _: LessThanOrEqual => newMax = newValue
+ }
+
+ // Scale ndv proportionally to the kept fraction, never dropping below 1.
+ newNdv = math.max(math.round(ndv.toDouble * percent), 1)
+ val newStats = aColStat.copy(distinctCount = newNdv, min = newMin,
+ max = newMax, nullCount = 0)
+
+ mutableColStats += (attrRef.exprId -> newStats)
+ }
+ }
+
+ Some(percent)
+ }
+
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala
index f139c9e28c..f5e306f9e5 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala
@@ -398,6 +398,27 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
// For all other SQL types, we compare the entire object directly.
assert(filteredStats.attributeStats(ar) == expectedColStats)
}
- }
+ // If the filter has a binary operator (including those nested inside
+ // AND/OR/NOT), swap the sides of the attribute and the literal, reverse the
+ // operator, and then check again.
+ val rewrittenFilter = filterNode transformExpressionsDown {
+ case op @ EqualTo(ar: AttributeReference, l: Literal) =>
+ EqualTo(l, ar)
+
+ case op @ LessThan(ar: AttributeReference, l: Literal) =>
+ GreaterThan(l, ar)
+ case op @ LessThanOrEqual(ar: AttributeReference, l: Literal) =>
+ GreaterThanOrEqual(l, ar)
+
+ case op @ GreaterThan(ar: AttributeReference, l: Literal) =>
+ LessThan(l, ar)
+ case op @ GreaterThanOrEqual(ar: AttributeReference, l: Literal) =>
+ LessThanOrEqual(l, ar)
+ }
+
+ if (rewrittenFilter != filterNode) {
+ validateEstimatedStats(ar, rewrittenFilter, expectedColStats, rowCount)
+ }
+ }
}