From ee1c1f3a04dfe80843432e349f01178e47f02443 Mon Sep 17 00:00:00 2001 From: scwf Date: Fri, 16 Jan 2015 14:01:22 -0800 Subject: [SPARK-4937][SQL] Adding optimization to simplify the And, Or condition in spark sql Adding optimization to simplify the And/Or condition in spark sql. There are two kinds of Optimization 1 Numeric condition optimization, such as: a < 3 && a > 5 ---- False a < 1 || a > 0 ---- True a > 3 && a > 5 => a > 5 (a < 2 || b > 5) && a < 2 => a < 2 2 optimizing the some query from a cartesian product into equi-join, such as this sql (one of hive-testbench): ``` select sum(l_extendedprice* (1 - l_discount)) as revenue from lineitem, part where ( p_partkey = l_partkey and p_brand = 'Brand#32' and p_container in ('SM CASE', 'SM BOX', 'SM PACK', 'SM PKG') and l_quantity >= 7 and l_quantity <= 7 + 10 and p_size between 1 and 5 and l_shipmode in ('AIR', 'AIR REG') and l_shipinstruct = 'DELIVER IN PERSON' ) or ( p_partkey = l_partkey and p_brand = 'Brand#35' and p_container in ('MED BAG', 'MED BOX', 'MED PKG', 'MED PACK') and l_quantity >= 15 and l_quantity <= 15 + 10 and p_size between 1 and 10 and l_shipmode in ('AIR', 'AIR REG') and l_shipinstruct = 'DELIVER IN PERSON' ) or ( p_partkey = l_partkey and p_brand = 'Brand#24' and p_container in ('LG CASE', 'LG BOX', 'LG PACK', 'LG PKG') and l_quantity >= 26 and l_quantity <= 26 + 10 and p_size between 1 and 15 and l_shipmode in ('AIR', 'AIR REG') and l_shipinstruct = 'DELIVER IN PERSON' ) ``` It has a repeated expression in Or, so we can optimize it by ``` (a && b) || (a && c) = a && (b || c)``` Before optimization, this sql hang in my locally test, and the physical plan is: ![image](https://cloud.githubusercontent.com/assets/7018048/5539175/31cf38e8-8af9-11e4-95e3-336f9b3da4a4.png) After optimization, this sql run successfully in 20+ seconds, and its physical plan is: ![image](https://cloud.githubusercontent.com/assets/7018048/5539176/39a558e0-8af9-11e4-912b-93de94b20075.png) This PR focus on the second optimization and some simple ones of the first. For complex Numeric condition optimization, I will make a follow up PR. Author: scwf Author: wangfei Closes #3778 from scwf/filter1 and squashes the following commits: 58bcbc2 [scwf] minor format fix 9570211 [scwf] conflicts fix 527e6ce [scwf] minor comment improvements 5c6f134 [scwf] remove numeric optimizations and move to BooleanSimplification 546a82b [wangfei] style fix 825fa69 [wangfei] adding more tests a001e8c [wangfei] revert pom changes 32a595b [scwf] improvement and test fix e99a26c [wangfei] refactory And/Or optimization to make it more readable and clean --- .../spark/sql/catalyst/optimizer/Optimizer.scala | 57 +++++++++----- .../optimizer/BooleanSimplificationSuite.scala | 92 ++++++++++++++++++++++ .../catalyst/optimizer/NormalizeFiltersSuite.scala | 72 ----------------- 3 files changed, 131 insertions(+), 90 deletions(-) create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala delete mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NormalizeFiltersSuite.scala (limited to 'sql') diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index d4a4c35691..f3acb70e03 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -294,13 +294,10 @@ object OptimizeIn extends Rule[LogicalPlan] { /** * Simplifies boolean expressions: - * * 1. Simplifies expressions whose answer can be determined without evaluating both sides. * 2. Eliminates / extracts common factors. - * 3. Removes `Not` operator. - * - * Note that this rule can eliminate expressions that might otherwise have been evaluated and thus - * is only safe when evaluations of expressions does not result in side effects. + * 3. Merge same expressions + * 4. Removes `Not` operator. */ object BooleanSimplification extends Rule[LogicalPlan] with PredicateHelper { def apply(plan: LogicalPlan): LogicalPlan = plan transform { @@ -311,9 +308,26 @@ object BooleanSimplification extends Rule[LogicalPlan] with PredicateHelper { case (l, Literal(true, BooleanType)) => l case (Literal(false, BooleanType), _) => Literal(false) case (_, Literal(false, BooleanType)) => Literal(false) - // a && a && a ... => a - case _ if splitConjunctivePredicates(and).distinct.size == 1 => left - case _ => and + // a && a => a + case (l, r) if l fastEquals r => l + case (_, _) => + val lhsSet = splitDisjunctivePredicates(left).toSet + val rhsSet = splitDisjunctivePredicates(right).toSet + val common = lhsSet.intersect(rhsSet) + val ldiff = lhsSet.diff(common) + val rdiff = rhsSet.diff(common) + if (ldiff.size == 0 || rdiff.size == 0) { + // a && (a || b) + common.reduce(Or) + } else { + // (a || b || c || ...) && (a || b || d || ...) && (a || b || e || ...) ... => + // (a || b) || ((c || ...) && (f || ...) && (e || ...) && ...) + (ldiff.reduceOption(Or) ++ rdiff.reduceOption(Or)) + .reduceOption(And) + .map(_ :: common.toList) + .getOrElse(common.toList) + .reduce(Or) + } } case or @ Or(left, right) => @@ -322,19 +336,26 @@ object BooleanSimplification extends Rule[LogicalPlan] with PredicateHelper { case (_, Literal(true, BooleanType)) => Literal(true) case (Literal(false, BooleanType), r) => r case (l, Literal(false, BooleanType)) => l - // a || a || a ... => a - case _ if splitDisjunctivePredicates(or).distinct.size == 1 => left - // (a && b && c && ...) || (a && b && d && ...) => a && b && (c || d || ...) - case _ => + // a || a => a + case (l, r) if l fastEquals r => l + case (_, _) => val lhsSet = splitConjunctivePredicates(left).toSet val rhsSet = splitConjunctivePredicates(right).toSet val common = lhsSet.intersect(rhsSet) - - (lhsSet.diff(common).reduceOption(And) ++ rhsSet.diff(common).reduceOption(And)) - .reduceOption(Or) - .map(_ :: common.toList) - .getOrElse(common.toList) - .reduce(And) + val ldiff = lhsSet.diff(common) + val rdiff = rhsSet.diff(common) + if ( ldiff.size == 0 || rdiff.size == 0) { + // a || (b && a) + common.reduce(And) + } else { + // (a && b && c && ...) || (a && b && d && ...) || (a && b && e && ...) ... => + // a && b && ((c && ...) || (d && ...) || (e && ...) || ...) + (ldiff.reduceOption(And) ++ rdiff.reduceOption(And)) + .reduceOption(Or) + .map(_ :: common.toList) + .getOrElse(common.toList) + .reduce(And) + } } case not @ Not(exp) => diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala new file mode 100644 index 0000000000..a0863dad96 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.analysis.EliminateAnalysisOperators +import org.apache.spark.sql.catalyst.expressions.{Literal, Expression} +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.plans.PlanTest +import org.apache.spark.sql.catalyst.rules._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.dsl.expressions._ + +class BooleanSimplificationSuite extends PlanTest { + + object Optimize extends RuleExecutor[LogicalPlan] { + val batches = + Batch("AnalysisNodes", Once, + EliminateAnalysisOperators) :: + Batch("Constant Folding", FixedPoint(50), + NullPropagation, + ConstantFolding, + BooleanSimplification, + SimplifyFilters) :: Nil + } + + val testRelation = LocalRelation('a.int, 'b.int, 'c.int, 'd.string) + + def checkCondition(originCondition: Expression, optimizedCondition: Expression): Unit = { + val originQuery = testRelation.where(originCondition).analyze + val optimized = Optimize(originQuery) + val expected = testRelation.where(optimizedCondition).analyze + comparePlans(optimized, expected) + } + + test("a && a => a") { + checkCondition(Literal(1) < 'a && Literal(1) < 'a, Literal(1) < 'a) + checkCondition(Literal(1) < 'a && Literal(1) < 'a && Literal(1) < 'a, Literal(1) < 'a) + } + + test("a || a => a") { + checkCondition(Literal(1) < 'a || Literal(1) < 'a, Literal(1) < 'a) + checkCondition(Literal(1) < 'a || Literal(1) < 'a || Literal(1) < 'a, Literal(1) < 'a) + } + + test("(a && b && c && ...) || (a && b && d && ...) || (a && b && e && ...) ...") { + checkCondition('b > 3 || 'c > 5, 'b > 3 || 'c > 5) + + checkCondition(('a < 2 && 'a > 3 && 'b > 5) || 'a < 2, 'a < 2) + + checkCondition('a < 2 || ('a < 2 && 'a > 3 && 'b > 5), 'a < 2) + + val input = ('a === 'b && 'b > 3 && 'c > 2) || + ('a === 'b && 'c < 1 && 'a === 5) || + ('a === 'b && 'b < 5 && 'a > 1) + + val expected = + (((('b > 3) && ('c > 2)) || + (('c < 1) && ('a === 5))) || + (('b < 5) && ('a > 1))) && ('a === 'b) + checkCondition(input, expected) + + } + + test("(a || b || c || ...) && (a || b || d || ...) && (a || b || e || ...) ...") { + checkCondition('b > 3 && 'c > 5, 'b > 3 && 'c > 5) + + checkCondition(('a < 2 || 'a > 3 || 'b > 5) && 'a < 2, 'a < 2) + + checkCondition('a < 2 && ('a < 2 || 'a > 3 || 'b > 5) , 'a < 2) + + checkCondition(('a < 2 || 'b > 3) && ('a < 2 || 'c > 5), ('b > 3 && 'c > 5) || 'a < 2) + + var input: Expression = ('a === 'b || 'b > 3) && ('a === 'b || 'a > 3) && ('a === 'b || 'a < 5) + var expected: Expression = ('b > 3 && 'a > 3 && 'a < 5) || 'a === 'b + checkCondition(input, expected) + } +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NormalizeFiltersSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NormalizeFiltersSuite.scala deleted file mode 100644 index 906300d833..0000000000 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NormalizeFiltersSuite.scala +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.catalyst.optimizer - -import org.apache.spark.sql.catalyst.analysis.EliminateAnalysisOperators -import org.apache.spark.sql.catalyst.expressions.{And, Expression, Or} -import org.apache.spark.sql.catalyst.plans.PlanTest -import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation, LogicalPlan} -import org.apache.spark.sql.catalyst.rules.RuleExecutor - -// For implicit conversions -import org.apache.spark.sql.catalyst.dsl.expressions._ -import org.apache.spark.sql.catalyst.dsl.plans._ - -class NormalizeFiltersSuite extends PlanTest { - object Optimize extends RuleExecutor[LogicalPlan] { - val batches = Seq( - Batch("AnalysisNodes", Once, - EliminateAnalysisOperators), - Batch("NormalizeFilters", FixedPoint(100), - BooleanSimplification, - SimplifyFilters)) - } - - val relation = LocalRelation('a.int, 'b.int, 'c.string) - - def checkExpression(original: Expression, expected: Expression): Unit = { - val actual = Optimize(relation.where(original)).collect { case f: Filter => f.condition }.head - val result = (actual, expected) match { - case (And(l1, r1), And(l2, r2)) => (l1 == l2 && r1 == r2) || (l1 == r2 && l2 == r1) - case (Or (l1, r1), Or (l2, r2)) => (l1 == l2 && r1 == r2) || (l1 == r2 && l2 == r1) - case (lhs, rhs) => lhs fastEquals rhs - } - - assert(result, s"$actual isn't equivalent to $expected") - } - - test("a && a => a") { - checkExpression('a === 1 && 'a === 1, 'a === 1) - checkExpression('a === 1 && 'a === 1 && 'a === 1, 'a === 1) - } - - test("a || a => a") { - checkExpression('a === 1 || 'a === 1, 'a === 1) - checkExpression('a === 1 || 'a === 1 || 'a === 1, 'a === 1) - } - - test("(a && b) || (a && c) => a && (b || c)") { - checkExpression( - ('a === 1 && 'a < 10) || ('a > 2 && 'a === 1), - ('a === 1) && ('a < 10 || 'a > 2)) - - checkExpression( - ('a < 1 && 'b > 2 && 'c.isNull) || ('a < 1 && 'c === "hello" && 'b > 2), - ('c.isNull || 'c === "hello") && 'a < 1 && 'b > 2) - } -} -- cgit v1.2.3