about summary refs log tree commit diff
path: root/sql/catalyst/src/main
diff options
context:
space:
mode:
author    wangzhenhua <wangzhenhua@huawei.com>  2017-01-11 15:00:58 -0800
committer Reynold Xin <rxin@databricks.com>     2017-01-11 15:00:58 -0800
commit    43fa21b3e62ee108bcecb74398f431f08c6b625c (patch)
tree      6fd18d171f48d1db02b7d1cd61e47dc1244b4f33 /sql/catalyst/src/main
parent    66fe819ada6435f3a351c2d257e73b8e6f6085cd (diff)
download  spark-43fa21b3e62ee108bcecb74398f431f08c6b625c.tar.gz
          spark-43fa21b3e62ee108bcecb74398f431f08c6b625c.tar.bz2
          spark-43fa21b3e62ee108bcecb74398f431f08c6b625c.zip
[SPARK-19132][SQL] Add test cases for row size estimation and aggregate estimation
## What changes were proposed in this pull request?

In this PR, we add more test cases for project and aggregate estimation.

## How was this patch tested?

Add test cases.

Author: wangzhenhua <wangzhenhua@huawei.com>

Closes #16551 from wzhfy/addTests.
Diffstat (limited to 'sql/catalyst/src/main')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala | 14
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala    | 11
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/ProjectEstimation.scala  |  2
3 files changed, 20 insertions(+), 7 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala
index af673430c0..21e94fc941 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala
@@ -41,13 +41,19 @@ object AggregateEstimation {
var outputRows: BigInt = agg.groupingExpressions.foldLeft(BigInt(1))(
(res, expr) => res * childStats.attributeStats(expr.asInstanceOf[Attribute]).distinctCount)
- // Here we set another upper bound for the number of output rows: it must not be larger than
- // child's number of rows.
- outputRows = outputRows.min(childStats.rowCount.get)
+ outputRows = if (agg.groupingExpressions.isEmpty) {
+ // If there's no group-by columns, the output is a single row containing values of aggregate
+ // functions: aggregated results for non-empty input or initial values for empty input.
+ 1
+ } else {
+ // Here we set another upper bound for the number of output rows: it must not be larger than
+ // child's number of rows.
+ outputRows.min(childStats.rowCount.get)
+ }
val outputAttrStats = getOutputMap(childStats.attributeStats, agg.output)
Some(Statistics(
- sizeInBytes = outputRows * getRowSize(agg.output, outputAttrStats),
+ sizeInBytes = getOutputSize(agg.output, outputAttrStats, outputRows),
rowCount = Some(outputRows),
attributeStats = outputAttrStats,
isBroadcastable = childStats.isBroadcastable))
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
index c7eb6f0d7d..cf4452d0fd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
@@ -35,10 +35,13 @@ object EstimationUtils {
AttributeMap(output.flatMap(a => inputMap.get(a).map(a -> _)))
}
- def getRowSize(attributes: Seq[Attribute], attrStats: AttributeMap[ColumnStat]): Long = {
+ def getOutputSize(
+ attributes: Seq[Attribute],
+ attrStats: AttributeMap[ColumnStat],
+ outputRowCount: BigInt): BigInt = {
// We assign a generic overhead for a Row object, the actual overhead is different for different
// Row format.
- 8 + attributes.map { attr =>
+ val sizePerRow = 8 + attributes.map { attr =>
if (attrStats.contains(attr)) {
attr.dataType match {
case StringType =>
@@ -51,5 +54,9 @@ object EstimationUtils {
attr.dataType.defaultSize
}
}.sum
+
+ // Output size can't be zero, or sizeInBytes of BinaryNode will also be zero
+ // (simple computation of statistics returns product of children).
+ if (outputRowCount > 0) outputRowCount * sizePerRow else 1
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/ProjectEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/ProjectEstimation.scala
index 69c546b01b..50b869ab3a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/ProjectEstimation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/ProjectEstimation.scala
@@ -36,7 +36,7 @@ object ProjectEstimation {
val outputAttrStats =
getOutputMap(AttributeMap(inputAttrStats.toSeq ++ aliasStats), project.output)
Some(childStats.copy(
- sizeInBytes = childStats.rowCount.get * getRowSize(project.output, outputAttrStats),
+ sizeInBytes = getOutputSize(project.output, outputAttrStats, childStats.rowCount.get),
attributeStats = outputAttrStats))
} else {
None