about | summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Cody Koeninger <cody.koeninger@mediacrossing.com> 2014-09-11 17:49:36 -0700
committer: Michael Armbrust <michael@databricks.com> 2014-09-11 17:49:36 -0700
commit: f858f466862541c3faad76a1fa2391f1c17ec9dd (patch)
tree: e8e70fff8803137fc1b4a22b807b467961840750
parent: ce59725b8703d18988e495dbaaf86ddde4bdfc5a (diff)
download: spark-f858f466862541c3faad76a1fa2391f1c17ec9dd.tar.gz
spark-f858f466862541c3faad76a1fa2391f1c17ec9dd.tar.bz2
spark-f858f466862541c3faad76a1fa2391f1c17ec9dd.zip
SPARK-3462 push down filters and projections into Unions
Author: Cody Koeninger <cody.koeninger@mediacrossing.com>

Closes #2345 from koeninger/SPARK-3462 and squashes the following commits:

5c8d24d [Cody Koeninger] SPARK-3462 remove now-unused parameter
0788691 [Cody Koeninger] SPARK-3462 add tests, handle compatible schema with different aliases, per marmbrus feedback
ef47b3b [Cody Koeninger] SPARK-3462 push down filters and projections into Unions
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 48
-rw-r--r-- sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UnionPushdownSuite.scala | 62
2 files changed, 110 insertions, 0 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index ddd4b3755d..a4133feae8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -40,6 +40,7 @@ object Optimizer extends RuleExecutor[LogicalPlan] {
SimplifyCasts,
SimplifyCaseConversionExpressions) ::
Batch("Filter Pushdown", FixedPoint(100),
+ UnionPushdown,
CombineFilters,
PushPredicateThroughProject,
PushPredicateThroughJoin,
@@ -47,6 +48,53 @@ object Optimizer extends RuleExecutor[LogicalPlan] {
}
/**
+ * Pushes Filter and Project operators through a Union into both of its children.
+ */
+object UnionPushdown extends Rule[LogicalPlan] {
+
+ /**
+ * Maps each output Attribute of the left child to the right child's Attribute at the same position.
+ */
+ def buildRewrites(union: Union): AttributeMap[Attribute] = {
+ assert(union.left.output.size == union.right.output.size) // zip below pairs outputs by position
+
+ AttributeMap(union.left.output.zip(union.right.output))
+ }
+
+ /**
+ * Rewrites `e` for the right side of a Union by replacing each Attribute with `rewrites(a)`.
+ * This method relies on the fact that the output attributes of a union are always equal
+ * to the left child's output; attributes are looked up in `rewrites` directly, with no fallback.
+ */
+ def pushToRight[A <: Expression](e: A, rewrites: AttributeMap[Attribute]): A = {
+ val result = e transform {
+ case a: Attribute => rewrites(a)
+ }
+
+ // We must promise the compiler that we did not discard the names in the case of project
+ // expressions. This is safe since the only transformation is from Attribute => Attribute.
+ result.asInstanceOf[A]
+ }
+
+ def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+ // Push the filter into both children; the right side gets the rewritten condition
+ case Filter(condition, u @ Union(left, right)) =>
+ val rewrites = buildRewrites(u)
+ Union(
+ Filter(condition, left),
+ Filter(pushToRight(condition, rewrites), right))
+
+ // Push the projection into both children; the right side gets the rewritten project list
+ case Project(projectList, u @ Union(left, right)) =>
+ val rewrites = buildRewrites(u)
+ Union(
+ Project(projectList, left),
+ Project(projectList.map(pushToRight(_, rewrites)), right))
+ }
+}
+
+
+/**
* Attempts to eliminate the reading of unneeded columns from the query plan using the following
* transformations:
*
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UnionPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UnionPushdownSuite.scala
new file mode 100644
index 0000000000..dfef87bd91
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UnionPushdownSuite.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.analysis
+import org.apache.spark.sql.catalyst.analysis.EliminateAnalysisOperators
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.plans.{PlanTest, LeftOuter, RightOuter}
+import org.apache.spark.sql.catalyst.rules._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.dsl.expressions._
+
+class UnionPushdownSuite extends PlanTest { // plan-level tests for the UnionPushdown rule
+ object Optimize extends RuleExecutor[LogicalPlan] { // test-local executor with two Once batches
+ val batches =
+ Batch("Subqueries", Once,
+ EliminateAnalysisOperators) ::
+ Batch("Union Pushdown", Once,
+ UnionPushdown) :: Nil
+ }
+
+ val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
+ val testRelation2 = LocalRelation('d.int, 'e.int, 'f.int) // same arity and types, different names
+ val testUnion = Union(testRelation, testRelation2)
+
+ test("union: filter to each side") {
+ val query = testUnion.where('a === 1)
+
+ val optimized = Optimize(query.analyze)
+
+ val correctAnswer = // first column: a on the left corresponds to d on the right
+ Union(testRelation.where('a === 1), testRelation2.where('d === 1)).analyze
+
+ comparePlans(optimized, correctAnswer)
+ }
+
+ test("union: project to each side") {
+ val query = testUnion.select('b)
+
+ val optimized = Optimize(query.analyze)
+
+ val correctAnswer = // second column: b on the left corresponds to e on the right
+ Union(testRelation.select('b), testRelation2.select('e)).analyze
+
+ comparePlans(optimized, correctAnswer)
+ }
+}