diff options
author | root <root@iZbp1gsnrlfzjxh82cz80vZ.(none)> | 2016-11-08 12:09:32 +0100 |
---|---|---|
committer | Herman van Hovell <hvanhovell@databricks.com> | 2016-11-08 12:09:32 +0100 |
commit | c291bd2745a8a2e4ba91d8697879eb8da10287e2 (patch) | |
tree | 5fd2f31509376493cdbd26a188f986961c880836 /sql/catalyst | |
parent | 47731e1865fa1e3a8881a1f4420017bdc026e455 (diff) | |
download | spark-c291bd2745a8a2e4ba91d8697879eb8da10287e2.tar.gz spark-c291bd2745a8a2e4ba91d8697879eb8da10287e2.tar.bz2 spark-c291bd2745a8a2e4ba91d8697879eb8da10287e2.zip |
[SPARK-18137][SQL] Fix RewriteDistinctAggregates UnresolvedException when a UDAF has a foldable TypeCheck
## What changes were proposed in this pull request?
In RewriteDistinctAggregates rewrite funtion,after the UDAF's childs are mapped to AttributeRefference, If the UDAF(such as ApproximatePercentile) has a foldable TypeCheck for the input, It will failed because the AttributeRefference is not foldable,then the UDAF is not resolved, and then nullify on the unresolved object will throw a Exception.
In this PR, only map Unfoldable child to AttributeRefference, this can avoid the UDAF's foldable TypeCheck. and then only Expand Unfoldable child, there is no need to Expand a static value(foldable value).
**Before sql result**
> select percentile_approxy(key,0.99999),count(distinct key),sume(distinc key) from src limit 1
> org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'percentile_approx(CAST(src.`key` AS DOUBLE), CAST(0.99999BD AS DOUBLE), 10000)
> at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:92)
> at org.apache.spark.sql.catalyst.optimizer.RewriteDistinctAggregates$.org$apache$spark$sql$catalyst$optimizer$RewriteDistinctAggregates$$nullify(RewriteDistinctAggregates.scala:261)
**After sql result**
> select percentile_approxy(key,0.99999),count(distinct key),sume(distinc key) from src limit 1
> [498.0,309,79136]
## How was this patch tested?
Add a test case in HiveUDFSuit.
Author: root <root@iZbp1gsnrlfzjxh82cz80vZ.(none)>
Closes #15668 from windpiger/RewriteDistinctUDAFUnresolveExcep.
Diffstat (limited to 'sql/catalyst')
-rw-r--r-- | sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala | 35 |
1 files changed, 26 insertions, 9 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala index d6a39ecf53..cd8912f793 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala @@ -115,9 +115,21 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] { } // Extract distinct aggregate expressions. - val distinctAggGroups = aggExpressions - .filter(_.isDistinct) - .groupBy(_.aggregateFunction.children.toSet) + val distinctAggGroups = aggExpressions.filter(_.isDistinct).groupBy { e => + val unfoldableChildren = e.aggregateFunction.children.filter(!_.foldable).toSet + if (unfoldableChildren.nonEmpty) { + // Only expand the unfoldable children + unfoldableChildren + } else { + // If aggregateFunction's children are all foldable + // we must expand at least one of the children (here we take the first child), + // or If we don't, we will get the wrong result, for example: + // count(distinct 1) will be explained to count(1) after the rewrite function. + // Generally, the distinct aggregateFunction should not run + // foldable TypeCheck for the first child. + e.aggregateFunction.children.take(1).toSet + } + } // Check if the aggregates contains functions that do not support partial aggregation. val existsNonPartial = aggExpressions.exists(!_.aggregateFunction.supportsPartial) @@ -136,8 +148,9 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] { def evalWithinGroup(id: Literal, e: Expression) = If(EqualTo(gid, id), e, nullify(e)) def patchAggregateFunctionChildren( af: AggregateFunction)( - attrs: Expression => Expression): AggregateFunction = { - af.withNewChildren(af.children.map(attrs)).asInstanceOf[AggregateFunction] + attrs: Expression => Option[Expression]): AggregateFunction = { + val newChildren = af.children.map(c => attrs(c).getOrElse(c)) + af.withNewChildren(newChildren).asInstanceOf[AggregateFunction] } // Setup unique distinct aggregate children. @@ -161,7 +174,7 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] { val operators = expressions.map { e => val af = e.aggregateFunction val naf = patchAggregateFunctionChildren(af) { x => - evalWithinGroup(id, distinctAggChildAttrLookup(x)) + distinctAggChildAttrLookup.get(x).map(evalWithinGroup(id, _)) } (e, e.copy(aggregateFunction = naf, isDistinct = false)) } @@ -170,8 +183,12 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] { } // Setup expand for the 'regular' aggregate expressions. - val regularAggExprs = aggExpressions.filter(!_.isDistinct) - val regularAggChildren = regularAggExprs.flatMap(_.aggregateFunction.children).distinct + // only expand unfoldable children + val regularAggExprs = aggExpressions + .filter(e => !e.isDistinct && e.children.exists(!_.foldable)) + val regularAggChildren = regularAggExprs + .flatMap(_.aggregateFunction.children.filter(!_.foldable)) + .distinct val regularAggChildAttrMap = regularAggChildren.map(expressionAttributePair) // Setup aggregates for 'regular' aggregate expressions. @@ -179,7 +196,7 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] { val regularAggChildAttrLookup = regularAggChildAttrMap.toMap val regularAggOperatorMap = regularAggExprs.map { e => // Perform the actual aggregation in the initial aggregate. - val af = patchAggregateFunctionChildren(e.aggregateFunction)(regularAggChildAttrLookup) + val af = patchAggregateFunctionChildren(e.aggregateFunction)(regularAggChildAttrLookup.get) val operator = Alias(e.copy(aggregateFunction = af), e.sql)() // Select the result of the first aggregate in the last aggregate. |