aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDilip Biswal <dbiswal@us.ibm.com>2017-04-20 22:35:48 +0200
committerHerman van Hovell <hvanhovell@databricks.com>2017-04-20 22:35:48 +0200
commitd95e4d9d6a9705c534549add6d4a73d554e47274 (patch)
tree4aca11699cd4f3e775d22b9e34633d99e296ddd7
parentb2ebadfd55283348b8a8b37e28075fca0798228a (diff)
downloadspark-d95e4d9d6a9705c534549add6d4a73d554e47274.tar.gz
spark-d95e4d9d6a9705c534549add6d4a73d554e47274.tar.bz2
spark-d95e4d9d6a9705c534549add6d4a73d554e47274.zip
[SPARK-20334][SQL] Return a better error message when correlated predicates contain aggregate expression that has mixture of outer and local references.
## What changes were proposed in this pull request? Address a follow up in [comment](https://github.com/apache/spark/pull/16954#discussion_r105718880) Currently subqueries with correlated predicates containing aggregate expression having mixture of outer references and local references generate a codegen error like following : ```SQL SELECT t1a FROM t1 GROUP BY 1 HAVING EXISTS (SELECT 1 FROM t2 WHERE t2a < min(t1a + t2a)); ``` Exception snippet. ``` Cannot evaluate expression: min((input[0, int, false] + input[4, int, false])) at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.doGenCode(Expression.scala:226) at org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression.doGenCode(interfaces.scala:87) at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:106) at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:103) at scala.Option.getOrElse(Option.scala:121) at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:103) ``` After this PR, a better error message is issued. ``` org.apache.spark.sql.AnalysisException Error in query: Found an aggregate expression in a correlated predicate that has both outer and local references, which is not supported yet. Aggregate expression: min((t1.`t1a` + t2.`t2a`)), Outer references: t1.`t1a`, Local references: t2.`t2a`.; ``` ## How was this patch tested? Added tests in SQLQueryTestSuite. Author: Dilip Biswal <dbiswal@us.ibm.com> Closes #17636 from dilipbiswal/subquery_followup1.
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala49
-rw-r--r--sql/core/src/test/resources/sql-tests/inputs/subquery/negative-cases/invalid-correlation.sql74
-rw-r--r--sql/core/src/test/resources/sql-tests/results/subquery/negative-cases/invalid-correlation.sql.out96
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala23
4 files changed, 181 insertions, 61 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 175bfb3e80..eafeb4ac1a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1204,6 +1204,28 @@ class Analyzer(
private def checkAndGetOuterReferences(sub: LogicalPlan): Seq[Expression] = {
val outerReferences = ArrayBuffer.empty[Expression]
+ // Validate that correlated aggregate expression do not contain a mixture
+ // of outer and local references.
+ def checkMixedReferencesInsideAggregateExpr(expr: Expression): Unit = {
+ expr.foreach {
+ case a: AggregateExpression if containsOuter(a) =>
+ val outer = a.collect { case OuterReference(e) => e.toAttribute }
+ val local = a.references -- outer
+ if (local.nonEmpty) {
+ val msg =
+ s"""
+ |Found an aggregate expression in a correlated predicate that has both
+ |outer and local references, which is not supported yet.
+ |Aggregate expression: ${SubExprUtils.stripOuterReference(a).sql},
+ |Outer references: ${outer.map(_.sql).mkString(", ")},
+ |Local references: ${local.map(_.sql).mkString(", ")}.
+ """.stripMargin.replace("\n", " ").trim()
+ failAnalysis(msg)
+ }
+ case _ =>
+ }
+ }
+
// Make sure a plan's subtree does not contain outer references
def failOnOuterReferenceInSubTree(p: LogicalPlan): Unit = {
if (hasOuterReferences(p)) {
@@ -1211,9 +1233,12 @@ class Analyzer(
}
}
- // Make sure a plan's expressions do not contain outer references
- def failOnOuterReference(p: LogicalPlan): Unit = {
- if (p.expressions.exists(containsOuter)) {
+ // Make sure a plan's expressions do not contain :
+ // 1. Aggregate expressions that have mixture of outer and local references.
+ // 2. Expressions containing outer references on plan nodes other than Filter.
+ def failOnInvalidOuterReference(p: LogicalPlan): Unit = {
+ p.expressions.foreach(checkMixedReferencesInsideAggregateExpr)
+ if (!p.isInstanceOf[Filter] && p.expressions.exists(containsOuter)) {
failAnalysis(
"Expressions referencing the outer query are not supported outside of WHERE/HAVING " +
s"clauses:\n$p")
@@ -1283,9 +1308,9 @@ class Analyzer(
// These operators can be anywhere in a correlated subquery.
// so long as they do not host outer references in the operators.
case s: Sort =>
- failOnOuterReference(s)
+ failOnInvalidOuterReference(s)
case r: RepartitionByExpression =>
- failOnOuterReference(r)
+ failOnInvalidOuterReference(r)
// Category 3:
// Filter is one of the two operators allowed to host correlated expressions.
@@ -1299,6 +1324,8 @@ class Analyzer(
case _: EqualTo | _: EqualNullSafe => false
case _ => true
}
+
+ failOnInvalidOuterReference(f)
// The aggregate expressions are treated in a special way by getOuterReferences. If the
// aggregate expression contains only outer reference attributes then the entire aggregate
// expression is isolated as an OuterReference.
@@ -1308,7 +1335,7 @@ class Analyzer(
// Project cannot host any correlated expressions
// but can be anywhere in a correlated subquery.
case p: Project =>
- failOnOuterReference(p)
+ failOnInvalidOuterReference(p)
// Aggregate cannot host any correlated expressions
// It can be on a correlation path if the correlation contains
@@ -1316,7 +1343,7 @@ class Analyzer(
// It cannot be on a correlation path if the correlation has
// non-equality correlated predicates.
case a: Aggregate =>
- failOnOuterReference(a)
+ failOnInvalidOuterReference(a)
failOnNonEqualCorrelatedPredicate(foundNonEqualCorrelatedPred, a)
// Join can host correlated expressions.
@@ -1324,7 +1351,7 @@ class Analyzer(
joinType match {
// Inner join, like Filter, can be anywhere.
case _: InnerLike =>
- failOnOuterReference(j)
+ failOnInvalidOuterReference(j)
// Left outer join's right operand cannot be on a correlation path.
// LeftAnti and ExistenceJoin are special cases of LeftOuter.
@@ -1335,12 +1362,12 @@ class Analyzer(
// Any correlated references in the subplan
// of the right operand cannot be pulled up.
case LeftOuter | LeftSemi | LeftAnti | ExistenceJoin(_) =>
- failOnOuterReference(j)
+ failOnInvalidOuterReference(j)
failOnOuterReferenceInSubTree(right)
// Likewise, Right outer join's left operand cannot be on a correlation path.
case RightOuter =>
- failOnOuterReference(j)
+ failOnInvalidOuterReference(j)
failOnOuterReferenceInSubTree(left)
// Any other join types not explicitly listed above,
@@ -1356,7 +1383,7 @@ class Analyzer(
// Note:
// Generator with join=false is treated as Category 4.
case g: Generate if g.join =>
- failOnOuterReference(g)
+ failOnInvalidOuterReference(g)
// Category 4: Any other operators not in the above 3 categories
// cannot be on a correlation path, that is they are allowed only
diff --git a/sql/core/src/test/resources/sql-tests/inputs/subquery/negative-cases/invalid-correlation.sql b/sql/core/src/test/resources/sql-tests/inputs/subquery/negative-cases/invalid-correlation.sql
index cf93c5a835..e22cade936 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/subquery/negative-cases/invalid-correlation.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/subquery/negative-cases/invalid-correlation.sql
@@ -1,42 +1,72 @@
-- The test file contains negative test cases
-- of invalid queries where error messages are expected.
-create temporary view t1 as select * from values
+CREATE TEMPORARY VIEW t1 AS SELECT * FROM VALUES
(1, 2, 3)
-as t1(t1a, t1b, t1c);
+AS t1(t1a, t1b, t1c);
-create temporary view t2 as select * from values
+CREATE TEMPORARY VIEW t2 AS SELECT * FROM VALUES
(1, 0, 1)
-as t2(t2a, t2b, t2c);
+AS t2(t2a, t2b, t2c);
-create temporary view t3 as select * from values
+CREATE TEMPORARY VIEW t3 AS SELECT * FROM VALUES
(3, 1, 2)
-as t3(t3a, t3b, t3c);
+AS t3(t3a, t3b, t3c);
-- TC 01.01
-- The column t2b in the SELECT of the subquery is invalid
-- because it is neither an aggregate function nor a GROUP BY column.
-select t1a, t2b
-from t1, t2
-where t1b = t2c
-and t2b = (select max(avg)
- from (select t2b, avg(t2b) avg
- from t2
- where t2a = t1.t1b
+SELECT t1a, t2b
+FROM t1, t2
+WHERE t1b = t2c
+AND t2b = (SELECT max(avg)
+ FROM (SELECT t2b, avg(t2b) avg
+ FROM t2
+ WHERE t2a = t1.t1b
)
)
;
-- TC 01.02
-- Invalid due to the column t2b not part of the output from table t2.
-select *
-from t1
-where t1a in (select min(t2a)
- from t2
- group by t2c
- having t2c in (select max(t3c)
- from t3
- group by t3b
- having t3b > t2b ))
+SELECT *
+FROM t1
+WHERE t1a IN (SELECT min(t2a)
+ FROM t2
+ GROUP BY t2c
+ HAVING t2c IN (SELECT max(t3c)
+ FROM t3
+ GROUP BY t3b
+ HAVING t3b > t2b ))
;
+-- TC 01.03
+-- Invalid due to mixure of outer and local references under an AggegatedExpression
+-- in a correlated predicate
+SELECT t1a
+FROM t1
+GROUP BY 1
+HAVING EXISTS (SELECT 1
+ FROM t2
+ WHERE t2a < min(t1a + t2a));
+
+-- TC 01.04
+-- Invalid due to mixure of outer and local references under an AggegatedExpression
+SELECT t1a
+FROM t1
+WHERE t1a IN (SELECT t2a
+ FROM t2
+ WHERE EXISTS (SELECT 1
+ FROM t3
+ GROUP BY 1
+ HAVING min(t2a + t3a) > 1));
+
+-- TC 01.05
+-- Invalid due to outer reference appearing in projection list
+SELECT t1a
+FROM t1
+WHERE t1a IN (SELECT t2a
+ FROM t2
+ WHERE EXISTS (SELECT min(t2a)
+ FROM t3));
+
diff --git a/sql/core/src/test/resources/sql-tests/results/subquery/negative-cases/invalid-correlation.sql.out b/sql/core/src/test/resources/sql-tests/results/subquery/negative-cases/invalid-correlation.sql.out
index f7bbb35aad..e4b1a2dbc6 100644
--- a/sql/core/src/test/resources/sql-tests/results/subquery/negative-cases/invalid-correlation.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/subquery/negative-cases/invalid-correlation.sql.out
@@ -1,11 +1,11 @@
-- Automatically generated by SQLQueryTestSuite
--- Number of queries: 5
+-- Number of queries: 8
-- !query 0
-create temporary view t1 as select * from values
+CREATE TEMPORARY VIEW t1 AS SELECT * FROM VALUES
(1, 2, 3)
-as t1(t1a, t1b, t1c)
+AS t1(t1a, t1b, t1c)
-- !query 0 schema
struct<>
-- !query 0 output
@@ -13,9 +13,9 @@ struct<>
-- !query 1
-create temporary view t2 as select * from values
+CREATE TEMPORARY VIEW t2 AS SELECT * FROM VALUES
(1, 0, 1)
-as t2(t2a, t2b, t2c)
+AS t2(t2a, t2b, t2c)
-- !query 1 schema
struct<>
-- !query 1 output
@@ -23,9 +23,9 @@ struct<>
-- !query 2
-create temporary view t3 as select * from values
+CREATE TEMPORARY VIEW t3 AS SELECT * FROM VALUES
(3, 1, 2)
-as t3(t3a, t3b, t3c)
+AS t3(t3a, t3b, t3c)
-- !query 2 schema
struct<>
-- !query 2 output
@@ -33,13 +33,13 @@ struct<>
-- !query 3
-select t1a, t2b
-from t1, t2
-where t1b = t2c
-and t2b = (select max(avg)
- from (select t2b, avg(t2b) avg
- from t2
- where t2a = t1.t1b
+SELECT t1a, t2b
+FROM t1, t2
+WHERE t1b = t2c
+AND t2b = (SELECT max(avg)
+ FROM (SELECT t2b, avg(t2b) avg
+ FROM t2
+ WHERE t2a = t1.t1b
)
)
-- !query 3 schema
@@ -50,17 +50,67 @@ grouping expressions sequence is empty, and 't2.`t2b`' is not an aggregate funct
-- !query 4
-select *
-from t1
-where t1a in (select min(t2a)
- from t2
- group by t2c
- having t2c in (select max(t3c)
- from t3
- group by t3b
- having t3b > t2b ))
+SELECT *
+FROM t1
+WHERE t1a IN (SELECT min(t2a)
+ FROM t2
+ GROUP BY t2c
+ HAVING t2c IN (SELECT max(t3c)
+ FROM t3
+ GROUP BY t3b
+ HAVING t3b > t2b ))
-- !query 4 schema
struct<>
-- !query 4 output
org.apache.spark.sql.AnalysisException
resolved attribute(s) t2b#x missing from min(t2a)#x,t2c#x in operator !Filter t2c#x IN (list#x [t2b#x]);
+
+
+-- !query 5
+SELECT t1a
+FROM t1
+GROUP BY 1
+HAVING EXISTS (SELECT 1
+ FROM t2
+ WHERE t2a < min(t1a + t2a))
+-- !query 5 schema
+struct<>
+-- !query 5 output
+org.apache.spark.sql.AnalysisException
+Found an aggregate expression in a correlated predicate that has both outer and local references, which is not supported yet. Aggregate expression: min((t1.`t1a` + t2.`t2a`)), Outer references: t1.`t1a`, Local references: t2.`t2a`.;
+
+
+-- !query 6
+SELECT t1a
+FROM t1
+WHERE t1a IN (SELECT t2a
+ FROM t2
+ WHERE EXISTS (SELECT 1
+ FROM t3
+ GROUP BY 1
+ HAVING min(t2a + t3a) > 1))
+-- !query 6 schema
+struct<>
+-- !query 6 output
+org.apache.spark.sql.AnalysisException
+Found an aggregate expression in a correlated predicate that has both outer and local references, which is not supported yet. Aggregate expression: min((t2.`t2a` + t3.`t3a`)), Outer references: t2.`t2a`, Local references: t3.`t3a`.;
+
+
+-- !query 7
+SELECT t1a
+FROM t1
+WHERE t1a IN (SELECT t2a
+ FROM t2
+ WHERE EXISTS (SELECT min(t2a)
+ FROM t3))
+-- !query 7 schema
+struct<>
+-- !query 7 output
+org.apache.spark.sql.AnalysisException
+Expressions referencing the outer query are not supported outside of WHERE/HAVING clauses:
+Aggregate [min(outer(t2a#x)) AS min(outer())#x]
++- SubqueryAlias t3
+ +- Project [t3a#x, t3b#x, t3c#x]
+ +- SubqueryAlias t3
+ +- LocalRelation [t3a#x, t3b#x, t3c#x]
+;
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index 0f0199cbe2..131abf7c1e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -822,12 +822,25 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
checkAnswer(
sql(
"""
- | select c2
- | from t1
- | where exists (select *
- | from t2 lateral view explode(arr_c2) q as c2
- where t1.c1 = t2.c1)""".stripMargin),
+ | SELECT c2
+ | FROM t1
+ | WHERE EXISTS (SELECT *
+ | FROM t2 LATERAL VIEW explode(arr_c2) q AS c2
+ WHERE t1.c1 = t2.c1)""".stripMargin),
Row(1) :: Row(0) :: Nil)
+
+ val msg1 = intercept[AnalysisException] {
+ sql(
+ """
+ | SELECT c1
+ | FROM t2
+ | WHERE EXISTS (SELECT *
+ | FROM t1 LATERAL VIEW explode(t2.arr_c2) q AS c2
+ | WHERE t1.c1 = t2.c1)
+ """.stripMargin)
+ }
+ assert(msg1.getMessage.contains(
+ "Expressions referencing the outer query are not supported outside of WHERE/HAVING"))
}
}