aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst
diff options
context:
space:
mode:
authorCheng Lian <lian.cs.zju@gmail.com>2014-10-13 13:36:39 -0700
committerMichael Armbrust <michael@databricks.com>2014-10-13 13:36:39 -0700
commit56102dc2d849c221f325a7888cd66abb640ec887 (patch)
tree1094decdc8659750a4f3e5b31b7aa9abed43a323 /sql/catalyst
parent2ac40da3f9fa6d45a59bb45b41606f1931ac5e81 (diff)
downloadspark-56102dc2d849c221f325a7888cd66abb640ec887.tar.gz
spark-56102dc2d849c221f325a7888cd66abb640ec887.tar.bz2
spark-56102dc2d849c221f325a7888cd66abb640ec887.zip
[SPARK-2066][SQL] Adds checks for non-aggregate attributes with aggregation
This PR adds a new rule `CheckAggregation` to the analyzer to provide a better error message for non-aggregate attributes used with aggregation. Author: Cheng Lian <lian.cs.zju@gmail.com> Closes #2774 from liancheng/non-aggregate-attr and squashes the following commits: 5246004 [Cheng Lian] Passes test suites bf1878d [Cheng Lian] Adds checks for non-aggregate attributes with aggregation
Diffstat (limited to 'sql/catalyst')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala36
1 files changed, 31 insertions, 5 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index fe83eb1250..8255306314 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -63,7 +63,8 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool
typeCoercionRules ++
extendedRules : _*),
Batch("Check Analysis", Once,
- CheckResolution),
+ CheckResolution,
+ CheckAggregation),
Batch("AnalysisOperators", fixedPoint,
EliminateAnalysisOperators)
)
@@ -89,6 +90,32 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool
}
/**
+ * Checks for non-aggregate attributes that are used with aggregation but are not in the GROUP BY clause.
+ */
+ object CheckAggregation extends Rule[LogicalPlan] {
+ def apply(plan: LogicalPlan): LogicalPlan = {
+ plan.transform {
+ case aggregatePlan @ Aggregate(groupingExprs, aggregateExprs, child) =>
+ def isValidAggregateExpression(expr: Expression): Boolean = expr match {
+ case _: AggregateExpression => true
+ case e: Attribute => groupingExprs.contains(e)
+ case e if groupingExprs.contains(e) => true
+ case e if e.references.isEmpty => true
+ case e => e.children.forall(isValidAggregateExpression)
+ }
+
+ aggregateExprs.foreach { e =>
+ if (!isValidAggregateExpression(e)) {
+ throw new TreeNodeException(plan, s"Expression not in GROUP BY: $e")
+ }
+ }
+
+ aggregatePlan
+ }
+ }
+ }
+
+ /**
* Replaces [[UnresolvedRelation]]s with concrete relations from the catalog.
*/
object ResolveRelations extends Rule[LogicalPlan] {
@@ -204,18 +231,17 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool
*/
object UnresolvedHavingClauseAttributes extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
- case filter @ Filter(havingCondition, aggregate @ Aggregate(_, originalAggExprs, _))
+ case filter @ Filter(havingCondition, aggregate @ Aggregate(_, originalAggExprs, _))
if aggregate.resolved && containsAggregate(havingCondition) => {
val evaluatedCondition = Alias(havingCondition, "havingCondition")()
val aggExprsWithHaving = evaluatedCondition +: originalAggExprs
-
+
Project(aggregate.output,
Filter(evaluatedCondition.toAttribute,
aggregate.copy(aggregateExpressions = aggExprsWithHaving)))
}
-
}
-
+
protected def containsAggregate(condition: Expression): Boolean =
condition
.collect { case ae: AggregateExpression => ae }