From c55397652ad1c6d047a8b8eb7fd92a8a1dc66306 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Fri, 1 Jul 2016 22:13:56 +0800 Subject: [SPARK-16208][SQL] Add `PropagateEmptyRelation` optimizer ## What changes were proposed in this pull request? This PR adds a new logical optimizer, `PropagateEmptyRelation`, to collapse a logical plans consisting of only empty LocalRelations. **Optimizer Targets** 1. Binary(or Higher)-node Logical Plans - Union with all empty children. - Join with one or two empty children (including Intersect/Except). 2. Unary-node Logical Plans - Project/Filter/Sample/Join/Limit/Repartition with all empty children. - Aggregate with all empty children and without AggregateFunction expressions, COUNT. - Generate with Explode because other UserDefinedGenerators like Hive UDTF returns results. **Sample Query** ```sql WITH t1 AS (SELECT a FROM VALUES 1 t(a)), t2 AS (SELECT b FROM VALUES 1 t(b) WHERE 1=2) SELECT a,b FROM t1, t2 WHERE a=b GROUP BY a,b HAVING a>1 ORDER BY a,b ``` **Before** ```scala scala> sql("with t1 as (select a from values 1 t(a)), t2 as (select b from values 1 t(b) where 1=2) select a,b from t1, t2 where a=b group by a,b having a>1 order by a,b").explain == Physical Plan == *Sort [a#0 ASC, b#1 ASC], true, 0 +- Exchange rangepartitioning(a#0 ASC, b#1 ASC, 200) +- *HashAggregate(keys=[a#0, b#1], functions=[]) +- Exchange hashpartitioning(a#0, b#1, 200) +- *HashAggregate(keys=[a#0, b#1], functions=[]) +- *BroadcastHashJoin [a#0], [b#1], Inner, BuildRight :- *Filter (isnotnull(a#0) && (a#0 > 1)) : +- LocalTableScan [a#0] +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint))) +- *Filter (isnotnull(b#1) && (b#1 > 1)) +- LocalTableScan , [b#1] ``` **After** ```scala scala> sql("with t1 as (select a from values 1 t(a)), t2 as (select b from values 1 t(b) where 1=2) select a,b from t1, t2 where a=b group by a,b having a>1 order by a,b").explain == Physical Plan == LocalTableScan , [a#0, b#1] ``` ## How was this patch tested? Pass the Jenkins tests (including a new testsuite). Author: Dongjoon Hyun Closes #13906 from dongjoon-hyun/SPARK-16208. --- .../spark/sql/catalyst/optimizer/Optimizer.scala | 3 +- .../optimizer/PropagateEmptyRelation.scala | 78 ++++++++++ .../optimizer/PropagateEmptyRelationSuite.scala | 162 +++++++++++++++++++++ 3 files changed, 242 insertions(+), 1 deletion(-) create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala (limited to 'sql') diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 842d6bc26f..9ee1735069 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -113,7 +113,8 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf) Batch("Typed Filter Optimization", fixedPoint, CombineTypedFilters) :: Batch("LocalRelation", fixedPoint, - ConvertToLocalRelation) :: + ConvertToLocalRelation, + PropagateEmptyRelation) :: Batch("OptimizeCodegen", Once, OptimizeCodegen(conf)) :: Batch("RewriteSubquery", Once, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala new file mode 100644 index 0000000000..50076b1a41 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.rules._ + +/** + * Collapse plans consisting empty local relations generated by [[PruneFilters]]. + * 1. Binary(or Higher)-node Logical Plans + * - Union with all empty children. + * - Join with one or two empty children (including Intersect/Except). + * 2. Unary-node Logical Plans + * - Project/Filter/Sample/Join/Limit/Repartition with all empty children. + * - Aggregate with all empty children and without AggregateFunction expressions like COUNT. + * - Generate(Explode) with all empty children. Others like Hive UDTF may return results. + */ +object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper { + private def isEmptyLocalRelation(plan: LogicalPlan): Boolean = plan match { + case p: LocalRelation => p.data.isEmpty + case _ => false + } + + private def containsAggregateExpression(e: Expression): Boolean = { + e.collectFirst { case _: AggregateFunction => () }.isDefined + } + + private def empty(plan: LogicalPlan) = LocalRelation(plan.output, data = Seq.empty) + + def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { + case p: Union if p.children.forall(isEmptyLocalRelation) => + empty(p) + + case p @ Join(_, _, joinType, _) if p.children.exists(isEmptyLocalRelation) => joinType match { + case Inner => empty(p) + // Intersect is handled as LeftSemi by `ReplaceIntersectWithSemiJoin` rule. + // Except is handled as LeftAnti by `ReplaceExceptWithAntiJoin` rule. + case LeftOuter | LeftSemi | LeftAnti if isEmptyLocalRelation(p.left) => empty(p) + case RightOuter if isEmptyLocalRelation(p.right) => empty(p) + case FullOuter if p.children.forall(isEmptyLocalRelation) => empty(p) + case _ => p + } + + case p: UnaryNode if p.children.nonEmpty && p.children.forall(isEmptyLocalRelation) => p match { + case _: Project => empty(p) + case _: Filter => empty(p) + case _: Sample => empty(p) + case _: Sort => empty(p) + case _: GlobalLimit => empty(p) + case _: LocalLimit => empty(p) + case _: Repartition => empty(p) + case _: RepartitionByExpression => empty(p) + // AggregateExpressions like COUNT(*) return their results like 0. + case Aggregate(_, ae, _) if !ae.exists(containsAggregateExpression) => empty(p) + // Generators like Hive-style UDTF may return their records within `close`. + case Generate(_: Explode, _, _, _, _, _) => empty(p) + case _ => p + } + } +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala new file mode 100644 index 0000000000..c549832ef3 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.RuleExecutor + +class PropagateEmptyRelationSuite extends PlanTest { + object Optimize extends RuleExecutor[LogicalPlan] { + val batches = + Batch("PropagateEmptyRelation", Once, + CombineUnions, + ReplaceDistinctWithAggregate, + ReplaceExceptWithAntiJoin, + ReplaceIntersectWithSemiJoin, + PushDownPredicate, + PruneFilters, + PropagateEmptyRelation) :: Nil + } + + object OptimizeWithoutPropagateEmptyRelation extends RuleExecutor[LogicalPlan] { + val batches = + Batch("OptimizeWithoutPropagateEmptyRelation", Once, + CombineUnions, + ReplaceDistinctWithAggregate, + ReplaceExceptWithAntiJoin, + ReplaceIntersectWithSemiJoin, + PushDownPredicate, + PruneFilters) :: Nil + } + + val testRelation1 = LocalRelation.fromExternalRows(Seq('a.int), data = Seq(Row(1))) + val testRelation2 = LocalRelation.fromExternalRows(Seq('b.int), data = Seq(Row(1))) + + test("propagate empty relation through Union") { + val query = testRelation1 + .where(false) + .union(testRelation2.where(false)) + + val optimized = Optimize.execute(query.analyze) + val correctAnswer = LocalRelation('a.int) + + comparePlans(optimized, correctAnswer) + } + + test("propagate empty relation through Join") { + // Testcases are tuples of (left predicate, right predicate, joinType, correct answer) + // Note that `None` is used to compare with OptimizeWithoutPropagateEmptyRelation. + val testcases = Seq( + (true, true, Inner, None), + (true, true, LeftOuter, None), + (true, true, RightOuter, None), + (true, true, FullOuter, None), + (true, true, LeftAnti, None), + (true, true, LeftSemi, None), + + (true, false, Inner, Some(LocalRelation('a.int, 'b.int))), + (true, false, LeftOuter, None), + (true, false, RightOuter, Some(LocalRelation('a.int, 'b.int))), + (true, false, FullOuter, None), + (true, false, LeftAnti, None), + (true, false, LeftSemi, None), + + (false, true, Inner, Some(LocalRelation('a.int, 'b.int))), + (false, true, LeftOuter, Some(LocalRelation('a.int, 'b.int))), + (false, true, RightOuter, None), + (false, true, FullOuter, None), + (false, true, LeftAnti, Some(LocalRelation('a.int))), + (false, true, LeftSemi, Some(LocalRelation('a.int))), + + (false, false, Inner, Some(LocalRelation('a.int, 'b.int))), + (false, false, LeftOuter, Some(LocalRelation('a.int, 'b.int))), + (false, false, RightOuter, Some(LocalRelation('a.int, 'b.int))), + (false, false, FullOuter, Some(LocalRelation('a.int, 'b.int))), + (false, false, LeftAnti, Some(LocalRelation('a.int))), + (false, false, LeftSemi, Some(LocalRelation('a.int))) + ) + + testcases.foreach { case (left, right, jt, answer) => + val query = testRelation1 + .where(left) + .join(testRelation2.where(right), joinType = jt, condition = Some('a.attr == 'b.attr)) + val optimized = Optimize.execute(query.analyze) + val correctAnswer = + answer.getOrElse(OptimizeWithoutPropagateEmptyRelation.execute(query.analyze)) + comparePlans(optimized, correctAnswer) + } + } + + test("propagate empty relation through UnaryNode") { + val query = testRelation1 + .where(false) + .select('a) + .groupBy('a)('a) + .where('a > 1) + .orderBy('a.asc) + + val optimized = Optimize.execute(query.analyze) + val correctAnswer = LocalRelation('a.int) + + comparePlans(optimized, correctAnswer) + } + + test("don't propagate non-empty local relation") { + val query = testRelation1 + .where(true) + .groupBy('a)('a) + .where('a > 1) + .orderBy('a.asc) + .select('a) + + val optimized = Optimize.execute(query.analyze) + val correctAnswer = testRelation1 + .where('a > 1) + .groupBy('a)('a) + .orderBy('a.asc) + .select('a) + + comparePlans(optimized, correctAnswer.analyze) + } + + test("propagate empty relation through Aggregate without aggregate function") { + val query = testRelation1 + .where(false) + .groupBy('a)('a, ('a + 1).as('x)) + + val optimized = Optimize.execute(query.analyze) + val correctAnswer = LocalRelation('a.int, 'x.int).analyze + + comparePlans(optimized, correctAnswer) + } + + test("don't propagate empty relation through Aggregate with aggregate function") { + val query = testRelation1 + .where(false) + .groupBy('a)(count('a)) + + val optimized = Optimize.execute(query.analyze) + val correctAnswer = LocalRelation('a.int).groupBy('a)(count('a)).analyze + + comparePlans(optimized, correctAnswer) + } +} -- cgit v1.2.3