diff options
author | nitin goyal <nitin.goyal@guavus.com> | 2015-10-21 10:45:21 -0700 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2015-10-21 10:45:21 -0700 |
commit | f62e3260889d67256d335fd0dd38f114ae4e3eca (patch) | |
tree | dc094894d19991c5df1489b9c0fd2b14d53e8e5d /sql | |
parent | 8e82e59834aefb74d49e45c9c9c926bb53143b4c (diff) | |
download | spark-f62e3260889d67256d335fd0dd38f114ae4e3eca.tar.gz spark-f62e3260889d67256d335fd0dd38f114ae4e3eca.tar.bz2 spark-f62e3260889d67256d335fd0dd38f114ae4e3eca.zip |
[SPARK-11179] [SQL] Push filters through aggregate
Push conjunctive predicates through Aggregate operators when their references are a subset of the groupingExpressions.
Query plan before optimisation :-
Filter ((c#138L = 2) && (a#0 = 3))
Aggregate [a#0], [a#0,count(b#1) AS c#138L]
Project [a#0,b#1]
LocalRelation [a#0,b#1,c#2]
Query plan after optimisation :-
Filter (c#138L = 2)
Aggregate [a#0], [a#0,count(b#1) AS c#138L]
Filter (a#0 = 3)
Project [a#0,b#1]
LocalRelation [a#0,b#1,c#2]
Author: nitin goyal <nitin.goyal@guavus.com>
Author: nitin.goyal <nitin.goyal@guavus.com>
Closes #9167 from nitin2goyal/master.
Diffstat (limited to 'sql')
2 files changed, 69 insertions, 0 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 6557c7005d..0139b9e87c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -46,6 +46,7 @@ object DefaultOptimizer extends Optimizer { PushPredicateThroughJoin, PushPredicateThroughProject, PushPredicateThroughGenerate, + PushPredicateThroughAggregate, ColumnPruning, // Operator combine ProjectCollapsing, @@ -675,6 +676,29 @@ object PushPredicateThroughGenerate extends Rule[LogicalPlan] with PredicateHelp } /** + * Push [[Filter]] operators through [[Aggregate]] operators. Parts of the predicate that reference + * attributes which are subset of group by attribute set of [[Aggregate]] will be pushed beneath, + * and the rest should remain above. + */ +object PushPredicateThroughAggregate extends Rule[LogicalPlan] with PredicateHelper { + + def apply(plan: LogicalPlan): LogicalPlan = plan transform { + case filter @ Filter(condition, + aggregate @ Aggregate(groupingExpressions, aggregateExpressions, grandChild)) => + val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { + conjunct => conjunct.references subsetOf AttributeSet(groupingExpressions) + } + if (pushDown.nonEmpty) { + val pushDownPredicate = pushDown.reduce(And) + val withPushdown = aggregate.copy(child = Filter(pushDownPredicate, grandChild)) + stayUp.reduceOption(And).map(Filter(_, withPushdown)).getOrElse(withPushdown) + } else { + filter + } + } +} + +/** * Pushes down [[Filter]] operators where the `condition` can be * evaluated using only the attributes of the left or right side of a join. Other * [[Filter]] conditions are moved into the `condition` of the [[Join]]. 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index 0f1fde2fb0..ed810a1280 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -40,6 +40,7 @@ class FilterPushdownSuite extends PlanTest { BooleanSimplification, PushPredicateThroughJoin, PushPredicateThroughGenerate, + PushPredicateThroughAggregate, ColumnPruning, ProjectCollapsing) :: Nil } @@ -652,4 +653,48 @@ class FilterPushdownSuite extends PlanTest { comparePlans(optimized, correctAnswer.analyze) } + + test("aggregate: push down filter when filter on group by expression") { + val originalQuery = testRelation + .groupBy('a)('a, Count('b) as 'c) + .select('a, 'c) + .where('a === 2) + + val optimized = Optimize.execute(originalQuery.analyze) + + val correctAnswer = testRelation + .where('a === 2) + .groupBy('a)('a, Count('b) as 'c) + .analyze + comparePlans(optimized, correctAnswer) + } + + test("aggregate: don't push down filter when filter not on group by expression") { + val originalQuery = testRelation + .select('a, 'b) + .groupBy('a)('a, Count('b) as 'c) + .where('c === 2L) + + val optimized = Optimize.execute(originalQuery.analyze) + + comparePlans(optimized, originalQuery.analyze) + } + + test("aggregate: push down filters partially which are subset of group by expressions") { + val originalQuery = testRelation + .select('a, 'b) + .groupBy('a)('a, Count('b) as 'c) + .where('c === 2L && 'a === 3) + + val optimized = Optimize.execute(originalQuery.analyze) + + val correctAnswer = testRelation + .select('a, 'b) + .where('a === 3) + .groupBy('a)('a, Count('b) as 'c) + .where('c === 2L) + .analyze + + comparePlans(optimized, correctAnswer) + } } |