diff options
author | gatorsmile <gatorsmile@gmail.com> | 2016-03-15 00:30:14 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-03-15 00:30:14 -0700 |
commit | 99bd2f0e94657687834c5c59c4270c1484c9f595 (patch) | |
tree | 524dd1a4635dba48b6e83e1cd3ef324e790b05a9 | |
parent | 276c2d51a3bbe2531763a11580adfec7e39fdd58 (diff) | |
download | spark-99bd2f0e94657687834c5c59c4270c1484c9f595.tar.gz spark-99bd2f0e94657687834c5c59c4270c1484c9f595.tar.bz2 spark-99bd2f0e94657687834c5c59c4270c1484c9f595.zip |
[SPARK-13840][SQL] Split Optimizer Rule ColumnPruning to ColumnPruning and EliminateOperators
#### What changes were proposed in this pull request?
Before this PR, two Optimizer rules `ColumnPruning` and `PushPredicateThroughProject` reverse each other's effects. The Optimizer always reaches the max iteration when optimizing some queries. Extra `Project` operators are found in the plan. For example, below is the optimized plan after reaching 100 iterations:
```
Join Inner, Some((cast(id1#16 as bigint) = id1#18L))
:- Project [id1#16]
: +- Filter isnotnull(cast(id1#16 as bigint))
: +- Project [id1#16]
: +- Relation[id1#16,newCol#17] JSON part: struct<>, data: struct<id1:int,newCol:int>
+- Filter isnotnull(id1#18L)
+- Relation[id1#18L] JSON part: struct<>, data: struct<id1:bigint>
```
This PR splits the optimizer rule `ColumnPruning` into two rules: `ColumnPruning` and `EliminateOperators`.
The issue becomes worse when another rule, `NullFiltering`, is present, since it can add extra `IsNotNull` filters. We have to be careful when introducing an extra `Filter` if the benefit is not large enough. Another PR will be submitted by sameeragarwal to handle this issue.
cc sameeragarwal marmbrus
In addition, `ColumnPruning` should not push `Project` through non-deterministic `Filter`. This could cause wrong results. This will be put in a separate PR.
cc davies cloud-fan yhuai
#### How was this patch tested?
Modified the existing test cases.
Author: gatorsmile <gatorsmile@gmail.com>
Closes #11682 from gatorsmile/viewDuplicateNames.
4 files changed, 26 insertions, 14 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 85776670e5..2de92d06ec 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -71,6 +71,7 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] { PushPredicateThroughAggregate, LimitPushDown, ColumnPruning, + EliminateOperators, // Operator combine CollapseRepartition, CollapseProject, @@ -315,11 +316,7 @@ object SetOperationPushDown extends Rule[LogicalPlan] with PredicateHelper { * - LeftSemiJoin */ object ColumnPruning extends Rule[LogicalPlan] { - private def sameOutput(output1: Seq[Attribute], output2: Seq[Attribute]): Boolean = - output1.size == output2.size && - output1.zip(output2).forall(pair => pair._1.semanticEquals(pair._2)) - - def apply(plan: LogicalPlan): LogicalPlan = plan transform { + def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { // Prunes the unused columns from project list of Project/Aggregate/Expand case p @ Project(_, p2: Project) if (p2.outputSet -- p.references).nonEmpty => p.copy(child = p2.copy(projectList = p2.projectList.filter(p.references.contains))) @@ -380,12 +377,6 @@ object ColumnPruning extends Rule[LogicalPlan] { p.copy(child = w.copy( windowExpressions = w.windowExpressions.filter(p.references.contains))) - // Eliminate no-op Window - case w: Window if w.windowExpressions.isEmpty => w.child - - // Eliminate no-op Projects - case p @ Project(projectList, child) if sameOutput(child.output, p.output) => child - // Can't prune the columns on LeafNode case p @ Project(_, l: LeafNode) => p @@ -410,6 +401,24 @@ object ColumnPruning extends Rule[LogicalPlan] { } /** + * Eliminate no-op Project and Window. + * + * Note: this rule should be executed just after ColumnPruning. 
+ */ +object EliminateOperators extends Rule[LogicalPlan] { + def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { + // Eliminate no-op Projects + case p @ Project(projectList, child) if sameOutput(child.output, p.output) => child + // Eliminate no-op Window + case w: Window if w.windowExpressions.isEmpty => w.child + } + + private def sameOutput(output1: Seq[Attribute], output2: Seq[Attribute]): Boolean = + output1.size == output2.size && + output1.zip(output2).forall(pair => pair._1.semanticEquals(pair._2)) +} + +/** * Combines two adjacent [[Project]] operators into one and perform alias substitution, * merging the expressions into one single expression. */ diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala index dd7d65ddc9..6187fb9e2f 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala @@ -35,6 +35,7 @@ class ColumnPruningSuite extends PlanTest { object Optimize extends RuleExecutor[LogicalPlan] { val batches = Batch("Column pruning", FixedPoint(100), ColumnPruning, + EliminateOperators, CollapseProject) :: Nil } @@ -327,8 +328,8 @@ class ColumnPruningSuite extends PlanTest { val input2 = LocalRelation('c.int, 'd.string, 'e.double) val query = Project('b :: Nil, Union(input1 :: input2 :: Nil)).analyze - val expected = Project('b :: Nil, - Union(Project('b :: Nil, input1) :: Project('d :: Nil, input2) :: Nil)).analyze + val expected = + Union(Project('b :: Nil, input1) :: Project('d :: Nil, input2) :: Nil).analyze comparePlans(Optimize.execute(query), expected) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala index 87ad81db11..e0e9b6d93e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala @@ -28,7 +28,8 @@ class CombiningLimitsSuite extends PlanTest { object Optimize extends RuleExecutor[LogicalPlan] { val batches = Batch("Filter Pushdown", FixedPoint(100), - ColumnPruning) :: + ColumnPruning, + EliminateOperators) :: Batch("Combine Limit", FixedPoint(10), CombineLimits) :: Batch("Constant Folding", FixedPoint(10), diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala index e2f8146bee..51468fa5ce 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala @@ -43,6 +43,7 @@ class JoinOptimizationSuite extends PlanTest { PushPredicateThroughGenerate, PushPredicateThroughAggregate, ColumnPruning, + EliminateOperators, CollapseProject) :: Nil } |