aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
diff options
context:
space:
mode:
Diffstat (limited to 'sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala22
1 files changed, 22 insertions, 0 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 660314f86e..b26ceba228 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -959,6 +959,28 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
project.copy(child = Filter(replaceAlias(condition, aliasMap), grandChild))
+ // Push [[Filter]] operators through [[Window]] operators. Parts of the predicate that can be
+ // pushed beneath must satisfy the following two conditions:
+ // 1. All the expressions are part of window partitioning key. The expressions can be compound.
+ // 2. Deterministic
+ case filter @ Filter(condition, w: Window)
+ if w.partitionSpec.forall(_.isInstanceOf[AttributeReference]) =>
+ val partitionAttrs = AttributeSet(w.partitionSpec.flatMap(_.references))
+ val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { cond =>
+ cond.references.subsetOf(partitionAttrs) && cond.deterministic &&
+ // This is for ensuring all the partitioning expressions have been converted to alias
+ // in Analyzer. Thus, we do not need to check if the expressions in conditions are
+ // the same as the expressions used in partitioning columns.
+ partitionAttrs.forall(_.isInstanceOf[Attribute])
+ }
+ if (pushDown.nonEmpty) {
+ val pushDownPredicate = pushDown.reduce(And)
+ val newWindow = w.copy(child = Filter(pushDownPredicate, w.child))
+ if (stayUp.isEmpty) newWindow else Filter(stayUp.reduce(And), newWindow)
+ } else {
+ filter
+ }
+
case filter @ Filter(condition, aggregate: Aggregate) =>
// Find all the aliased expressions in the aggregate list that don't include any actual
// AggregateExpression, and create a map from the alias to the expression