author     Dongjoon Hyun <dongjoon@apache.org>    2016-07-01 22:13:56 +0800
committer  Cheng Lian <lian@databricks.com>       2016-07-01 22:13:56 +0800
commit     c55397652ad1c6d047a8b8eb7fd92a8a1dc66306 (patch)
tree       d6c60d3a2b4703ca9f0440b556fae2a6e24e6365 /sql
parent     0ad6ce7e54b1d8f5946dde652fa5341d15059158 (diff)
[SPARK-16208][SQL] Add `PropagateEmptyRelation` optimizer
## What changes were proposed in this pull request?

This PR adds a new logical optimizer, `PropagateEmptyRelation`, to collapse logical plans that consist only of empty LocalRelations.

**Optimizer Targets**

1. Binary (or higher)-node logical plans
   - Union with all empty children.
   - Join with one or two empty children (including Intersect/Except).
2. Unary-node logical plans
   - Project/Filter/Sample/Sort/Limit/Repartition with all empty children.
   - Aggregate with all empty children and without AggregateFunction expressions such as COUNT.
   - Generate with Explode only, because other UserDefinedGenerators such as Hive UDTFs may still return results.

**Sample Query**
```sql
WITH t1 AS (SELECT a FROM VALUES 1 t(a)),
     t2 AS (SELECT b FROM VALUES 1 t(b) WHERE 1=2)
SELECT a, b
FROM t1, t2
WHERE a = b
GROUP BY a, b
HAVING a > 1
ORDER BY a, b
```

**Before**
```scala
scala> sql("with t1 as (select a from values 1 t(a)), t2 as (select b from values 1 t(b) where 1=2) select a,b from t1, t2 where a=b group by a,b having a>1 order by a,b").explain
== Physical Plan ==
*Sort [a#0 ASC, b#1 ASC], true, 0
+- Exchange rangepartitioning(a#0 ASC, b#1 ASC, 200)
   +- *HashAggregate(keys=[a#0, b#1], functions=[])
      +- Exchange hashpartitioning(a#0, b#1, 200)
         +- *HashAggregate(keys=[a#0, b#1], functions=[])
            +- *BroadcastHashJoin [a#0], [b#1], Inner, BuildRight
               :- *Filter (isnotnull(a#0) && (a#0 > 1))
               :  +- LocalTableScan [a#0]
               +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
                  +- *Filter (isnotnull(b#1) && (b#1 > 1))
                     +- LocalTableScan <empty>, [b#1]
```

**After**
```scala
scala> sql("with t1 as (select a from values 1 t(a)), t2 as (select b from values 1 t(b) where 1=2) select a,b from t1, t2 where a=b group by a,b having a>1 order by a,b").explain
== Physical Plan ==
LocalTableScan <empty>, [a#0, b#1]
```

## How was this patch tested?

Pass the Jenkins tests (including a new test suite).

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #13906 from dongjoon-hyun/SPARK-16208.
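The collapse can also be reproduced directly with the Catalyst DSL, the way the new test suite below exercises it. The following is a minimal sketch, assuming Spark 2.0-era Catalyst internals; the `MiniOptimize` rule executor is illustrative only and is not part of this patch:

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.optimizer.{PropagateEmptyRelation, PruneFilters}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor

// Illustrative mini-optimizer: PruneFilters first rewrites the always-false
// Filter into an empty LocalRelation, then PropagateEmptyRelation collapses
// the operators sitting on top of it.
object MiniOptimize extends RuleExecutor[LogicalPlan] {
  val batches = Batch("PropagateEmptyRelation", Once, PruneFilters, PropagateEmptyRelation) :: Nil
}

val relation = LocalRelation('a.int)                    // schema only, zero rows
val plan = relation.where(false).select('a).analyze     // Project over an always-false Filter
MiniOptimize.execute(plan)                              // => LocalRelation <empty>, [a#...]
```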
Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala                   |   3
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala      |  78
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala | 162
3 files changed, 242 insertions(+), 1 deletion(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 842d6bc26f..9ee1735069 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -113,7 +113,8 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
Batch("Typed Filter Optimization", fixedPoint,
CombineTypedFilters) ::
Batch("LocalRelation", fixedPoint,
- ConvertToLocalRelation) ::
+ ConvertToLocalRelation,
+ PropagateEmptyRelation) ::
Batch("OptimizeCodegen", Once,
OptimizeCodegen(conf)) ::
Batch("RewriteSubquery", Once,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
new file mode 100644
index 0000000000..50076b1a41
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules._
+
+/**
+ * Collapse plans consisting of empty local relations generated by [[PruneFilters]].
+ * 1. Binary(or Higher)-node Logical Plans
+ * - Union with all empty children.
+ * - Join with one or two empty children (including Intersect/Except).
+ * 2. Unary-node Logical Plans
+ *    - Project/Filter/Sample/Sort/Limit/Repartition with all empty children.
+ * - Aggregate with all empty children and without AggregateFunction expressions like COUNT.
+ * - Generate(Explode) with all empty children. Others like Hive UDTF may return results.
+ */
+object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper {
+ private def isEmptyLocalRelation(plan: LogicalPlan): Boolean = plan match {
+ case p: LocalRelation => p.data.isEmpty
+ case _ => false
+ }
+
+ private def containsAggregateExpression(e: Expression): Boolean = {
+ e.collectFirst { case _: AggregateFunction => () }.isDefined
+ }
+
+ private def empty(plan: LogicalPlan) = LocalRelation(plan.output, data = Seq.empty)
+
+ def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+ case p: Union if p.children.forall(isEmptyLocalRelation) =>
+ empty(p)
+
+ case p @ Join(_, _, joinType, _) if p.children.exists(isEmptyLocalRelation) => joinType match {
+ case Inner => empty(p)
+ // Intersect is handled as LeftSemi by `ReplaceIntersectWithSemiJoin` rule.
+ // Except is handled as LeftAnti by `ReplaceExceptWithAntiJoin` rule.
+ case LeftOuter | LeftSemi | LeftAnti if isEmptyLocalRelation(p.left) => empty(p)
+ case RightOuter if isEmptyLocalRelation(p.right) => empty(p)
+ case FullOuter if p.children.forall(isEmptyLocalRelation) => empty(p)
+ case _ => p
+ }
+
+ case p: UnaryNode if p.children.nonEmpty && p.children.forall(isEmptyLocalRelation) => p match {
+ case _: Project => empty(p)
+ case _: Filter => empty(p)
+ case _: Sample => empty(p)
+ case _: Sort => empty(p)
+ case _: GlobalLimit => empty(p)
+ case _: LocalLimit => empty(p)
+ case _: Repartition => empty(p)
+ case _: RepartitionByExpression => empty(p)
+      // Aggregate expressions like COUNT(*) still return a result (e.g. 0) on empty input, so such Aggregates are kept.
+ case Aggregate(_, ae, _) if !ae.exists(containsAggregateExpression) => empty(p)
+ // Generators like Hive-style UDTF may return their records within `close`.
+ case Generate(_: Explode, _, _, _, _, _) => empty(p)
+ case _ => p
+ }
+ }
+}
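As a quick illustration of the rule in isolation (outside any optimizer batch), it can be applied directly to hand-built plans via the Catalyst DSL. This is a hedged sketch using the same DSL as the test suite that follows, not part of the patch itself:

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.optimizer.PropagateEmptyRelation
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Union}

val emptyA   = LocalRelation('a.int)                                            // no rows
val emptyB   = LocalRelation('b.int)                                            // no rows
val nonEmpty = LocalRelation.fromExternalRows(Seq('c.int), data = Seq(Row(1)))  // one row

// A Union whose children are all empty collapses to a single empty relation.
PropagateEmptyRelation(Union(emptyA :: emptyB :: Nil))
// => LocalRelation <empty>, [a#...]

// An Inner join with an empty side collapses too; the join's output schema is preserved.
PropagateEmptyRelation(nonEmpty.join(emptyB, joinType = Inner))
// => LocalRelation <empty>, [c#..., b#...]
```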
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
new file mode 100644
index 0000000000..c549832ef3
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+
+class PropagateEmptyRelationSuite extends PlanTest {
+ object Optimize extends RuleExecutor[LogicalPlan] {
+ val batches =
+ Batch("PropagateEmptyRelation", Once,
+ CombineUnions,
+ ReplaceDistinctWithAggregate,
+ ReplaceExceptWithAntiJoin,
+ ReplaceIntersectWithSemiJoin,
+ PushDownPredicate,
+ PruneFilters,
+ PropagateEmptyRelation) :: Nil
+ }
+
+ object OptimizeWithoutPropagateEmptyRelation extends RuleExecutor[LogicalPlan] {
+ val batches =
+ Batch("OptimizeWithoutPropagateEmptyRelation", Once,
+ CombineUnions,
+ ReplaceDistinctWithAggregate,
+ ReplaceExceptWithAntiJoin,
+ ReplaceIntersectWithSemiJoin,
+ PushDownPredicate,
+ PruneFilters) :: Nil
+ }
+
+ val testRelation1 = LocalRelation.fromExternalRows(Seq('a.int), data = Seq(Row(1)))
+ val testRelation2 = LocalRelation.fromExternalRows(Seq('b.int), data = Seq(Row(1)))
+
+ test("propagate empty relation through Union") {
+ val query = testRelation1
+ .where(false)
+ .union(testRelation2.where(false))
+
+ val optimized = Optimize.execute(query.analyze)
+ val correctAnswer = LocalRelation('a.int)
+
+ comparePlans(optimized, correctAnswer)
+ }
+
+ test("propagate empty relation through Join") {
+    // Test cases are tuples of (left predicate, right predicate, joinType, expected answer).
+    // `None` means the optimized plan must equal the OptimizeWithoutPropagateEmptyRelation result.
+ val testcases = Seq(
+ (true, true, Inner, None),
+ (true, true, LeftOuter, None),
+ (true, true, RightOuter, None),
+ (true, true, FullOuter, None),
+ (true, true, LeftAnti, None),
+ (true, true, LeftSemi, None),
+
+ (true, false, Inner, Some(LocalRelation('a.int, 'b.int))),
+ (true, false, LeftOuter, None),
+ (true, false, RightOuter, Some(LocalRelation('a.int, 'b.int))),
+ (true, false, FullOuter, None),
+ (true, false, LeftAnti, None),
+ (true, false, LeftSemi, None),
+
+ (false, true, Inner, Some(LocalRelation('a.int, 'b.int))),
+ (false, true, LeftOuter, Some(LocalRelation('a.int, 'b.int))),
+ (false, true, RightOuter, None),
+ (false, true, FullOuter, None),
+ (false, true, LeftAnti, Some(LocalRelation('a.int))),
+ (false, true, LeftSemi, Some(LocalRelation('a.int))),
+
+ (false, false, Inner, Some(LocalRelation('a.int, 'b.int))),
+ (false, false, LeftOuter, Some(LocalRelation('a.int, 'b.int))),
+ (false, false, RightOuter, Some(LocalRelation('a.int, 'b.int))),
+ (false, false, FullOuter, Some(LocalRelation('a.int, 'b.int))),
+ (false, false, LeftAnti, Some(LocalRelation('a.int))),
+ (false, false, LeftSemi, Some(LocalRelation('a.int)))
+ )
+
+ testcases.foreach { case (left, right, jt, answer) =>
+ val query = testRelation1
+ .where(left)
+        .join(testRelation2.where(right), joinType = jt, condition = Some('a.attr === 'b.attr))
+ val optimized = Optimize.execute(query.analyze)
+ val correctAnswer =
+ answer.getOrElse(OptimizeWithoutPropagateEmptyRelation.execute(query.analyze))
+ comparePlans(optimized, correctAnswer)
+ }
+ }
+
+ test("propagate empty relation through UnaryNode") {
+ val query = testRelation1
+ .where(false)
+ .select('a)
+ .groupBy('a)('a)
+ .where('a > 1)
+ .orderBy('a.asc)
+
+ val optimized = Optimize.execute(query.analyze)
+ val correctAnswer = LocalRelation('a.int)
+
+ comparePlans(optimized, correctAnswer)
+ }
+
+ test("don't propagate non-empty local relation") {
+ val query = testRelation1
+ .where(true)
+ .groupBy('a)('a)
+ .where('a > 1)
+ .orderBy('a.asc)
+ .select('a)
+
+ val optimized = Optimize.execute(query.analyze)
+ val correctAnswer = testRelation1
+ .where('a > 1)
+ .groupBy('a)('a)
+ .orderBy('a.asc)
+ .select('a)
+
+ comparePlans(optimized, correctAnswer.analyze)
+ }
+
+ test("propagate empty relation through Aggregate without aggregate function") {
+ val query = testRelation1
+ .where(false)
+ .groupBy('a)('a, ('a + 1).as('x))
+
+ val optimized = Optimize.execute(query.analyze)
+ val correctAnswer = LocalRelation('a.int, 'x.int).analyze
+
+ comparePlans(optimized, correctAnswer)
+ }
+
+ test("don't propagate empty relation through Aggregate with aggregate function") {
+ val query = testRelation1
+ .where(false)
+ .groupBy('a)(count('a))
+
+ val optimized = Optimize.execute(query.analyze)
+ val correctAnswer = LocalRelation('a.int).groupBy('a)(count('a)).analyze
+
+ comparePlans(optimized, correctAnswer)
+ }
+}