diff options
author | Dilip Biswal <dbiswal@us.ibm.com> | 2017-04-20 22:35:48 +0200 |
---|---|---|
committer | Herman van Hovell <hvanhovell@databricks.com> | 2017-04-20 22:35:48 +0200 |
commit | d95e4d9d6a9705c534549add6d4a73d554e47274 (patch) | |
tree | 4aca11699cd4f3e775d22b9e34633d99e296ddd7 /sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala | |
parent | b2ebadfd55283348b8a8b37e28075fca0798228a (diff) | |
download | spark-d95e4d9d6a9705c534549add6d4a73d554e47274.tar.gz spark-d95e4d9d6a9705c534549add6d4a73d554e47274.tar.bz2 spark-d95e4d9d6a9705c534549add6d4a73d554e47274.zip |
[SPARK-20334][SQL] Return a better error message when correlated predicates contain aggregate expression that has mixture of outer and local references.
## What changes were proposed in this pull request?
Address a follow up in [comment](https://github.com/apache/spark/pull/16954#discussion_r105718880)
Currently subqueries with correlated predicates containing aggregate expression having mixture of outer references and local references generate a codegen error like following :
```SQL
SELECT t1a
FROM t1
GROUP BY 1
HAVING EXISTS (SELECT 1
FROM t2
WHERE t2a < min(t1a + t2a));
```
Exception snippet.
```
Cannot evaluate expression: min((input[0, int, false] + input[4, int, false]))
at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.doGenCode(Expression.scala:226)
at org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression.doGenCode(interfaces.scala:87)
at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:106)
at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:103)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:103)
```
After this PR, a better error message is issued.
```
org.apache.spark.sql.AnalysisException
Error in query: Found an aggregate expression in a correlated
predicate that has both outer and local references, which is not supported yet.
Aggregate expression: min((t1.`t1a` + t2.`t2a`)),
Outer references: t1.`t1a`,
Local references: t2.`t2a`.;
```
## How was this patch tested?
Added tests in SQLQueryTestSuite.
Author: Dilip Biswal <dbiswal@us.ibm.com>
Closes #17636 from dilipbiswal/subquery_followup1.
Diffstat (limited to 'sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala')
-rw-r--r-- | sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 49 |
1 files changed, 38 insertions, 11 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 175bfb3e80..eafeb4ac1a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1204,6 +1204,28 @@ class Analyzer( private def checkAndGetOuterReferences(sub: LogicalPlan): Seq[Expression] = { val outerReferences = ArrayBuffer.empty[Expression] + // Validate that correlated aggregate expression do not contain a mixture + // of outer and local references. + def checkMixedReferencesInsideAggregateExpr(expr: Expression): Unit = { + expr.foreach { + case a: AggregateExpression if containsOuter(a) => + val outer = a.collect { case OuterReference(e) => e.toAttribute } + val local = a.references -- outer + if (local.nonEmpty) { + val msg = + s""" + |Found an aggregate expression in a correlated predicate that has both + |outer and local references, which is not supported yet. + |Aggregate expression: ${SubExprUtils.stripOuterReference(a).sql}, + |Outer references: ${outer.map(_.sql).mkString(", ")}, + |Local references: ${local.map(_.sql).mkString(", ")}. + """.stripMargin.replace("\n", " ").trim() + failAnalysis(msg) + } + case _ => + } + } + // Make sure a plan's subtree does not contain outer references def failOnOuterReferenceInSubTree(p: LogicalPlan): Unit = { if (hasOuterReferences(p)) { @@ -1211,9 +1233,12 @@ class Analyzer( } } - // Make sure a plan's expressions do not contain outer references - def failOnOuterReference(p: LogicalPlan): Unit = { - if (p.expressions.exists(containsOuter)) { + // Make sure a plan's expressions do not contain : + // 1. Aggregate expressions that have mixture of outer and local references. + // 2. Expressions containing outer references on plan nodes other than Filter. + def failOnInvalidOuterReference(p: LogicalPlan): Unit = { + p.expressions.foreach(checkMixedReferencesInsideAggregateExpr) + if (!p.isInstanceOf[Filter] && p.expressions.exists(containsOuter)) { failAnalysis( "Expressions referencing the outer query are not supported outside of WHERE/HAVING " + s"clauses:\n$p") @@ -1283,9 +1308,9 @@ class Analyzer( // These operators can be anywhere in a correlated subquery. // so long as they do not host outer references in the operators. case s: Sort => - failOnOuterReference(s) + failOnInvalidOuterReference(s) case r: RepartitionByExpression => - failOnOuterReference(r) + failOnInvalidOuterReference(r) // Category 3: // Filter is one of the two operators allowed to host correlated expressions. @@ -1299,6 +1324,8 @@ class Analyzer( case _: EqualTo | _: EqualNullSafe => false case _ => true } + + failOnInvalidOuterReference(f) // The aggregate expressions are treated in a special way by getOuterReferences. If the // aggregate expression contains only outer reference attributes then the entire aggregate // expression is isolated as an OuterReference. @@ -1308,7 +1335,7 @@ class Analyzer( // Project cannot host any correlated expressions // but can be anywhere in a correlated subquery. case p: Project => - failOnOuterReference(p) + failOnInvalidOuterReference(p) // Aggregate cannot host any correlated expressions // It can be on a correlation path if the correlation contains @@ -1316,7 +1343,7 @@ class Analyzer( // It cannot be on a correlation path if the correlation has // non-equality correlated predicates. case a: Aggregate => - failOnOuterReference(a) + failOnInvalidOuterReference(a) failOnNonEqualCorrelatedPredicate(foundNonEqualCorrelatedPred, a) // Join can host correlated expressions. @@ -1324,7 +1351,7 @@ class Analyzer( joinType match { // Inner join, like Filter, can be anywhere. case _: InnerLike => - failOnOuterReference(j) + failOnInvalidOuterReference(j) // Left outer join's right operand cannot be on a correlation path. // LeftAnti and ExistenceJoin are special cases of LeftOuter. @@ -1335,12 +1362,12 @@ class Analyzer( // Any correlated references in the subplan // of the right operand cannot be pulled up. case LeftOuter | LeftSemi | LeftAnti | ExistenceJoin(_) => - failOnOuterReference(j) + failOnInvalidOuterReference(j) failOnOuterReferenceInSubTree(right) // Likewise, Right outer join's left operand cannot be on a correlation path. case RightOuter => - failOnOuterReference(j) + failOnInvalidOuterReference(j) failOnOuterReferenceInSubTree(left) // Any other join types not explicitly listed above, @@ -1356,7 +1383,7 @@ class Analyzer( // Note: // Generator with join=false is treated as Category 4. case g: Generate if g.join => - failOnOuterReference(g) + failOnInvalidOuterReference(g) // Category 4: Any other operators not in the above 3 categories // cannot be on a correlation path, that is they are allowed only |