diff options
author | Dongjoon Hyun <dongjoon@apache.org> | 2016-07-01 22:13:56 +0800 |
---|---|---|
committer | Cheng Lian <lian@databricks.com> | 2016-07-01 22:13:56 +0800 |
commit | c55397652ad1c6d047a8b8eb7fd92a8a1dc66306 (patch) | |
tree | d6c60d3a2b4703ca9f0440b556fae2a6e24e6365 /sql/catalyst | |
parent | 0ad6ce7e54b1d8f5946dde652fa5341d15059158 (diff) | |
download | spark-c55397652ad1c6d047a8b8eb7fd92a8a1dc66306.tar.gz spark-c55397652ad1c6d047a8b8eb7fd92a8a1dc66306.tar.bz2 spark-c55397652ad1c6d047a8b8eb7fd92a8a1dc66306.zip |
[SPARK-16208][SQL] Add `PropagateEmptyRelation` optimizer
## What changes were proposed in this pull request?
This PR adds a new logical optimizer, `PropagateEmptyRelation`, to collapse a logical plans consisting of only empty LocalRelations.
**Optimizer Targets**
1. Binary(or Higher)-node Logical Plans
- Union with all empty children.
- Join with one or two empty children (including Intersect/Except).
2. Unary-node Logical Plans
- Project/Filter/Sample/Join/Limit/Repartition with all empty children.
- Aggregate with all empty children and without AggregateFunction expressions, COUNT.
- Generate with Explode because other UserDefinedGenerators like Hive UDTF returns results.
**Sample Query**
```sql
WITH t1 AS (SELECT a FROM VALUES 1 t(a)),
t2 AS (SELECT b FROM VALUES 1 t(b) WHERE 1=2)
SELECT a,b
FROM t1, t2
WHERE a=b
GROUP BY a,b
HAVING a>1
ORDER BY a,b
```
**Before**
```scala
scala> sql("with t1 as (select a from values 1 t(a)), t2 as (select b from values 1 t(b) where 1=2) select a,b from t1, t2 where a=b group by a,b having a>1 order by a,b").explain
== Physical Plan ==
*Sort [a#0 ASC, b#1 ASC], true, 0
+- Exchange rangepartitioning(a#0 ASC, b#1 ASC, 200)
+- *HashAggregate(keys=[a#0, b#1], functions=[])
+- Exchange hashpartitioning(a#0, b#1, 200)
+- *HashAggregate(keys=[a#0, b#1], functions=[])
+- *BroadcastHashJoin [a#0], [b#1], Inner, BuildRight
:- *Filter (isnotnull(a#0) && (a#0 > 1))
: +- LocalTableScan [a#0]
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
+- *Filter (isnotnull(b#1) && (b#1 > 1))
+- LocalTableScan <empty>, [b#1]
```
**After**
```scala
scala> sql("with t1 as (select a from values 1 t(a)), t2 as (select b from values 1 t(b) where 1=2) select a,b from t1, t2 where a=b group by a,b having a>1 order by a,b").explain
== Physical Plan ==
LocalTableScan <empty>, [a#0, b#1]
```
## How was this patch tested?
Pass the Jenkins tests (including a new testsuite).
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes #13906 from dongjoon-hyun/SPARK-16208.
Diffstat (limited to 'sql/catalyst')
3 files changed, 242 insertions, 1 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 842d6bc26f..9ee1735069 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -113,7 +113,8 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf) Batch("Typed Filter Optimization", fixedPoint, CombineTypedFilters) :: Batch("LocalRelation", fixedPoint, - ConvertToLocalRelation) :: + ConvertToLocalRelation, + PropagateEmptyRelation) :: Batch("OptimizeCodegen", Once, OptimizeCodegen(conf)) :: Batch("RewriteSubquery", Once, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala new file mode 100644 index 0000000000..50076b1a41 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.rules._ + +/** + * Collapse plans consisting empty local relations generated by [[PruneFilters]]. + * 1. Binary(or Higher)-node Logical Plans + * - Union with all empty children. + * - Join with one or two empty children (including Intersect/Except). + * 2. Unary-node Logical Plans + * - Project/Filter/Sample/Join/Limit/Repartition with all empty children. + * - Aggregate with all empty children and without AggregateFunction expressions like COUNT. + * - Generate(Explode) with all empty children. Others like Hive UDTF may return results. + */ +object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper { + private def isEmptyLocalRelation(plan: LogicalPlan): Boolean = plan match { + case p: LocalRelation => p.data.isEmpty + case _ => false + } + + private def containsAggregateExpression(e: Expression): Boolean = { + e.collectFirst { case _: AggregateFunction => () }.isDefined + } + + private def empty(plan: LogicalPlan) = LocalRelation(plan.output, data = Seq.empty) + + def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { + case p: Union if p.children.forall(isEmptyLocalRelation) => + empty(p) + + case p @ Join(_, _, joinType, _) if p.children.exists(isEmptyLocalRelation) => joinType match { + case Inner => empty(p) + // Intersect is handled as LeftSemi by `ReplaceIntersectWithSemiJoin` rule. + // Except is handled as LeftAnti by `ReplaceExceptWithAntiJoin` rule. + case LeftOuter | LeftSemi | LeftAnti if isEmptyLocalRelation(p.left) => empty(p) + case RightOuter if isEmptyLocalRelation(p.right) => empty(p) + case FullOuter if p.children.forall(isEmptyLocalRelation) => empty(p) + case _ => p + } + + case p: UnaryNode if p.children.nonEmpty && p.children.forall(isEmptyLocalRelation) => p match { + case _: Project => empty(p) + case _: Filter => empty(p) + case _: Sample => empty(p) + case _: Sort => empty(p) + case _: GlobalLimit => empty(p) + case _: LocalLimit => empty(p) + case _: Repartition => empty(p) + case _: RepartitionByExpression => empty(p) + // AggregateExpressions like COUNT(*) return their results like 0. + case Aggregate(_, ae, _) if !ae.exists(containsAggregateExpression) => empty(p) + // Generators like Hive-style UDTF may return their records within `close`. + case Generate(_: Explode, _, _, _, _, _) => empty(p) + case _ => p + } + } +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala new file mode 100644 index 0000000000..c549832ef3 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.RuleExecutor + +class PropagateEmptyRelationSuite extends PlanTest { + object Optimize extends RuleExecutor[LogicalPlan] { + val batches = + Batch("PropagateEmptyRelation", Once, + CombineUnions, + ReplaceDistinctWithAggregate, + ReplaceExceptWithAntiJoin, + ReplaceIntersectWithSemiJoin, + PushDownPredicate, + PruneFilters, + PropagateEmptyRelation) :: Nil + } + + object OptimizeWithoutPropagateEmptyRelation extends RuleExecutor[LogicalPlan] { + val batches = + Batch("OptimizeWithoutPropagateEmptyRelation", Once, + CombineUnions, + ReplaceDistinctWithAggregate, + ReplaceExceptWithAntiJoin, + ReplaceIntersectWithSemiJoin, + PushDownPredicate, + PruneFilters) :: Nil + } + + val testRelation1 = LocalRelation.fromExternalRows(Seq('a.int), data = Seq(Row(1))) + val testRelation2 = LocalRelation.fromExternalRows(Seq('b.int), data = Seq(Row(1))) + + test("propagate empty relation through Union") { + val query = testRelation1 + .where(false) + .union(testRelation2.where(false)) + + val optimized = Optimize.execute(query.analyze) + val correctAnswer = LocalRelation('a.int) + + comparePlans(optimized, correctAnswer) + } + + test("propagate empty relation through Join") { + // Testcases are tuples of (left predicate, right predicate, joinType, correct answer) + // Note that `None` is used to compare with OptimizeWithoutPropagateEmptyRelation. + val testcases = Seq( + (true, true, Inner, None), + (true, true, LeftOuter, None), + (true, true, RightOuter, None), + (true, true, FullOuter, None), + (true, true, LeftAnti, None), + (true, true, LeftSemi, None), + + (true, false, Inner, Some(LocalRelation('a.int, 'b.int))), + (true, false, LeftOuter, None), + (true, false, RightOuter, Some(LocalRelation('a.int, 'b.int))), + (true, false, FullOuter, None), + (true, false, LeftAnti, None), + (true, false, LeftSemi, None), + + (false, true, Inner, Some(LocalRelation('a.int, 'b.int))), + (false, true, LeftOuter, Some(LocalRelation('a.int, 'b.int))), + (false, true, RightOuter, None), + (false, true, FullOuter, None), + (false, true, LeftAnti, Some(LocalRelation('a.int))), + (false, true, LeftSemi, Some(LocalRelation('a.int))), + + (false, false, Inner, Some(LocalRelation('a.int, 'b.int))), + (false, false, LeftOuter, Some(LocalRelation('a.int, 'b.int))), + (false, false, RightOuter, Some(LocalRelation('a.int, 'b.int))), + (false, false, FullOuter, Some(LocalRelation('a.int, 'b.int))), + (false, false, LeftAnti, Some(LocalRelation('a.int))), + (false, false, LeftSemi, Some(LocalRelation('a.int))) + ) + + testcases.foreach { case (left, right, jt, answer) => + val query = testRelation1 + .where(left) + .join(testRelation2.where(right), joinType = jt, condition = Some('a.attr == 'b.attr)) + val optimized = Optimize.execute(query.analyze) + val correctAnswer = + answer.getOrElse(OptimizeWithoutPropagateEmptyRelation.execute(query.analyze)) + comparePlans(optimized, correctAnswer) + } + } + + test("propagate empty relation through UnaryNode") { + val query = testRelation1 + .where(false) + .select('a) + .groupBy('a)('a) + .where('a > 1) + .orderBy('a.asc) + + val optimized = Optimize.execute(query.analyze) + val correctAnswer = LocalRelation('a.int) + + comparePlans(optimized, correctAnswer) + } + + test("don't propagate non-empty local relation") { + val query = testRelation1 + .where(true) + .groupBy('a)('a) + .where('a > 1) + .orderBy('a.asc) + .select('a) + + val optimized = Optimize.execute(query.analyze) + val correctAnswer = testRelation1 + .where('a > 1) + .groupBy('a)('a) + .orderBy('a.asc) + .select('a) + + comparePlans(optimized, correctAnswer.analyze) + } + + test("propagate empty relation through Aggregate without aggregate function") { + val query = testRelation1 + .where(false) + .groupBy('a)('a, ('a + 1).as('x)) + + val optimized = Optimize.execute(query.analyze) + val correctAnswer = LocalRelation('a.int, 'x.int).analyze + + comparePlans(optimized, correctAnswer) + } + + test("don't propagate empty relation through Aggregate with aggregate function") { + val query = testRelation1 + .where(false) + .groupBy('a)(count('a)) + + val optimized = Optimize.execute(query.analyze) + val correctAnswer = LocalRelation('a.int).groupBy('a)(count('a)).analyze + + comparePlans(optimized, correctAnswer) + } +} |