diff options
author | Sameer Agarwal <sameer@databricks.com> | 2016-03-16 16:26:51 -0700 |
---|---|---|
committer | Yin Huai <yhuai@databricks.com> | 2016-03-16 16:26:51 -0700 |
commit | f96997ba244a14c26e85a2475415a762d0c0d0a8 (patch) | |
tree | 8278b83a155fc09d736375890a043d9a0f8243d7 /sql | |
parent | b90c0206faeb0883fba1c79fe18aa72affb9988e (diff) | |
download | spark-f96997ba244a14c26e85a2475415a762d0c0d0a8.tar.gz spark-f96997ba244a14c26e85a2475415a762d0c0d0a8.tar.bz2 spark-f96997ba244a14c26e85a2475415a762d0c0d0a8.zip |
[SPARK-13871][SQL] Support for inferring filters from data constraints
## What changes were proposed in this pull request?
This PR generalizes the `NullFiltering` optimizer rule in catalyst to `InferFiltersFromConstraints` that can automatically infer all relevant filters based on an operator's constraints while making sure of 2 things:
(a) no redundant filters are generated, and
(b) filters that do not contribute to any further optimizations are not generated.
## How was this patch tested?
Extended all tests in `InferFiltersFromConstraintsSuite` (that were initially based on `NullFilteringSuite`) to test filter inference in `Filter` and `Join` operators.
In particular, the 2 tests (`single inner join with pre-existing filters: filter out values on either side` and `multiple inner joins: filter out values on all sides on equi-join keys`) attempt to highlight/test the real potential of this rule for join optimization.
Author: Sameer Agarwal <sameer@databricks.com>
Closes #11665 from sameeragarwal/infer-filters.
Diffstat (limited to 'sql')
-rw-r--r-- | sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 56 | ||||
-rw-r--r-- | sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala (renamed from sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NullFilteringSuite.scala) | 69 |
2 files changed, 63 insertions, 62 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 2de92d06ec..76f50a3dc3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -72,6 +72,7 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] { LimitPushDown, ColumnPruning, EliminateOperators, + InferFiltersFromConstraints, // Operator combine CollapseRepartition, CollapseProject, @@ -79,7 +80,6 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] { CombineLimits, CombineUnions, // Constant folding and strength reduction - NullFiltering, NullPropagation, OptimizeIn, ConstantFolding, @@ -607,50 +607,40 @@ object NullPropagation extends Rule[LogicalPlan] { } /** - * Attempts to eliminate reading (unnecessary) NULL values if they are not required for correctness - * by inserting isNotNull filters in the query plan. These filters are currently inserted beneath - * existing Filters and Join operators and are inferred based on their data constraints. + * Generate a list of additional filters from an operator's existing constraint but remove those + * that are either already part of the operator's condition or are part of the operator's child + * constraints. These filters are currently inserted to the existing conditions in the Filter + * operators and on either side of Join operators. * * Note: While this optimization is applicable to all types of join, it primarily benefits Inner and * LeftSemi joins. 
*/ -object NullFiltering extends Rule[LogicalPlan] with PredicateHelper { +object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelper { def apply(plan: LogicalPlan): LogicalPlan = plan transform { case filter @ Filter(condition, child) => - // We generate a list of additional isNotNull filters from the operator's existing constraints - // but remove those that are either already part of the filter condition or are part of the - // operator's child constraints. - val newIsNotNullConstraints = filter.constraints.filter(_.isInstanceOf[IsNotNull]) -- + val newFilters = filter.constraints -- (child.constraints ++ splitConjunctivePredicates(condition)) - if (newIsNotNullConstraints.nonEmpty) { - Filter(And(newIsNotNullConstraints.reduce(And), condition), child) + if (newFilters.nonEmpty) { + Filter(And(newFilters.reduce(And), condition), child) } else { filter } - case join @ Join(left, right, joinType, condition) => - val leftIsNotNullConstraints = join.constraints - .filter(_.isInstanceOf[IsNotNull]) - .filter(_.references.subsetOf(left.outputSet)) -- left.constraints - val rightIsNotNullConstraints = - join.constraints - .filter(_.isInstanceOf[IsNotNull]) - .filter(_.references.subsetOf(right.outputSet)) -- right.constraints - val newLeftChild = if (leftIsNotNullConstraints.nonEmpty) { - Filter(leftIsNotNullConstraints.reduce(And), left) - } else { - left - } - val newRightChild = if (rightIsNotNullConstraints.nonEmpty) { - Filter(rightIsNotNullConstraints.reduce(And), right) - } else { - right - } - if (newLeftChild != left || newRightChild != right) { - Join(newLeftChild, newRightChild, joinType, condition) - } else { - join + case join @ Join(left, right, joinType, conditionOpt) => + // Only consider constraints that can be pushed down completely to either the left or the + // right child + val constraints = join.constraints.filter { c => + c.references.subsetOf(left.outputSet) || c.references.subsetOf(right.outputSet)} + // Remove those 
constraints that are already enforced by either the left or the right child + val additionalConstraints = constraints -- (left.constraints ++ right.constraints) + val newConditionOpt = conditionOpt match { + case Some(condition) => + val newFilters = additionalConstraints -- splitConjunctivePredicates(condition) + if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else None + case None => + additionalConstraints.reduceOption(And) } + if (newConditionOpt.isDefined) Join(left, right, joinType, newConditionOpt) else join } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NullFilteringSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala index 142e4ae6e4..e7fdd5a620 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NullFilteringSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala @@ -24,33 +24,33 @@ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ -class NullFilteringSuite extends PlanTest { +class InferFiltersFromConstraintsSuite extends PlanTest { object Optimize extends RuleExecutor[LogicalPlan] { - val batches = Batch("NullFiltering", Once, NullFiltering) :: + val batches = Batch("InferFilters", FixedPoint(5), InferFiltersFromConstraints) :: + Batch("PredicatePushdown", FixedPoint(5), PushPredicateThroughJoin) :: Batch("CombineFilters", FixedPoint(5), CombineFilters) :: Nil } val testRelation = LocalRelation('a.int, 'b.int, 'c.int) - test("filter: filter out nulls in condition") { - val originalQuery = testRelation.where('a === 1).analyze - val correctAnswer = testRelation.where(IsNotNull('a) && 'a === 1).analyze + test("filter: filter out constraints in condition") { + val originalQuery = testRelation.where('a === 1 && 'a === 'b).analyze + val 
correctAnswer = testRelation + .where(IsNotNull('a) && IsNotNull('b) && 'a === 'b && 'a === 1 && 'b === 1).analyze val optimized = Optimize.execute(originalQuery) comparePlans(optimized, correctAnswer) } - test("single inner join: filter out nulls on either side on equi-join keys") { + test("single inner join: filter out values on either side on equi-join keys") { val x = testRelation.subquery('x) val y = testRelation.subquery('y) val originalQuery = x.join(y, - condition = Some(("x.a".attr === "y.a".attr) && ("x.b".attr === 1) && ("y.c".attr > 5))) - .analyze - val left = x.where(IsNotNull('a) && IsNotNull('b)) - val right = y.where(IsNotNull('a) && IsNotNull('c)) - val correctAnswer = left.join(right, - condition = Some(("x.a".attr === "y.a".attr) && ("x.b".attr === 1) && ("y.c".attr > 5))) + condition = Some(("x.a".attr === "y.a".attr) && ("x.a".attr === 1) && ("y.c".attr > 5))) .analyze + val left = x.where(IsNotNull('a) && "x.a".attr === 1) + val right = y.where(IsNotNull('a) && IsNotNull('c) && "y.c".attr > 5 && "y.a".attr === 1) + val correctAnswer = left.join(right, condition = Some("x.a".attr === "y.a".attr)).analyze val optimized = Optimize.execute(originalQuery) comparePlans(optimized, correctAnswer) } @@ -61,24 +61,22 @@ class NullFilteringSuite extends PlanTest { val originalQuery = x.join(y, condition = Some(("x.a".attr =!= "y.a".attr) && ("x.b".attr === 1) && ("y.c".attr > 5))) .analyze - val left = x.where(IsNotNull('a) && IsNotNull('b)) - val right = y.where(IsNotNull('a) && IsNotNull('c)) - val correctAnswer = left.join(right, - condition = Some(("x.a".attr =!= "y.a".attr) && ("x.b".attr === 1) && ("y.c".attr > 5))) - .analyze + val left = x.where(IsNotNull('a) && IsNotNull('b) && "x.b".attr === 1) + val right = y.where(IsNotNull('a) && IsNotNull('c) && "y.c".attr > 5) + val correctAnswer = left.join(right, condition = Some("x.a".attr =!= "y.a".attr)).analyze val optimized = Optimize.execute(originalQuery) comparePlans(optimized, correctAnswer) } 
- test("single inner join with pre-existing filters: filter out nulls on either side") { + test("single inner join with pre-existing filters: filter out values on either side") { val x = testRelation.subquery('x) val y = testRelation.subquery('y) - val originalQuery = x.where('b > 5).join(y.where('c === 10), - condition = Some("x.a".attr === "y.a".attr)).analyze - val left = x.where(IsNotNull('a) && IsNotNull('b) && 'b > 5) - val right = y.where(IsNotNull('a) && IsNotNull('c) && 'c === 10) + val originalQuery = x.where('b > 5).join(y.where('a === 10), + condition = Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr)).analyze + val left = x.where(IsNotNull('a) && 'a === 10 && IsNotNull('b) && 'b > 5) + val right = y.where(IsNotNull('a) && IsNotNull('b) && 'a === 10 && 'b > 5) val correctAnswer = left.join(right, - condition = Some("x.a".attr === "y.a".attr)).analyze + condition = Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr)).analyze val optimized = Optimize.execute(originalQuery) comparePlans(optimized, correctAnswer) } @@ -92,20 +90,33 @@ class NullFilteringSuite extends PlanTest { comparePlans(optimized, originalQuery) } - test("multiple inner joins: filter out nulls on all sides on equi-join keys") { + test("multiple inner joins: filter out values on all sides on equi-join keys") { val t1 = testRelation.subquery('t1) val t2 = testRelation.subquery('t2) val t3 = testRelation.subquery('t3) val t4 = testRelation.subquery('t4) - val originalQuery = t1 + val originalQuery = t1.where('b > 5) .join(t2, condition = Some("t1.b".attr === "t2.b".attr)) .join(t3, condition = Some("t2.b".attr === "t3.b".attr)) .join(t4, condition = Some("t3.b".attr === "t4.b".attr)).analyze - val correctAnswer = t1.where(IsNotNull('b)) - .join(t2.where(IsNotNull('b)), condition = Some("t1.b".attr === "t2.b".attr)) - .join(t3.where(IsNotNull('b)), condition = Some("t2.b".attr === "t3.b".attr)) - .join(t4.where(IsNotNull('b)), condition = Some("t3.b".attr === 
"t4.b".attr)).analyze + val correctAnswer = t1.where(IsNotNull('b) && 'b > 5) + .join(t2.where(IsNotNull('b) && 'b > 5), condition = Some("t1.b".attr === "t2.b".attr)) + .join(t3.where(IsNotNull('b) && 'b > 5), condition = Some("t2.b".attr === "t3.b".attr)) + .join(t4.where(IsNotNull('b) && 'b > 5), condition = Some("t3.b".attr === "t4.b".attr)) + .analyze + val optimized = Optimize.execute(originalQuery) + comparePlans(optimized, correctAnswer) + } + + test("inner join with filter: filter out values on all sides on equi-join keys") { + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) + + val originalQuery = + x.join(y, Inner, Some("x.a".attr === "y.a".attr)).where("x.a".attr > 5).analyze + val correctAnswer = x.where(IsNotNull('a) && 'a.attr > 5) + .join(y.where(IsNotNull('a) && 'a.attr > 5), Inner, Some("x.a".attr === "y.a".attr)).analyze val optimized = Optimize.execute(originalQuery) comparePlans(optimized, correctAnswer) } |