author    Dongjoon Hyun <dongjoon@apache.org>    2016-06-23 15:27:43 -0700
committer Xiangrui Meng <meng@databricks.com>   2016-06-23 15:27:43 -0700
commit    91b1ef28d134313d7b6faaffa1c390f3ca4455d0 (patch)
tree      94d427fcf90572135e2b744ba09d9d92e55500e3 /sql
parent    738f134bf4bf07bafb17e7066cf1a36e315872c2 (diff)
[SPARK-16164][SQL] Update `CombineFilters` to try to construct predicates with child predicate first
## What changes were proposed in this pull request?

This PR changes `CombineFilters` to compose the final predicate condition as (`child predicate` AND `parent predicate`) instead of (`parent predicate` AND `child predicate`). This is a best-effort approach; other optimization rules may still destroy this order by reorganizing conjunctive predicates.

**Reported Error Scenario**

Chris McCubbin reported a bug when he used StringIndexer in an ML pipeline with additional filters. It seems that during filter pushdown, we changed the ordering in the logical plan.

```scala
import org.apache.spark.ml.feature._

val df1 = (0 until 3).map(_.toString).toDF
val indexer = new StringIndexer()
  .setInputCol("value")
  .setOutputCol("idx")
  .setHandleInvalid("skip")
  .fit(df1)
val df2 = (0 until 5).map(_.toString).toDF
val predictions = indexer.transform(df2)
predictions.show() // this is okay
predictions.where('idx > 2).show() // this will throw an exception
```

Please see the notebook at https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1233855/2159162931615821/588180/latest.html for the error messages.

## How was this patch tested?

Pass the Jenkins tests (including a new test case).

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #13872 from dongjoon-hyun/SPARK-16164.
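Why the order matters: with `setHandleInvalid("skip")`, `StringIndexer` inserts a filter that drops labels unseen during fitting, and the user's `'idx > 2` predicate is only safe to evaluate after that guard. A minimal, self-contained sketch of the failure mode in plain Scala (illustrative names only, not Spark's classes; it assumes conjuncts are evaluated left to right with short-circuiting, as Catalyst's `And` does in practice):

```scala
// `lookup` mimics StringIndexer's label-to-index map, which has no entry
// for labels unseen during fitting ("3" and "4" here).
val lookup = Map("0" -> 0.0, "1" -> 1.0, "2" -> 2.0)

val rows = Seq("0", "1", "2", "3", "4")

// Child filter (inserted by the transformer) guards the lookup;
// parent filter (written by the user) applies the real condition.
val guard: String => Boolean = lookup.contains     // skip unseen labels
val cond:  String => Boolean = s => lookup(s) > 2  // throws on unseen labels

// Child-first (the patched order): safe; short-circuits on "3" and "4".
rows.filter(s => guard(s) && cond(s))              // List()

// Parent-first (the old order): evaluates lookup("3") and throws.
// rows.filter(s => cond(s) && guard(s))           // NoSuchElementException
```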
Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala           |  2 +-
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala | 18 ++++++++++++++++++
2 files changed, 19 insertions(+), 1 deletion(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 6e78ad0e77..2bca31d5f1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1002,7 +1002,7 @@ object CombineFilters extends Rule[LogicalPlan] with PredicateHelper {
(ExpressionSet(splitConjunctivePredicates(fc)) --
ExpressionSet(splitConjunctivePredicates(nc))).reduceOption(And) match {
case Some(ac) =>
- Filter(And(ac, nc), grandChild)
+ Filter(And(nc, ac), grandChild)
case None =>
nf
}
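For context, the logic surrounding the hunk subtracts the child condition's conjuncts (`nc`) from the parent's (`fc`) and, if anything remains, ANDs the remainder onto the child condition; the patch only swaps the operands so the child's conjuncts come first. A hedged sketch of that set arithmetic using plain Scala collections (illustrative `String` conjuncts, not Catalyst's `ExpressionSet` or `splitConjunctivePredicates` APIs):

```scala
object CombineFiltersSketch {
  // Stand-in for a Catalyst expression: a conjunct is just a string here.
  type Conjunct = String

  // splitConjunctivePredicates analogue: break "a && b" into Seq("a", "b").
  def split(cond: String): Seq[Conjunct] = cond.split("&&").map(_.trim).toSeq

  // The rule's core: conjuncts of the parent condition `fc` minus those of
  // the child condition `nc`; if anything remains, AND it on *after* the
  // child's condition (the patched order), else keep the child filter as-is.
  def combine(fc: String, nc: String): String = {
    val remaining = split(fc).filterNot(split(nc).toSet)
    if (remaining.isEmpty) nc
    else (split(nc) ++ remaining).mkString(" && ")
  }

  def main(args: Array[String]): Unit = {
    // Parent Filter('b === 1) over child Filter('a === 1):
    println(combine("b === 1", "a === 1"))             // a === 1 && b === 1
    // Parent condition already implied by the child: nothing to add.
    println(combine("a === 1", "a === 1 && b === 1"))  // a === 1 && b === 1
  }
}
```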
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index b8f28e83e7..9cb49e74ad 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -94,6 +94,24 @@ class FilterPushdownSuite extends PlanTest {
comparePlans(optimized, correctAnswer)
}
+ test("SPARK-16164: Filter pushdown should keep the ordering in the logical plan") {
+ val originalQuery =
+ testRelation
+ .where('a === 1)
+ .select('a, 'b)
+ .where('b === 1)
+
+ val optimized = Optimize.execute(originalQuery.analyze)
+ val correctAnswer =
+ testRelation
+ .where('a === 1 && 'b === 1)
+ .select('a, 'b)
+ .analyze
+
+    // We cannot use comparePlans here because it normalizes the plan,
+    // which would hide the predicate ordering this test verifies.
+ assert(optimized == correctAnswer)
+ }
+
test("can't push without rewrite") {
val originalQuery =
testRelation