From f858f466862541c3faad76a1fa2391f1c17ec9dd Mon Sep 17 00:00:00 2001 From: Cody Koeninger Date: Thu, 11 Sep 2014 17:49:36 -0700 Subject: SPARK-3462 push down filters and projections into Unions Author: Cody Koeninger Closes #2345 from koeninger/SPARK-3462 and squashes the following commits: 5c8d24d [Cody Koeninger] SPARK-3462 remove now-unused parameter 0788691 [Cody Koeninger] SPARK-3462 add tests, handle compatible schema with different aliases, per marmbrus feedback ef47b3b [Cody Koeninger] SPARK-3462 push down filters and projections into Unions --- .../spark/sql/catalyst/optimizer/Optimizer.scala | 48 +++++++++++++++++ .../catalyst/optimizer/UnionPushdownSuite.scala | 62 ++++++++++++++++++++++ 2 files changed, 110 insertions(+) create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UnionPushdownSuite.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index ddd4b3755d..a4133feae8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -40,12 +40,60 @@ object Optimizer extends RuleExecutor[LogicalPlan] { SimplifyCasts, SimplifyCaseConversionExpressions) :: Batch("Filter Pushdown", FixedPoint(100), + UnionPushdown, CombineFilters, PushPredicateThroughProject, PushPredicateThroughJoin, ColumnPruning) :: Nil } +/** + * Pushes operations to either side of a Union. + */ +object UnionPushdown extends Rule[LogicalPlan] { + + /** + * Maps Attributes from the left side to the corresponding Attribute on the right side. + */ + def buildRewrites(union: Union): AttributeMap[Attribute] = { + assert(union.left.output.size == union.right.output.size) + + AttributeMap(union.left.output.zip(union.right.output)) + } + + /** + * Rewrites an expression so that it can be pushed to the right side of a Union operator. + * This method relies on the fact that the output attributes of a union are always equal + * to the left child's output. + */ + def pushToRight[A <: Expression](e: A, rewrites: AttributeMap[Attribute]): A = { + val result = e transform { + case a: Attribute => rewrites(a) + } + + // We must promise the compiler that we did not discard the names in the case of project + // expressions. This is safe since the only transformation is from Attribute => Attribute. + result.asInstanceOf[A] + } + + def apply(plan: LogicalPlan): LogicalPlan = plan transform { + // Push down filter into union + case Filter(condition, u @ Union(left, right)) => + val rewrites = buildRewrites(u) + Union( + Filter(condition, left), + Filter(pushToRight(condition, rewrites), right)) + + // Push down projection into union + case Project(projectList, u @ Union(left, right)) => + val rewrites = buildRewrites(u) + Union( + Project(projectList, left), + Project(projectList.map(pushToRight(_, rewrites)), right)) + } +} + + /** * Attempts to eliminate the reading of unneeded columns from the query plan using the following * transformations: diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UnionPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UnionPushdownSuite.scala new file mode 100644 index 0000000000..dfef87bd91 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UnionPushdownSuite.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.analysis +import org.apache.spark.sql.catalyst.analysis.EliminateAnalysisOperators +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.plans.{PlanTest, LeftOuter, RightOuter} +import org.apache.spark.sql.catalyst.rules._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.dsl.expressions._ + +class UnionPushdownSuite extends PlanTest { + object Optimize extends RuleExecutor[LogicalPlan] { + val batches = + Batch("Subqueries", Once, + EliminateAnalysisOperators) :: + Batch("Union Pushdown", Once, + UnionPushdown) :: Nil + } + + val testRelation = LocalRelation('a.int, 'b.int, 'c.int) + val testRelation2 = LocalRelation('d.int, 'e.int, 'f.int) + val testUnion = Union(testRelation, testRelation2) + + test("union: filter to each side") { + val query = testUnion.where('a === 1) + + val optimized = Optimize(query.analyze) + + val correctAnswer = + Union(testRelation.where('a === 1), testRelation2.where('d === 1)).analyze + + comparePlans(optimized, correctAnswer) + } + + test("union: project to each side") { + val query = testUnion.select('b) + + val optimized = Optimize(query.analyze) + + val correctAnswer = + Union(testRelation.select('b), testRelation2.select('e)).analyze + + comparePlans(optimized, correctAnswer) + } +} -- cgit v1.2.3