aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst
diff options
context:
space:
mode:
authorHerman van Hovell <hvanhovell@databricks.com>2016-11-04 21:18:13 +0100
committerHerman van Hovell <hvanhovell@databricks.com>2016-11-04 21:18:13 +0100
commit550cd56e8b6addb26efe3ce16976c9c34fa0c832 (patch)
tree3404f604e2772a2b3b80efdb6a24f36bd6465652 /sql/catalyst
parenta42d738c5de08bd395a7c220c487146173c6c163 (diff)
downloadspark-550cd56e8b6addb26efe3ce16976c9c34fa0c832.tar.gz
spark-550cd56e8b6addb26efe3ce16976c9c34fa0c832.tar.bz2
spark-550cd56e8b6addb26efe3ce16976c9c34fa0c832.zip
[SPARK-17337][SQL] Do not pushdown predicates through filters with predicate subqueries
## What changes were proposed in this pull request? The `PushDownPredicate` rule can create a wrong result if we try to push a filter containing a predicate subquery through a project when the subquery and the project share attributes (have the same source). The current PR fixes this by making sure that we do not push down when there is a predicate subquery that outputs the same attributes as the filters new child plan. ## How was this patch tested? Added a test to `SubquerySuite`. nsyca has done previous work this. I have taken test from his initial PR. Author: Herman van Hovell <hvanhovell@databricks.com> Closes #15761 from hvanhovell/SPARK-17337.
Diffstat (limited to 'sql/catalyst')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala16
1 files changed, 15 insertions, 1 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index b6ad5db74e..6ba8b33b3f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -689,7 +689,7 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
// state and all the input rows processed before. In another word, the order of input rows
// matters for non-deterministic expressions, while pushing down predicates changes the order.
case filter @ Filter(condition, project @ Project(fields, grandChild))
- if fields.forall(_.deterministic) =>
+ if fields.forall(_.deterministic) && canPushThroughCondition(grandChild, condition) =>
// Create a map of Aliases to their values from the child projection.
// e.g., 'SELECT a + b AS c, d ...' produces Map(c -> a + b).
@@ -830,6 +830,20 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
filter
}
}
+
+ /**
+ * Check if we can safely push a filter through a projection, by making sure that predicate
+ * subqueries in the condition do not contain the same attributes as the plan they are moved
+ * into. This can happen when the plan and predicate subquery have the same source.
+ */
+ private def canPushThroughCondition(plan: LogicalPlan, condition: Expression): Boolean = {
+ val attributes = plan.outputSet
+ val matched = condition.find {
+ case PredicateSubquery(p, _, _, _) => p.outputSet.intersect(attributes).nonEmpty
+ case _ => false
+ }
+ matched.isEmpty
+ }
}
/**