diff options
author | Sameer Agarwal <sameer@databricks.com> | 2016-03-16 16:27:46 -0700 |
---|---|---|
committer | Yin Huai <yhuai@databricks.com> | 2016-03-16 16:27:46 -0700 |
commit | 77ba3021c12dc63cb7d831f964f901e0474acd96 (patch) | |
tree | b0e96ecae892b36310c86a0071d634921d5876ca | |
parent | f96997ba244a14c26e85a2475415a762d0c0d0a8 (diff) | |
download | spark-77ba3021c12dc63cb7d831f964f901e0474acd96.tar.gz spark-77ba3021c12dc63cb7d831f964f901e0474acd96.tar.bz2 spark-77ba3021c12dc63cb7d831f964f901e0474acd96.zip |
[SPARK-13869][SQL] Remove redundant conditions while combining filters
## What changes were proposed in this pull request?
**[I'll link it to the JIRA once ASF JIRA is back online]**
This PR modifies the existing `CombineFilters` rule to remove redundant conditions while combining individual filter predicates. For instance, queries of the form `table.where('a === 1 && 'b === 1).where('a === 1 && 'c === 1)` will now be optimized to `table.where('a === 1 && 'b === 1 && 'c === 1)` (instead of `table.where('a === 1 && 'a === 1 && 'b === 1 && 'c === 1)`)
## How was this patch tested?
Unit test in `FilterPushdownSuite`
Author: Sameer Agarwal <sameer@databricks.com>
Closes #11670 from sameeragarwal/combine-filters.
2 files changed, 26 insertions, 4 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 76f50a3dc3..3f57b0758e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -818,12 +818,19 @@ object CombineUnions extends Rule[LogicalPlan] { } /** - * Combines two adjacent [[Filter]] operators into one, merging the - * conditions into one conjunctive predicate. + * Combines two adjacent [[Filter]] operators into one, merging the non-redundant conditions into + * one conjunctive predicate. */ -object CombineFilters extends Rule[LogicalPlan] { +object CombineFilters extends Rule[LogicalPlan] with PredicateHelper { def apply(plan: LogicalPlan): LogicalPlan = plan transform { - case ff @ Filter(fc, nf @ Filter(nc, grandChild)) => Filter(And(nc, fc), grandChild) + case ff @ Filter(fc, nf @ Filter(nc, grandChild)) => + (ExpressionSet(splitConjunctivePredicates(fc)) -- + ExpressionSet(splitConjunctivePredicates(nc))).reduceOption(And) match { + case Some(ac) => + Filter(And(ac, nc), grandChild) + case None => + nf + } } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index a636d63012..b84ae7c5bb 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -81,6 +81,21 @@ class FilterPushdownSuite extends PlanTest { comparePlans(optimized, correctAnswer) } + test("combine redundant filters") { + val originalQuery = + testRelation + .where('a === 1 && 'b === 1) + .where('a === 1 && 'c === 1) + + val optimized = Optimize.execute(originalQuery.analyze) 
+ val correctAnswer = + testRelation + .where('a === 1 && 'b === 1 && 'c === 1) + .analyze + + comparePlans(optimized, correctAnswer) + } + test("can't push without rewrite") { val originalQuery = testRelation