author    | Dongjoon Hyun <dongjoon@apache.org> | 2016-06-23 15:27:43 -0700
committer | Xiangrui Meng <meng@databricks.com> | 2016-06-23 15:27:43 -0700
commit    | 91b1ef28d134313d7b6faaffa1c390f3ca4455d0 (patch)
tree      | 94d427fcf90572135e2b744ba09d9d92e55500e3 /sql
parent    | 738f134bf4bf07bafb17e7066cf1a36e315872c2 (diff)
[SPARK-16164][SQL] Update `CombineFilters` to try to construct predicates with child predicate first
## What changes were proposed in this pull request?
This PR changes `CombineFilters` to compose the final predicate condition as (`child predicate` AND `parent predicate`) instead of (`parent predicate` AND `child predicate`). This is a best-effort approach: other optimization rules may still destroy this ordering by reorganizing the conjunctive predicates.
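For illustration, here is a minimal, self-contained Scala sketch of that reordering. It is not the Catalyst implementation: `Pred`, `And`, and `Filter` below are toy stand-ins, and the names `labelSeen` and `index` are invented for this example; the real rule also deduplicates conjuncts with `ExpressionSet` before combining.
```scala
// Toy model (NOT Catalyst) of the conjunct reordering made by this PR.
object CombineFiltersSketch extends App {
  sealed trait Expr
  case class Pred(sql: String) extends Expr { override def toString: String = sql }
  case class And(left: Expr, right: Expr) extends Expr {
    override def toString: String = s"($left AND $right)"
  }
  case class Filter(condition: Expr, child: String)

  // Child filter sits below the parent filter in the plan, e.g. the filter
  // that StringIndexer's handleInvalid("skip") inserts.
  val childFilter = Filter(Pred("labelSeen(value)"), child = "relation")
  // Parent filter, e.g. the user's predicate on the derived column after pushdown.
  val parentCond = Pred("index(value) > 2")

  // Before SPARK-16164: the parent conjunct was placed first.
  val before = Filter(And(parentCond, childFilter.condition), childFilter.child)
  // After SPARK-16164: the child filter's conjunct comes first, preserving plan order.
  val after = Filter(And(childFilter.condition, parentCond), childFilter.child)

  println(before) // Filter((index(value) > 2 AND labelSeen(value)),relation)
  println(after)  // Filter((labelSeen(value) AND index(value) > 2),relation)
}
```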
**Reported Error Scenario**
Chris McCubbin reported a bug when he used `StringIndexer` in an ML pipeline with additional filters. It seems that during filter pushdown we changed the predicate ordering in the logical plan, so the user's filter on the derived `idx` column could be evaluated before the filter that `handleInvalid("skip")` inserts to drop unseen labels.
```scala
import org.apache.spark.ml.feature._
val df1 = (0 until 3).map(_.toString).toDF
val indexer = new StringIndexer()
.setInputCol("value")
.setOutputCol("idx")
.setHandleInvalid("skip")
.fit(df1)
val df2 = (0 until 5).map(_.toString).toDF
val predictions = indexer.transform(df2)
predictions.show() // this is okay
predictions.where('idx > 2).show() // this will throw an exception
```
Please see the notebook at https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1233855/2159162931615821/588180/latest.html for error messages.
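To see why the conjunct order matters at run time, here is a plain-Scala analogy rather than Spark code. It assumes, as in the reported scenario, that the left conjunct guards the right one (Catalyst's `And` evaluates left to right and short-circuits); `index` stands in for the fitted `StringIndexer`'s UDF, which fails on unseen labels, and `seen.contains` for the filter added by `handleInvalid("skip")`. Names and values are illustrative only.
```scala
// Plain-Scala analogy (not Spark code) for the effect of conjunct ordering.
object WhyOrderMattersSketch extends App {
  val seen = Map("0" -> 0.0, "1" -> 1.0, "2" -> 2.0)
  // Stand-in for the indexer UDF: throws on labels it has never seen.
  def index(v: String): Double =
    seen.getOrElse(v, throw new NoSuchElementException(s"Unseen label: $v"))

  val rows = (0 until 5).map(_.toString)

  // Child predicate first (the order this PR tries to keep): unseen labels are
  // dropped before the user predicate ever calls the throwing function.
  val ok = rows.filter(v => seen.contains(v) && index(v) > 2)
  println(ok) // Vector() -- no rows match, and no exception is thrown

  // Parent predicate first (the old order): index("3") is evaluated and throws.
  // rows.filter(v => index(v) > 2 && seen.contains(v))
}
```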
## How was this patch tested?
Pass the Jenkins tests (including a new test case).
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes #13872 from dongjoon-hyun/SPARK-16164.
Diffstat (limited to 'sql')
2 files changed, 19 insertions(+), 1 deletion(-)
```diff
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 6e78ad0e77..2bca31d5f1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1002,7 +1002,7 @@ object CombineFilters extends Rule[LogicalPlan] with PredicateHelper {
       (ExpressionSet(splitConjunctivePredicates(fc)) --
         ExpressionSet(splitConjunctivePredicates(nc))).reduceOption(And) match {
         case Some(ac) =>
-          Filter(And(ac, nc), grandChild)
+          Filter(And(nc, ac), grandChild)
         case None => nf
       }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index b8f28e83e7..9cb49e74ad 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -94,6 +94,24 @@ class FilterPushdownSuite extends PlanTest {
     comparePlans(optimized, correctAnswer)
   }
 
+  test("SPARK-16164: Filter pushdown should keep the ordering in the logical plan") {
+    val originalQuery =
+      testRelation
+        .where('a === 1)
+        .select('a, 'b)
+        .where('b === 1)
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+    val correctAnswer =
+      testRelation
+        .where('a === 1 && 'b === 1)
+        .select('a, 'b)
+        .analyze
+
+    // We can not use comparePlans here because it normalized the plan.
+    assert(optimized == correctAnswer)
+  }
+
   test("can't push without rewrite") {
     val originalQuery =
       testRelation
```