diff options
author | 蒋星博 <jiangxingbo@meituan.com> | 2016-07-14 00:21:27 +0800 |
---|---|---|
committer | Cheng Lian <lian@databricks.com> | 2016-07-14 00:21:27 +0800 |
commit | f376c37268848dbb4b2fb57677e22ef2bf207b49 (patch) | |
tree | bc4fc046291880943c4d2c5ad37625a7548baa84 /sql/catalyst/src/main/scala | |
parent | ea06e4ef34c860219a9aeec81816ef53ada96253 (diff) | |
download | spark-f376c37268848dbb4b2fb57677e22ef2bf207b49.tar.gz spark-f376c37268848dbb4b2fb57677e22ef2bf207b49.tar.bz2 spark-f376c37268848dbb4b2fb57677e22ef2bf207b49.zip |
[SPARK-16343][SQL] Improve the PushDownPredicate rule to pushdown predicates correctly in non-deterministic condition.
## What changes were proposed in this pull request?
Currently our Optimizer may reorder the predicates to run them more efficient, but in non-deterministic condition, change the order between deterministic parts and non-deterministic parts may change the number of input rows. For example:
```SELECT a FROM t WHERE rand() < 0.1 AND a = 1```
And
```SELECT a FROM t WHERE a = 1 AND rand() < 0.1```
may call rand() for different times and therefore the output rows differ.
This PR improved this condition by checking whether the predicate is placed before any non-deterministic predicates.
## How was this patch tested?
Expanded related testcases in FilterPushdownSuite.
Author: 蒋星博 <jiangxingbo@meituan.com>
Closes #14012 from jiangxb1987/ppd.
Diffstat (limited to 'sql/catalyst/src/main/scala')
-rw-r--r-- | sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 44 |
1 files changed, 29 insertions, 15 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 368e9a5396..08fb0199fc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -1128,19 +1128,23 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper { project.copy(child = Filter(replaceAlias(condition, aliasMap), grandChild)) // Push [[Filter]] operators through [[Window]] operators. Parts of the predicate that can be - // pushed beneath must satisfy the following two conditions: + // pushed beneath must satisfy the following conditions: // 1. All the expressions are part of window partitioning key. The expressions can be compound. - // 2. Deterministic + // 2. Deterministic. + // 3. Placed before any non-deterministic predicates. case filter @ Filter(condition, w: Window) if w.partitionSpec.forall(_.isInstanceOf[AttributeReference]) => val partitionAttrs = AttributeSet(w.partitionSpec.flatMap(_.references)) - val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { cond => - cond.references.subsetOf(partitionAttrs) && cond.deterministic && - // This is for ensuring all the partitioning expressions have been converted to alias - // in Analyzer. Thus, we do not need to check if the expressions in conditions are - // the same as the expressions used in partitioning columns. - partitionAttrs.forall(_.isInstanceOf[Attribute]) + + val (candidates, containingNonDeterministic) = + splitConjunctivePredicates(condition).span(_.deterministic) + + val (pushDown, rest) = candidates.partition { cond => + cond.references.subsetOf(partitionAttrs) } + + val stayUp = rest ++ containingNonDeterministic + if (pushDown.nonEmpty) { val pushDownPredicate = pushDown.reduce(And) val newWindow = w.copy(child = Filter(pushDownPredicate, w.child)) @@ -1159,11 +1163,16 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper { // For each filter, expand the alias and check if the filter can be evaluated using // attributes produced by the aggregate operator's child operator. - val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { cond => + val (candidates, containingNonDeterministic) = + splitConjunctivePredicates(condition).span(_.deterministic) + + val (pushDown, rest) = candidates.partition { cond => val replaced = replaceAlias(cond, aliasMap) - replaced.references.subsetOf(aggregate.child.outputSet) && replaced.deterministic + replaced.references.subsetOf(aggregate.child.outputSet) } + val stayUp = rest ++ containingNonDeterministic + if (pushDown.nonEmpty) { val pushDownPredicate = pushDown.reduce(And) val replaced = replaceAlias(pushDownPredicate, aliasMap) @@ -1177,9 +1186,8 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper { case filter @ Filter(condition, union: Union) => // Union could change the rows, so non-deterministic predicate can't be pushed down - val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { cond => - cond.deterministic - } + val (pushDown, stayUp) = splitConjunctivePredicates(condition).span(_.deterministic) + if (pushDown.nonEmpty) { val pushDownCond = pushDown.reduceLeft(And) val output = union.output @@ -1219,9 +1227,15 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper { // come from grandchild. // TODO: non-deterministic predicates could be pushed through some operators that do not change // the rows. - val (pushDown, stayUp) = splitConjunctivePredicates(filter.condition).partition { cond => - cond.deterministic && cond.references.subsetOf(grandchild.outputSet) + val (candidates, containingNonDeterministic) = + splitConjunctivePredicates(filter.condition).span(_.deterministic) + + val (pushDown, rest) = candidates.partition { cond => + cond.references.subsetOf(grandchild.outputSet) } + + val stayUp = rest ++ containingNonDeterministic + if (pushDown.nonEmpty) { val newChild = insertFilter(pushDown.reduceLeft(And)) if (stayUp.nonEmpty) { |