From f96997ba244a14c26e85a2475415a762d0c0d0a8 Mon Sep 17 00:00:00 2001 From: Sameer Agarwal Date: Wed, 16 Mar 2016 16:26:51 -0700 Subject: [SPARK-13871][SQL] Support for inferring filters from data constraints ## What changes were proposed in this pull request? This PR generalizes the `NullFiltering` optimizer rule in catalyst to `InferFiltersFromConstraints` that can automatically infer all relevant filters based on an operator's constraints while making sure of 2 things: (a) no redundant filters are generated, and (b) filters that do not contribute to any further optimizations are not generated. ## How was this patch tested? Extended all tests in `InferFiltersFromConstraintsSuite` (that were initially based on `NullFilteringSuite`) to test filter inference in `Filter` and `Join` operators. In particular, the 2 tests (`single inner join with pre-existing filters: filter out values on either side` and `multiple inner joins: filter out values on all sides on equi-join keys`) attempt to highlight/test the real potential of this rule for join optimization. Author: Sameer Agarwal Closes #11665 from sameeragarwal/infer-filters. 
--- .../spark/sql/catalyst/optimizer/Optimizer.scala | 56 ++++------ .../InferFiltersFromConstraintsSuite.scala | 123 +++++++++++++++++++++ .../catalyst/optimizer/NullFilteringSuite.scala | 112 ------------------- 3 files changed, 146 insertions(+), 145 deletions(-) create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala delete mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NullFilteringSuite.scala (limited to 'sql/catalyst') diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 2de92d06ec..76f50a3dc3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -72,6 +72,7 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] { LimitPushDown, ColumnPruning, EliminateOperators, + InferFiltersFromConstraints, // Operator combine CollapseRepartition, CollapseProject, @@ -79,7 +80,6 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] { CombineLimits, CombineUnions, // Constant folding and strength reduction - NullFiltering, NullPropagation, OptimizeIn, ConstantFolding, @@ -607,50 +607,40 @@ object NullPropagation extends Rule[LogicalPlan] { } /** - * Attempts to eliminate reading (unnecessary) NULL values if they are not required for correctness - * by inserting isNotNull filters in the query plan. These filters are currently inserted beneath - * existing Filters and Join operators and are inferred based on their data constraints. + * Generate a list of additional filters from an operator's existing constraint but remove those + * that are either already part of the operator's condition or are part of the operator's child + * constraints. 
These filters are currently inserted to the existing conditions in the Filter + * operators and on either side of Join operators. * * Note: While this optimization is applicable to all types of join, it primarily benefits Inner and * LeftSemi joins. */ -object NullFiltering extends Rule[LogicalPlan] with PredicateHelper { +object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelper { def apply(plan: LogicalPlan): LogicalPlan = plan transform { case filter @ Filter(condition, child) => - // We generate a list of additional isNotNull filters from the operator's existing constraints - // but remove those that are either already part of the filter condition or are part of the - // operator's child constraints. - val newIsNotNullConstraints = filter.constraints.filter(_.isInstanceOf[IsNotNull]) -- + val newFilters = filter.constraints -- (child.constraints ++ splitConjunctivePredicates(condition)) - if (newIsNotNullConstraints.nonEmpty) { - Filter(And(newIsNotNullConstraints.reduce(And), condition), child) + if (newFilters.nonEmpty) { + Filter(And(newFilters.reduce(And), condition), child) } else { filter } - case join @ Join(left, right, joinType, condition) => - val leftIsNotNullConstraints = join.constraints - .filter(_.isInstanceOf[IsNotNull]) - .filter(_.references.subsetOf(left.outputSet)) -- left.constraints - val rightIsNotNullConstraints = - join.constraints - .filter(_.isInstanceOf[IsNotNull]) - .filter(_.references.subsetOf(right.outputSet)) -- right.constraints - val newLeftChild = if (leftIsNotNullConstraints.nonEmpty) { - Filter(leftIsNotNullConstraints.reduce(And), left) - } else { - left - } - val newRightChild = if (rightIsNotNullConstraints.nonEmpty) { - Filter(rightIsNotNullConstraints.reduce(And), right) - } else { - right - } - if (newLeftChild != left || newRightChild != right) { - Join(newLeftChild, newRightChild, joinType, condition) - } else { - join + case join @ Join(left, right, joinType, conditionOpt) => + // Only 
consider constraints that can be pushed down completely to either the left or the + // right child + val constraints = join.constraints.filter { c => + c.references.subsetOf(left.outputSet) || c.references.subsetOf(right.outputSet)} + // Remove those constraints that are already enforced by either the left or the right child + val additionalConstraints = constraints -- (left.constraints ++ right.constraints) + val newConditionOpt = conditionOpt match { + case Some(condition) => + val newFilters = additionalConstraints -- splitConjunctivePredicates(condition) + if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else None + case None => + additionalConstraints.reduceOption(And) } + if (newConditionOpt.isDefined) Join(left, right, joinType, newConditionOpt) else join } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala new file mode 100644 index 0000000000..e7fdd5a620 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.rules._ + +class InferFiltersFromConstraintsSuite extends PlanTest { + + object Optimize extends RuleExecutor[LogicalPlan] { + val batches = Batch("InferFilters", FixedPoint(5), InferFiltersFromConstraints) :: + Batch("PredicatePushdown", FixedPoint(5), PushPredicateThroughJoin) :: + Batch("CombineFilters", FixedPoint(5), CombineFilters) :: Nil + } + + val testRelation = LocalRelation('a.int, 'b.int, 'c.int) + + test("filter: filter out constraints in condition") { + val originalQuery = testRelation.where('a === 1 && 'a === 'b).analyze + val correctAnswer = testRelation + .where(IsNotNull('a) && IsNotNull('b) && 'a === 'b && 'a === 1 && 'b === 1).analyze + val optimized = Optimize.execute(originalQuery) + comparePlans(optimized, correctAnswer) + } + + test("single inner join: filter out values on either side on equi-join keys") { + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) + val originalQuery = x.join(y, + condition = Some(("x.a".attr === "y.a".attr) && ("x.a".attr === 1) && ("y.c".attr > 5))) + .analyze + val left = x.where(IsNotNull('a) && "x.a".attr === 1) + val right = y.where(IsNotNull('a) && IsNotNull('c) && "y.c".attr > 5 && "y.a".attr === 1) + val correctAnswer = left.join(right, condition = Some("x.a".attr === "y.a".attr)).analyze + val optimized = Optimize.execute(originalQuery) + comparePlans(optimized, correctAnswer) + } + + test("single inner join: filter out nulls on either side on non equal keys") { + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) 
+ val originalQuery = x.join(y, + condition = Some(("x.a".attr =!= "y.a".attr) && ("x.b".attr === 1) && ("y.c".attr > 5))) + .analyze + val left = x.where(IsNotNull('a) && IsNotNull('b) && "x.b".attr === 1) + val right = y.where(IsNotNull('a) && IsNotNull('c) && "y.c".attr > 5) + val correctAnswer = left.join(right, condition = Some("x.a".attr =!= "y.a".attr)).analyze + val optimized = Optimize.execute(originalQuery) + comparePlans(optimized, correctAnswer) + } + + test("single inner join with pre-existing filters: filter out values on either side") { + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) + val originalQuery = x.where('b > 5).join(y.where('a === 10), + condition = Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr)).analyze + val left = x.where(IsNotNull('a) && 'a === 10 && IsNotNull('b) && 'b > 5) + val right = y.where(IsNotNull('a) && IsNotNull('b) && 'a === 10 && 'b > 5) + val correctAnswer = left.join(right, + condition = Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr)).analyze + val optimized = Optimize.execute(originalQuery) + comparePlans(optimized, correctAnswer) + } + + test("single outer join: no null filters are generated") { + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) + val originalQuery = x.join(y, FullOuter, + condition = Some("x.a".attr === "y.a".attr)).analyze + val optimized = Optimize.execute(originalQuery) + comparePlans(optimized, originalQuery) + } + + test("multiple inner joins: filter out values on all sides on equi-join keys") { + val t1 = testRelation.subquery('t1) + val t2 = testRelation.subquery('t2) + val t3 = testRelation.subquery('t3) + val t4 = testRelation.subquery('t4) + + val originalQuery = t1.where('b > 5) + .join(t2, condition = Some("t1.b".attr === "t2.b".attr)) + .join(t3, condition = Some("t2.b".attr === "t3.b".attr)) + .join(t4, condition = Some("t3.b".attr === "t4.b".attr)).analyze + val correctAnswer = t1.where(IsNotNull('b) && 'b > 5) + 
.join(t2.where(IsNotNull('b) && 'b > 5), condition = Some("t1.b".attr === "t2.b".attr)) + .join(t3.where(IsNotNull('b) && 'b > 5), condition = Some("t2.b".attr === "t3.b".attr)) + .join(t4.where(IsNotNull('b) && 'b > 5), condition = Some("t3.b".attr === "t4.b".attr)) + .analyze + val optimized = Optimize.execute(originalQuery) + comparePlans(optimized, correctAnswer) + } + + test("inner join with filter: filter out values on all sides on equi-join keys") { + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) + + val originalQuery = + x.join(y, Inner, Some("x.a".attr === "y.a".attr)).where("x.a".attr > 5).analyze + val correctAnswer = x.where(IsNotNull('a) && 'a.attr > 5) + .join(y.where(IsNotNull('a) && 'a.attr > 5), Inner, Some("x.a".attr === "y.a".attr)).analyze + val optimized = Optimize.execute(originalQuery) + comparePlans(optimized, correctAnswer) + } +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NullFilteringSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NullFilteringSuite.scala deleted file mode 100644 index 142e4ae6e4..0000000000 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NullFilteringSuite.scala +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.catalyst.optimizer - -import org.apache.spark.sql.catalyst.dsl.expressions._ -import org.apache.spark.sql.catalyst.dsl.plans._ -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans._ -import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.catalyst.rules._ - -class NullFilteringSuite extends PlanTest { - - object Optimize extends RuleExecutor[LogicalPlan] { - val batches = Batch("NullFiltering", Once, NullFiltering) :: - Batch("CombineFilters", FixedPoint(5), CombineFilters) :: Nil - } - - val testRelation = LocalRelation('a.int, 'b.int, 'c.int) - - test("filter: filter out nulls in condition") { - val originalQuery = testRelation.where('a === 1).analyze - val correctAnswer = testRelation.where(IsNotNull('a) && 'a === 1).analyze - val optimized = Optimize.execute(originalQuery) - comparePlans(optimized, correctAnswer) - } - - test("single inner join: filter out nulls on either side on equi-join keys") { - val x = testRelation.subquery('x) - val y = testRelation.subquery('y) - val originalQuery = x.join(y, - condition = Some(("x.a".attr === "y.a".attr) && ("x.b".attr === 1) && ("y.c".attr > 5))) - .analyze - val left = x.where(IsNotNull('a) && IsNotNull('b)) - val right = y.where(IsNotNull('a) && IsNotNull('c)) - val correctAnswer = left.join(right, - condition = Some(("x.a".attr === "y.a".attr) && ("x.b".attr === 1) && ("y.c".attr > 5))) - .analyze - val optimized = Optimize.execute(originalQuery) - comparePlans(optimized, correctAnswer) - } - - test("single inner join: filter out nulls on either side on non equal keys") { - val x = testRelation.subquery('x) - val y = testRelation.subquery('y) - val originalQuery = x.join(y, - condition = Some(("x.a".attr =!= "y.a".attr) && ("x.b".attr === 1) && ("y.c".attr > 5))) - .analyze - val left = 
x.where(IsNotNull('a) && IsNotNull('b)) - val right = y.where(IsNotNull('a) && IsNotNull('c)) - val correctAnswer = left.join(right, - condition = Some(("x.a".attr =!= "y.a".attr) && ("x.b".attr === 1) && ("y.c".attr > 5))) - .analyze - val optimized = Optimize.execute(originalQuery) - comparePlans(optimized, correctAnswer) - } - - test("single inner join with pre-existing filters: filter out nulls on either side") { - val x = testRelation.subquery('x) - val y = testRelation.subquery('y) - val originalQuery = x.where('b > 5).join(y.where('c === 10), - condition = Some("x.a".attr === "y.a".attr)).analyze - val left = x.where(IsNotNull('a) && IsNotNull('b) && 'b > 5) - val right = y.where(IsNotNull('a) && IsNotNull('c) && 'c === 10) - val correctAnswer = left.join(right, - condition = Some("x.a".attr === "y.a".attr)).analyze - val optimized = Optimize.execute(originalQuery) - comparePlans(optimized, correctAnswer) - } - - test("single outer join: no null filters are generated") { - val x = testRelation.subquery('x) - val y = testRelation.subquery('y) - val originalQuery = x.join(y, FullOuter, - condition = Some("x.a".attr === "y.a".attr)).analyze - val optimized = Optimize.execute(originalQuery) - comparePlans(optimized, originalQuery) - } - - test("multiple inner joins: filter out nulls on all sides on equi-join keys") { - val t1 = testRelation.subquery('t1) - val t2 = testRelation.subquery('t2) - val t3 = testRelation.subquery('t3) - val t4 = testRelation.subquery('t4) - - val originalQuery = t1 - .join(t2, condition = Some("t1.b".attr === "t2.b".attr)) - .join(t3, condition = Some("t2.b".attr === "t3.b".attr)) - .join(t4, condition = Some("t3.b".attr === "t4.b".attr)).analyze - val correctAnswer = t1.where(IsNotNull('b)) - .join(t2.where(IsNotNull('b)), condition = Some("t1.b".attr === "t2.b".attr)) - .join(t3.where(IsNotNull('b)), condition = Some("t2.b".attr === "t3.b".attr)) - .join(t4.where(IsNotNull('b)), condition = Some("t3.b".attr === "t4.b".attr)).analyze 
- val optimized = Optimize.execute(originalQuery) - comparePlans(optimized, correctAnswer) - } -} -- cgit v1.2.3