aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
diff options
context:
space:
mode:
author: Dilip Biswal <dbiswal@us.ibm.com> 2017-04-20 22:35:48 +0200
committer: Herman van Hovell <hvanhovell@databricks.com> 2017-04-20 22:35:48 +0200
commit: d95e4d9d6a9705c534549add6d4a73d554e47274 (patch)
tree: 4aca11699cd4f3e775d22b9e34633d99e296ddd7 /sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
parent: b2ebadfd55283348b8a8b37e28075fca0798228a (diff)
download: spark-d95e4d9d6a9705c534549add6d4a73d554e47274.tar.gz
spark-d95e4d9d6a9705c534549add6d4a73d554e47274.tar.bz2
spark-d95e4d9d6a9705c534549add6d4a73d554e47274.zip
[SPARK-20334][SQL] Return a better error message when correlated predicates contain aggregate expression that has mixture of outer and local references.
## What changes were proposed in this pull request?

Address a follow-up in [comment](https://github.com/apache/spark/pull/16954#discussion_r105718880).

Currently, subqueries with correlated predicates containing an aggregate expression that has a mixture of outer references and local references generate a codegen error like the following:

```SQL
SELECT t1a FROM t1 GROUP BY 1 HAVING EXISTS (SELECT 1 FROM t2 WHERE t2a < min(t1a + t2a));
```

Exception snippet.

```
Cannot evaluate expression: min((input[0, int, false] + input[4, int, false]))
  at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.doGenCode(Expression.scala:226)
  at org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression.doGenCode(interfaces.scala:87)
  at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:106)
  at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:103)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:103)
```

After this PR, a better error message is issued.

```
org.apache.spark.sql.AnalysisException
Error in query: Found an aggregate expression in a correlated predicate that has both outer and local references, which is not supported yet. Aggregate expression: min((t1.`t1a` + t2.`t2a`)), Outer references: t1.`t1a`, Local references: t2.`t2a`.;
```

## How was this patch tested?

Added tests in SQLQueryTestSuite.

Author: Dilip Biswal <dbiswal@us.ibm.com>

Closes #17636 from dilipbiswal/subquery_followup1.
Diffstat (limited to 'sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala')
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 49
1 files changed, 38 insertions, 11 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 175bfb3e80..eafeb4ac1a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1204,6 +1204,28 @@ class Analyzer(
private def checkAndGetOuterReferences(sub: LogicalPlan): Seq[Expression] = {
val outerReferences = ArrayBuffer.empty[Expression]
+ // Validate that correlated aggregate expressions do not contain a mixture
+ // of outer and local references.
+ def checkMixedReferencesInsideAggregateExpr(expr: Expression): Unit = {
+ expr.foreach {
+ case a: AggregateExpression if containsOuter(a) =>
+ val outer = a.collect { case OuterReference(e) => e.toAttribute }
+ val local = a.references -- outer
+ if (local.nonEmpty) {
+ val msg =
+ s"""
+ |Found an aggregate expression in a correlated predicate that has both
+ |outer and local references, which is not supported yet.
+ |Aggregate expression: ${SubExprUtils.stripOuterReference(a).sql},
+ |Outer references: ${outer.map(_.sql).mkString(", ")},
+ |Local references: ${local.map(_.sql).mkString(", ")}.
+ """.stripMargin.replace("\n", " ").trim()
+ failAnalysis(msg)
+ }
+ case _ =>
+ }
+ }
+
// Make sure a plan's subtree does not contain outer references
def failOnOuterReferenceInSubTree(p: LogicalPlan): Unit = {
if (hasOuterReferences(p)) {
@@ -1211,9 +1233,12 @@ class Analyzer(
}
}
- // Make sure a plan's expressions do not contain outer references
- def failOnOuterReference(p: LogicalPlan): Unit = {
- if (p.expressions.exists(containsOuter)) {
+ // Make sure a plan's expressions do not contain:
+ // 1. Aggregate expressions that have a mixture of outer and local references.
+ // 2. Expressions containing outer references on plan nodes other than Filter.
+ def failOnInvalidOuterReference(p: LogicalPlan): Unit = {
+ p.expressions.foreach(checkMixedReferencesInsideAggregateExpr)
+ if (!p.isInstanceOf[Filter] && p.expressions.exists(containsOuter)) {
failAnalysis(
"Expressions referencing the outer query are not supported outside of WHERE/HAVING " +
s"clauses:\n$p")
@@ -1283,9 +1308,9 @@ class Analyzer(
// These operators can be anywhere in a correlated subquery.
// so long as they do not host outer references in the operators.
case s: Sort =>
- failOnOuterReference(s)
+ failOnInvalidOuterReference(s)
case r: RepartitionByExpression =>
- failOnOuterReference(r)
+ failOnInvalidOuterReference(r)
// Category 3:
// Filter is one of the two operators allowed to host correlated expressions.
@@ -1299,6 +1324,8 @@ class Analyzer(
case _: EqualTo | _: EqualNullSafe => false
case _ => true
}
+
+ failOnInvalidOuterReference(f)
// The aggregate expressions are treated in a special way by getOuterReferences. If the
// aggregate expression contains only outer reference attributes then the entire aggregate
// expression is isolated as an OuterReference.
@@ -1308,7 +1335,7 @@ class Analyzer(
// Project cannot host any correlated expressions
// but can be anywhere in a correlated subquery.
case p: Project =>
- failOnOuterReference(p)
+ failOnInvalidOuterReference(p)
// Aggregate cannot host any correlated expressions
// It can be on a correlation path if the correlation contains
@@ -1316,7 +1343,7 @@ class Analyzer(
// It cannot be on a correlation path if the correlation has
// non-equality correlated predicates.
case a: Aggregate =>
- failOnOuterReference(a)
+ failOnInvalidOuterReference(a)
failOnNonEqualCorrelatedPredicate(foundNonEqualCorrelatedPred, a)
// Join can host correlated expressions.
@@ -1324,7 +1351,7 @@ class Analyzer(
joinType match {
// Inner join, like Filter, can be anywhere.
case _: InnerLike =>
- failOnOuterReference(j)
+ failOnInvalidOuterReference(j)
// Left outer join's right operand cannot be on a correlation path.
// LeftAnti and ExistenceJoin are special cases of LeftOuter.
@@ -1335,12 +1362,12 @@ class Analyzer(
// Any correlated references in the subplan
// of the right operand cannot be pulled up.
case LeftOuter | LeftSemi | LeftAnti | ExistenceJoin(_) =>
- failOnOuterReference(j)
+ failOnInvalidOuterReference(j)
failOnOuterReferenceInSubTree(right)
// Likewise, Right outer join's left operand cannot be on a correlation path.
case RightOuter =>
- failOnOuterReference(j)
+ failOnInvalidOuterReference(j)
failOnOuterReferenceInSubTree(left)
// Any other join types not explicitly listed above,
@@ -1356,7 +1383,7 @@ class Analyzer(
// Note:
// Generator with join=false is treated as Category 4.
case g: Generate if g.join =>
- failOnOuterReference(g)
+ failOnInvalidOuterReference(g)
// Category 4: Any other operators not in the above 3 categories
// cannot be on a correlation path, that is they are allowed only