author      nitin goyal <nitin.goyal@guavus.com>        2015-10-21 10:45:21 -0700
committer   Michael Armbrust <michael@databricks.com>   2015-10-21 10:45:21 -0700
commit      f62e3260889d67256d335fd0dd38f114ae4e3eca (patch)
tree        dc094894d19991c5df1489b9c0fd2b14d53e8e5d /sql
parent      8e82e59834aefb74d49e45c9c9c926bb53143b4c (diff)
[SPARK-11179] [SQL] Push filters through aggregate
Push conjunctive predicates through Aggregate operators when their references are a subset of the groupingExpressions.

Query plan before optimisation :-

Filter ((c#138L = 2) && (a#0 = 3))
 Aggregate [a#0], [a#0,count(b#1) AS c#138L]
  Project [a#0,b#1]
   LocalRelation [a#0,b#1,c#2]

Query plan after optimisation :-

Filter (c#138L = 2)
 Aggregate [a#0], [a#0,count(b#1) AS c#138L]
  Filter (a#0 = 3)
   Project [a#0,b#1]
    LocalRelation [a#0,b#1,c#2]

Author: nitin goyal <nitin.goyal@guavus.com>
Author: nitin.goyal <nitin.goyal@guavus.com>

Closes #9167 from nitin2goyal/master.
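To make the setting concrete, here is a minimal, hypothetical sketch of a query shaped like the plans above. It is illustrative only: the column names follow the plan, and the SparkSession-based API is assumed for convenience (the commit itself predates that API).

// Illustrative only: a query shaped like the plan in the commit message.
// Column names (a, b, c) follow the plan; the SparkSession API shown here is
// an assumption for brevity and postdates this commit.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.count

object PushdownExample extends App {
  val spark = SparkSession.builder().master("local[*]").appName("pushdown-sketch").getOrCreate()
  import spark.implicits._

  val df = Seq((1, 10, 100), (3, 20, 200), (3, 30, 300)).toDF("a", "b", "c")

  val query = df
    .groupBy($"a")
    .agg(count($"b").as("c"))          // Aggregate [a], [a, count(b) AS c]
    .where($"c" === 2 && $"a" === 3)   // conjunctive filter: c is an aggregate output, a is a grouping column

  // With this rule, the optimized plan keeps Filter (c = 2) above the Aggregate
  // and evaluates Filter (a = 3) beneath it, before the aggregation runs.
  query.explain(true)

  spark.stop()
}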
Diffstat (limited to 'sql')
 -rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala           | 24
 -rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala | 45
 2 files changed, 69 insertions(+), 0 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 6557c7005d..0139b9e87c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -46,6 +46,7 @@ object DefaultOptimizer extends Optimizer {
PushPredicateThroughJoin,
PushPredicateThroughProject,
PushPredicateThroughGenerate,
+ PushPredicateThroughAggregate,
ColumnPruning,
// Operator combine
ProjectCollapsing,
@@ -675,6 +676,29 @@ object PushPredicateThroughGenerate extends Rule[LogicalPlan] with PredicateHelp
}
/**
+ * Push [[Filter]] operators through [[Aggregate]] operators. Parts of the predicate that reference
+ * attributes which are subset of group by attribute set of [[Aggregate]] will be pushed beneath,
+ * and the rest should remain above.
+ */
+object PushPredicateThroughAggregate extends Rule[LogicalPlan] with PredicateHelper {
+
+ def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+ case filter @ Filter(condition,
+ aggregate @ Aggregate(groupingExpressions, aggregateExpressions, grandChild)) =>
+ val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition {
+ conjunct => conjunct.references subsetOf AttributeSet(groupingExpressions)
+ }
+ if (pushDown.nonEmpty) {
+ val pushDownPredicate = pushDown.reduce(And)
+ val withPushdown = aggregate.copy(child = Filter(pushDownPredicate, grandChild))
+ stayUp.reduceOption(And).map(Filter(_, withPushdown)).getOrElse(withPushdown)
+ } else {
+ filter
+ }
+ }
+}
+
+/**
* Pushes down [[Filter]] operators where the `condition` can be
* evaluated using only the attributes of the left or right side of a join. Other
* [[Filter]] conditions are moved into the `condition` of the [[Join]].
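The heart of the new PushPredicateThroughAggregate rule above is the partition of the filter's conjuncts by whether their references fall entirely within the grouping attributes. Below is a minimal, self-contained sketch of that idea in plain Scala; the names and types here are illustrative stand-ins, not Catalyst's.

object ConjunctPartitionSketch extends App {
  // Stand-in for a predicate: just the set of attribute names it references.
  final case class Conjunct(description: String, refs: Set[String])

  // Grouping attributes of the Aggregate, e.g. GROUP BY a.
  val groupingAttrs = Set("a")

  // Filter (c = 2) && (a = 3), already split into conjuncts
  // (the real rule uses PredicateHelper.splitConjunctivePredicates).
  val conjuncts = Seq(
    Conjunct("c = 2", Set("c")),   // references the aggregate output c
    Conjunct("a = 3", Set("a"))    // references only the grouping column a
  )

  // The same partition the rule performs: push down what only touches grouping columns.
  val (pushDown, stayUp) = conjuncts.partition(_.refs.subsetOf(groupingAttrs))

  println(s"pushed beneath the Aggregate: $pushDown")  // a = 3
  println(s"kept above the Aggregate:     $stayUp")    // c = 2
}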
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 0f1fde2fb0..ed810a1280 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -40,6 +40,7 @@ class FilterPushdownSuite extends PlanTest {
BooleanSimplification,
PushPredicateThroughJoin,
PushPredicateThroughGenerate,
+ PushPredicateThroughAggregate,
ColumnPruning,
ProjectCollapsing) :: Nil
}
@@ -652,4 +653,48 @@ class FilterPushdownSuite extends PlanTest {
comparePlans(optimized, correctAnswer.analyze)
}
+
+ test("aggregate: push down filter when filter on group by expression") {
+ val originalQuery = testRelation
+ .groupBy('a)('a, Count('b) as 'c)
+ .select('a, 'c)
+ .where('a === 2)
+
+ val optimized = Optimize.execute(originalQuery.analyze)
+
+ val correctAnswer = testRelation
+ .where('a === 2)
+ .groupBy('a)('a, Count('b) as 'c)
+ .analyze
+ comparePlans(optimized, correctAnswer)
+ }
+
+ test("aggregate: don't push down filter when filter not on group by expression") {
+ val originalQuery = testRelation
+ .select('a, 'b)
+ .groupBy('a)('a, Count('b) as 'c)
+ .where('c === 2L)
+
+ val optimized = Optimize.execute(originalQuery.analyze)
+
+ comparePlans(optimized, originalQuery.analyze)
+ }
+
+ test("aggregate: push down filters partially which are subset of group by expressions") {
+ val originalQuery = testRelation
+ .select('a, 'b)
+ .groupBy('a)('a, Count('b) as 'c)
+ .where('c === 2L && 'a === 3)
+
+ val optimized = Optimize.execute(originalQuery.analyze)
+
+ val correctAnswer = testRelation
+ .select('a, 'b)
+ .where('a === 3)
+ .groupBy('a)('a, Count('b) as 'c)
+ .where('c === 2L)
+ .analyze
+
+ comparePlans(optimized, correctAnswer)
+ }
}