diff options
author | Dongjoon Hyun <dongjoon@apache.org> | 2016-06-02 09:48:58 -0700 |
---|---|---|
committer | Wenchen Fan <wenchen@databricks.com> | 2016-06-02 09:48:58 -0700 |
commit | 63b7f127caf2fdf96eeb8457afd6c96bc8309a58 (patch) | |
tree | 117bc8d83080df4dec464edbfdbc9664ef158e25 /sql/catalyst | |
parent | 252417fa21eb47781addfd614ff00dac793b52a9 (diff) | |
download | spark-63b7f127caf2fdf96eeb8457afd6c96bc8309a58.tar.gz spark-63b7f127caf2fdf96eeb8457afd6c96bc8309a58.tar.bz2 spark-63b7f127caf2fdf96eeb8457afd6c96bc8309a58.zip |
[SPARK-15076][SQL] Add ReorderAssociativeOperator optimizer
## What changes were proposed in this pull request?
This issue add a new optimizer `ReorderAssociativeOperator` by taking advantage of integral associative property. Currently, Spark works like the following.
1) Can optimize `1 + 2 + 3 + 4 + 5 + 6 + 7 + 8 + 9 + a` into `45 + a`.
2) Cannot optimize `a + 1 + 2 + 3 + 4 + 5 + 6 + 7 + 8 + 9`.
This PR can handle Case 2 for **Add/Multiply** expression whose data types are `ByteType`, `ShortType`, `IntegerType`, and `LongType`. The followings are the plan comparison between `before` and `after` this issue.
**Before**
```scala
scala> sql("select a+1+2+3+4+5+6+7+8+9 from (select explode(array(1)) a)").explain
== Physical Plan ==
WholeStageCodegen
: +- Project [(((((((((a#7 + 1) + 2) + 3) + 4) + 5) + 6) + 7) + 8) + 9) AS (((((((((a + 1) + 2) + 3) + 4) + 5) + 6) + 7) + 8) + 9)#8]
: +- INPUT
+- Generate explode([1]), false, false, [a#7]
+- Scan OneRowRelation[]
scala> sql("select a*1*2*3*4*5*6*7*8*9 from (select explode(array(1)) a)").explain
== Physical Plan ==
*Project [(((((((((a#18 * 1) * 2) * 3) * 4) * 5) * 6) * 7) * 8) * 9) AS (((((((((a * 1) * 2) * 3) * 4) * 5) * 6) * 7) * 8) * 9)#19]
+- Generate explode([1]), false, false, [a#18]
+- Scan OneRowRelation[]
```
**After**
```scala
scala> sql("select a+1+2+3+4+5+6+7+8+9 from (select explode(array(1)) a)").explain
== Physical Plan ==
WholeStageCodegen
: +- Project [(a#7 + 45) AS (((((((((a + 1) + 2) + 3) + 4) + 5) + 6) + 7) + 8) + 9)#8]
: +- INPUT
+- Generate explode([1]), false, false, [a#7]
+- Scan OneRowRelation[]
scala> sql("select a*1*2*3*4*5*6*7*8*9 from (select explode(array(1)) a)").explain
== Physical Plan ==
*Project [(a#18 * 362880) AS (((((((((a * 1) * 2) * 3) * 4) * 5) * 6) * 7) * 8) * 9)#19]
+- Generate explode([1]), false, false, [a#18]
+- Scan OneRowRelation[]
```
This PR is greatly generalized by cloud-fan 's key ideas; he should be credited for the work he did.
## How was this patch tested?
Pass the Jenkins tests including new testsuite.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes #12850 from dongjoon-hyun/SPARK-15076.
Diffstat (limited to 'sql/catalyst')
2 files changed, 102 insertions, 0 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 93762ad1b9..11cd84b396 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -94,6 +94,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf) FoldablePropagation, OptimizeIn(conf), ConstantFolding, + ReorderAssociativeOperator, LikeSimplification, BooleanSimplification, SimplifyConditionals, @@ -738,6 +739,44 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelpe } /** + * Reorder associative integral-type operators and fold all constants into one. + */ +object ReorderAssociativeOperator extends Rule[LogicalPlan] { + private def flattenAdd(e: Expression): Seq[Expression] = e match { + case Add(l, r) => flattenAdd(l) ++ flattenAdd(r) + case other => other :: Nil + } + + private def flattenMultiply(e: Expression): Seq[Expression] = e match { + case Multiply(l, r) => flattenMultiply(l) ++ flattenMultiply(r) + case other => other :: Nil + } + + def apply(plan: LogicalPlan): LogicalPlan = plan transform { + case q: LogicalPlan => q transformExpressionsDown { + case a: Add if a.deterministic && a.dataType.isInstanceOf[IntegralType] => + val (foldables, others) = flattenAdd(a).partition(_.foldable) + if (foldables.size > 1) { + val foldableExpr = foldables.reduce((x, y) => Add(x, y)) + val c = Literal.create(foldableExpr.eval(EmptyRow), a.dataType) + if (others.isEmpty) c else Add(others.reduce((x, y) => Add(x, y)), c) + } else { + a + } + case m: Multiply if m.deterministic && m.dataType.isInstanceOf[IntegralType] => + val (foldables, others) = flattenMultiply(m).partition(_.foldable) + if (foldables.size > 1) { + val foldableExpr = foldables.reduce((x, y) => Multiply(x, y)) + val c = Literal.create(foldableExpr.eval(EmptyRow), m.dataType) + if (others.isEmpty) c else Multiply(others.reduce((x, y) => Multiply(x, y)), c) + } else { + m + } + } + } +} + +/** * Replaces [[Expression Expressions]] that can be statically evaluated with * equivalent [[Literal]] values. */ diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReorderAssociativeOperatorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReorderAssociativeOperatorSuite.scala new file mode 100644 index 0000000000..05e15e9ec4 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReorderAssociativeOperatorSuite.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.PlanTest +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.RuleExecutor + +class ReorderAssociativeOperatorSuite extends PlanTest { + + object Optimize extends RuleExecutor[LogicalPlan] { + val batches = + Batch("ReorderAssociativeOperator", Once, + ReorderAssociativeOperator) :: Nil + } + + val testRelation = LocalRelation('a.int, 'b.int, 'c.int) + + test("Reorder associative operators") { + val originalQuery = + testRelation + .select( + (Literal(3) + ((Literal(1) + 'a) + 2)) + 4, + 'b * 1 * 2 * 3 * 4, + ('b + 1) * 2 * 3 * 4, + 'a + 1 + 'b + 2 + 'c + 3, + 'a + 1 + 'b * 2 + 'c + 3, + Rand(0) * 1 * 2 * 3 * 4) + + val optimized = Optimize.execute(originalQuery.analyze) + + val correctAnswer = + testRelation + .select( + ('a + 10).as("((3 + ((1 + a) + 2)) + 4)"), + ('b * 24).as("((((b * 1) * 2) * 3) * 4)"), + (('b + 1) * 24).as("((((b + 1) * 2) * 3) * 4)"), + ('a + 'b + 'c + 6).as("(((((a + 1) + b) + 2) + c) + 3)"), + ('a + 'b * 2 + 'c + 4).as("((((a + 1) + (b * 2)) + c) + 3)"), + Rand(0) * 1 * 2 * 3 * 4) + .analyze + + comparePlans(optimized, correctAnswer) + } +} |