aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst
diff options
context:
space:
mode:
authorNattavut Sutyanyong <nsy.can@gmail.com>2016-11-14 20:59:15 +0100
committerHerman van Hovell <hvanhovell@databricks.com>2016-11-14 20:59:15 +0100
commitbd85603ba5f9e61e1aa8326d3e4d5703b5977a4c (patch)
tree19732c8de1a617e2e17543a7cf0d99135cc2850c /sql/catalyst
parent75934457d75996be71ffd0d4b448497d656c0d40 (diff)
downloadspark-bd85603ba5f9e61e1aa8326d3e4d5703b5977a4c.tar.gz
spark-bd85603ba5f9e61e1aa8326d3e4d5703b5977a4c.tar.bz2
spark-bd85603ba5f9e61e1aa8326d3e4d5703b5977a4c.zip
[SPARK-17348][SQL] Incorrect results from subquery transformation
## What changes were proposed in this pull request? Return an Analysis exception when there is a correlated non-equality predicate in a subquery and the correlated column from the outer reference is not from the immediate parent operator of the subquery. This PR prevents incorrect results from subquery transformation in such case. Test cases, both positive and negative tests, are added. ## How was this patch tested? sql/test, catalyst/test, hive/test, and scenarios that will produce incorrect results without this PR and product correct results when subquery transformation does happen. Author: Nattavut Sutyanyong <nsy.can@gmail.com> Closes #15763 from nsyca/spark-17348.
Diffstat (limited to 'sql/catalyst')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala44
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala7
2 files changed, 44 insertions, 7 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index dd68d60d3e..c14f353517 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1031,6 +1031,37 @@ class Analyzer(
}
}
+ // SPARK-17348: A potential incorrect result case.
+ // When a correlated predicate is a non-equality predicate,
+ // certain operators are not permitted from the operator
+ // hosting the correlated predicate up to the operator on the outer table.
+ // Otherwise, the pull up of the correlated predicate
+ // will generate a plan with a different semantics
+ // which could return incorrect result.
+ // Currently we check for Aggregate and Window operators
+ //
+ // Below shows an example of a Logical Plan during Analyzer phase that
+ // show this problem. Pulling the correlated predicate [outer(c2#77) >= ..]
+ // through the Aggregate (or Window) operator could alter the result of
+ // the Aggregate.
+ //
+ // Project [c1#76]
+ // +- Project [c1#87, c2#88]
+ // : (Aggregate or Window operator)
+ // : +- Filter [outer(c2#77) >= c2#88)]
+ // : +- SubqueryAlias t2, `t2`
+ // : +- Project [_1#84 AS c1#87, _2#85 AS c2#88]
+ // : +- LocalRelation [_1#84, _2#85]
+ // +- SubqueryAlias t1, `t1`
+ // +- Project [_1#73 AS c1#76, _2#74 AS c2#77]
+ // +- LocalRelation [_1#73, _2#74]
+ def failOnNonEqualCorrelatedPredicate(found: Boolean, p: LogicalPlan): Unit = {
+ if (found) {
+ // Report a non-supported case as an exception
+ failAnalysis(s"Correlated column is not allowed in a non-equality predicate:\n$p")
+ }
+ }
+
/** Determine which correlated predicate references are missing from this plan. */
def missingReferences(p: LogicalPlan): AttributeSet = {
val localPredicateReferences = p.collect(predicateMap)
@@ -1041,12 +1072,20 @@ class Analyzer(
localPredicateReferences -- p.outputSet
}
+ var foundNonEqualCorrelatedPred : Boolean = false
+
// Simplify the predicates before pulling them out.
val transformed = BooleanSimplification(sub) transformUp {
case f @ Filter(cond, child) =>
// Find all predicates with an outer reference.
val (correlated, local) = splitConjunctivePredicates(cond).partition(containsOuter)
+ // Find any non-equality correlated predicates
+ foundNonEqualCorrelatedPred = foundNonEqualCorrelatedPred || correlated.exists {
+ case _: EqualTo | _: EqualNullSafe => false
+ case _ => true
+ }
+
// Rewrite the filter without the correlated predicates if any.
correlated match {
case Nil => f
@@ -1068,12 +1107,17 @@ class Analyzer(
}
case a @ Aggregate(grouping, expressions, child) =>
failOnOuterReference(a)
+ failOnNonEqualCorrelatedPredicate(foundNonEqualCorrelatedPred, a)
+
val referencesToAdd = missingReferences(a)
if (referencesToAdd.nonEmpty) {
Aggregate(grouping ++ referencesToAdd, expressions ++ referencesToAdd, child)
} else {
a
}
+ case w : Window =>
+ failOnNonEqualCorrelatedPredicate(foundNonEqualCorrelatedPred, w)
+ w
case j @ Join(left, _, RightOuter, _) =>
failOnOuterReference(j)
failOnOuterReferenceInSubTree(left, "a RIGHT OUTER JOIN")
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 3455a567b7..7b75c1f709 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -119,13 +119,6 @@ trait CheckAnalysis extends PredicateHelper {
}
case s @ ScalarSubquery(query, conditions, _) if conditions.nonEmpty =>
- // Make sure we are using equi-joins.
- conditions.foreach {
- case _: EqualTo | _: EqualNullSafe => // ok
- case e => failAnalysis(
- s"The correlated scalar subquery can only contain equality predicates: $e")
- }
-
// Make sure correlated scalar subqueries contain one row for every outer row by
// enforcing that they are aggregates which contain exactly one aggregate expressions.
// The analyzer has already checked that subquery contained only one output column, and