aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala26
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala2
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala136
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala2
5 files changed, 160 insertions, 8 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 586bf3d4dd..650b4eef6e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -86,7 +86,7 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
BooleanSimplification,
SimplifyConditionals,
RemoveDispensableExpressions,
- SimplifyFilters,
+ PruneFilters,
SimplifyCasts,
SimplifyCaseConversionExpressions,
EliminateSerialization) ::
@@ -827,11 +827,12 @@ object CombineFilters extends Rule[LogicalPlan] {
}
/**
- * Removes filters that can be evaluated trivially. This is done either by eliding the filter for
- * cases where it will always evaluate to `true`, or substituting a dummy empty relation when the
- * filter will always evaluate to `false`.
+ * Removes filters that can be evaluated trivially. This can be done in the following ways:
+ * 1) by eliding the filter for cases where it will always evaluate to `true`.
+ * 2) by substituting a dummy empty relation when the filter will always evaluate to `false`.
+ * 3) by eliminating the always-true conditions given the constraints on the child's output.
*/
-object SimplifyFilters extends Rule[LogicalPlan] {
+object PruneFilters extends Rule[LogicalPlan] with PredicateHelper {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
// If the filter condition always evaluate to true, remove the filter.
case Filter(Literal(true, BooleanType), child) => child
@@ -839,6 +840,21 @@ object SimplifyFilters extends Rule[LogicalPlan] {
// replace the input with an empty relation.
case Filter(Literal(null, _), child) => LocalRelation(child.output, data = Seq.empty)
case Filter(Literal(false, BooleanType), child) => LocalRelation(child.output, data = Seq.empty)
+ // If any deterministic condition is guaranteed to be true given the constraints on the child's
+ // output, remove that condition from the filter.
+ case f @ Filter(fc, p: LogicalPlan) =>
+ val (prunedPredicates, remainingPredicates) =
+ splitConjunctivePredicates(fc).partition { cond =>
+ cond.deterministic && p.constraints.contains(cond)
+ }
+ if (prunedPredicates.isEmpty) {
+ f
+ } else if (remainingPredicates.isEmpty) {
+ p
+ } else {
+ val newCond = remainingPredicates.reduce(And)
+ Filter(newCond, p)
+ }
}
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
index 3e52441519..da43751b0a 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
@@ -36,7 +36,7 @@ class BooleanSimplificationSuite extends PlanTest with PredicateHelper {
NullPropagation,
ConstantFolding,
BooleanSimplification,
- SimplifyFilters) :: Nil
+ PruneFilters) :: Nil
}
val testRelation = LocalRelation('a.int, 'b.int, 'c.int, 'd.string)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala
new file mode 100644
index 0000000000..0ee7cf9209
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules._
+
+class PruneFiltersSuite extends PlanTest {
+
+ object Optimize extends RuleExecutor[LogicalPlan] {
+ val batches =
+ Batch("Subqueries", Once,
+ EliminateSubqueryAliases) ::
+ Batch("Filter Pushdown and Pruning", Once,
+ CombineFilters,
+ PruneFilters,
+ PushPredicateThroughProject,
+ PushPredicateThroughJoin) :: Nil
+ }
+
+ val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
+
+ test("Constraints of isNull + LeftOuter") {
+ val x = testRelation.subquery('x)
+ val y = testRelation.subquery('y)
+
+ val query = x.where("x.b".attr.isNull).join(y, LeftOuter)
+ val queryWithUselessFilter = query.where("x.b".attr.isNull)
+
+ val optimized = Optimize.execute(queryWithUselessFilter.analyze)
+ val correctAnswer = query.analyze
+
+ comparePlans(optimized, correctAnswer)
+ }
+
+ test("Constraints of unionall") {
+ val tr1 = LocalRelation('a.int, 'b.int, 'c.int)
+ val tr2 = LocalRelation('d.int, 'e.int, 'f.int)
+ val tr3 = LocalRelation('g.int, 'h.int, 'i.int)
+
+ val query =
+ tr1.where('a.attr > 10)
+ .unionAll(tr2.where('d.attr > 10)
+ .unionAll(tr3.where('g.attr > 10)))
+ val queryWithUselessFilter = query.where('a.attr > 10)
+
+ val optimized = Optimize.execute(queryWithUselessFilter.analyze)
+ val correctAnswer = query.analyze
+
+ comparePlans(optimized, correctAnswer)
+ }
+
+ test("Pruning multiple constraints in the same run") {
+ val tr1 = LocalRelation('a.int, 'b.int, 'c.int).subquery('tr1)
+ val tr2 = LocalRelation('a.int, 'd.int, 'e.int).subquery('tr2)
+
+ val query = tr1
+ .where("tr1.a".attr > 10 || "tr1.c".attr < 10)
+ .join(tr2.where('d.attr < 100), Inner, Some("tr1.a".attr === "tr2.a".attr))
+ // different order of "tr2.a" and "tr1.a"
+ val queryWithUselessFilter =
+ query.where(
+ ("tr1.a".attr > 10 || "tr1.c".attr < 10) &&
+ 'd.attr < 100 &&
+ "tr2.a".attr === "tr1.a".attr)
+
+ val optimized = Optimize.execute(queryWithUselessFilter.analyze)
+ val correctAnswer = query.analyze
+
+ comparePlans(optimized, correctAnswer)
+ }
+
+ test("Partial pruning") {
+ val tr1 = LocalRelation('a.int, 'b.int, 'c.int).subquery('tr1)
+ val tr2 = LocalRelation('a.int, 'd.int, 'e.int).subquery('tr2)
+
+ // One of the filter conditions does not exist in the constraints of its child.
+ // Thus, that condition is kept while the redundant ones are pruned.
+ val query = tr1
+ .where("tr1.a".attr > 10)
+ .join(tr2.where('d.attr < 100), Inner, Some("tr1.a".attr === "tr2.d".attr))
+ val queryWithExtraFilters =
+ query.where("tr1.a".attr > 10 && 'd.attr < 100 && "tr1.a".attr === "tr2.a".attr)
+
+ val optimized = Optimize.execute(queryWithExtraFilters.analyze)
+ val correctAnswer = tr1
+ .where("tr1.a".attr > 10)
+ .join(tr2.where('d.attr < 100),
+ Inner,
+ Some("tr1.a".attr === "tr2.a".attr && "tr1.a".attr === "tr2.d".attr)).analyze
+
+ comparePlans(optimized, correctAnswer)
+ }
+
+ test("No predicate is pruned") {
+ val x = testRelation.subquery('x)
+ val y = testRelation.subquery('y)
+
+ val query = x.where("x.b".attr.isNull).join(y, LeftOuter)
+ val queryWithExtraFilters = query.where("x.b".attr.isNotNull)
+
+ val optimized = Optimize.execute(queryWithExtraFilters.analyze)
+ val correctAnswer =
+ testRelation.where("b".attr.isNull).where("b".attr.isNotNull)
+ .join(testRelation, LeftOuter).analyze
+
+ comparePlans(optimized, correctAnswer)
+ }
+
+ test("Nondeterministic predicate is not pruned") {
+ val originalQuery = testRelation.where(Rand(10) > 5).select('a).where(Rand(10) > 5).analyze
+ val optimized = Optimize.execute(originalQuery)
+ val correctAnswer = testRelation.where(Rand(10) > 5).where(Rand(10) > 5).select('a).analyze
+ comparePlans(optimized, correctAnswer)
+ }
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala
index 50f3b512d9..b08cdc8a36 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala
@@ -32,7 +32,7 @@ class SetOperationSuite extends PlanTest {
Batch("Union Pushdown", Once,
CombineUnions,
SetOperationPushDown,
- SimplifyFilters) :: Nil
+ PruneFilters) :: Nil
}
val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 5a5cb5cf03..95afdc789f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -234,7 +234,7 @@ private[sql] object ParquetFilters {
//
// For any comparison operator `cmp`, both `a cmp NULL` and `NULL cmp a` evaluate to `NULL`,
// which can be casted to `false` implicitly. Please refer to the `eval` method of these
- // operators and the `SimplifyFilters` rule for details.
+ // operators and the `PruneFilters` rule for details.
// Hyukjin:
// I added [[EqualNullSafe]] with [[org.apache.parquet.filter2.predicate.Operators.Eq]].