aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala15
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala15
2 files changed, 26 insertions, 4 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 76f50a3dc3..3f57b0758e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -818,12 +818,19 @@ object CombineUnions extends Rule[LogicalPlan] {
}
/**
- * Combines two adjacent [[Filter]] operators into one, merging the
- * conditions into one conjunctive predicate.
+ * Combines two adjacent [[Filter]] operators into one, merging the non-redundant conditions into
+ * one conjunctive predicate.
*/
-object CombineFilters extends Rule[LogicalPlan] {
+object CombineFilters extends Rule[LogicalPlan] with PredicateHelper {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
- case ff @ Filter(fc, nf @ Filter(nc, grandChild)) => Filter(And(nc, fc), grandChild)
+ case ff @ Filter(fc, nf @ Filter(nc, grandChild)) =>
+ (ExpressionSet(splitConjunctivePredicates(fc)) --
+ ExpressionSet(splitConjunctivePredicates(nc))).reduceOption(And) match {
+ case Some(ac) =>
+ Filter(And(ac, nc), grandChild)
+ case None =>
+ nf
+ }
}
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index a636d63012..b84ae7c5bb 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -81,6 +81,21 @@ class FilterPushdownSuite extends PlanTest {
comparePlans(optimized, correctAnswer)
}
+ test("combine redundant filters") {
+ val originalQuery =
+ testRelation
+ .where('a === 1 && 'b === 1)
+ .where('a === 1 && 'c === 1)
+
+ val optimized = Optimize.execute(originalQuery.analyze)
+ val correctAnswer =
+ testRelation
+ .where('a === 1 && 'b === 1 && 'c === 1)
+ .analyze
+
+ comparePlans(optimized, correctAnswer)
+ }
+
test("can't push without rewrite") {
val originalQuery =
testRelation