aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorReynold Xin <rxin@databricks.com>2016-08-27 00:36:18 -0700
committerReynold Xin <rxin@databricks.com>2016-08-27 00:36:18 -0700
commit718b6bad2d698b76be6906d51da13626e9f3890e (patch)
tree84ee03208aef217ad81673e2c3187c86a22e16d0 /sql
parent5aad4509c15e131948d387157ecf56af1a705e19 (diff)
downloadspark-718b6bad2d698b76be6906d51da13626e9f3890e.tar.gz
spark-718b6bad2d698b76be6906d51da13626e9f3890e.tar.bz2
spark-718b6bad2d698b76be6906d51da13626e9f3890e.zip
[SPARK-17274][SQL] Move join optimizer rules into a separate file
## What changes were proposed in this pull request? As part of breaking Optimizer.scala apart, this patch moves various join rules into a single file. ## How was this patch tested? This should be covered by existing tests. Author: Reynold Xin <rxin@databricks.com> Closes #14846 from rxin/SPARK-17274.
Diffstat (limited to 'sql')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala106
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala134
2 files changed, 134 insertions, 106 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 17cab18ff8..7617d34261 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -800,112 +800,6 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
}
/**
- * Reorder the joins and push all the conditions into join, so that the bottom ones have at least
- * one condition.
- *
- * The order of joins will not be changed if all of them already have at least one condition.
- */
-object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper {
-
- /**
- * Join a list of plans together and push down the conditions into them.
- *
- * The joined plan are picked from left to right, prefer those has at least one join condition.
- *
- * @param input a list of LogicalPlans to join.
- * @param conditions a list of condition for join.
- */
- @tailrec
- def createOrderedJoin(input: Seq[LogicalPlan], conditions: Seq[Expression]): LogicalPlan = {
- assert(input.size >= 2)
- if (input.size == 2) {
- val (joinConditions, others) = conditions.partition(
- e => !SubqueryExpression.hasCorrelatedSubquery(e))
- val join = Join(input(0), input(1), Inner, joinConditions.reduceLeftOption(And))
- if (others.nonEmpty) {
- Filter(others.reduceLeft(And), join)
- } else {
- join
- }
- } else {
- val left :: rest = input.toList
- // find out the first join that have at least one join condition
- val conditionalJoin = rest.find { plan =>
- val refs = left.outputSet ++ plan.outputSet
- conditions.filterNot(canEvaluate(_, left)).filterNot(canEvaluate(_, plan))
- .exists(_.references.subsetOf(refs))
- }
- // pick the next one if no condition left
- val right = conditionalJoin.getOrElse(rest.head)
-
- val joinedRefs = left.outputSet ++ right.outputSet
- val (joinConditions, others) = conditions.partition(
- e => e.references.subsetOf(joinedRefs) && !SubqueryExpression.hasCorrelatedSubquery(e))
- val joined = Join(left, right, Inner, joinConditions.reduceLeftOption(And))
-
- // should not have reference to same logical plan
- createOrderedJoin(Seq(joined) ++ rest.filterNot(_ eq right), others)
- }
- }
-
- def apply(plan: LogicalPlan): LogicalPlan = plan transform {
- case j @ ExtractFiltersAndInnerJoins(input, conditions)
- if input.size > 2 && conditions.nonEmpty =>
- createOrderedJoin(input, conditions)
- }
-}
-
-/**
- * Elimination of outer joins, if the predicates can restrict the result sets so that
- * all null-supplying rows are eliminated
- *
- * - full outer -> inner if both sides have such predicates
- * - left outer -> inner if the right side has such predicates
- * - right outer -> inner if the left side has such predicates
- * - full outer -> left outer if only the left side has such predicates
- * - full outer -> right outer if only the right side has such predicates
- *
- * This rule should be executed before pushing down the Filter
- */
-object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
-
- /**
- * Returns whether the expression returns null or false when all inputs are nulls.
- */
- private def canFilterOutNull(e: Expression): Boolean = {
- if (!e.deterministic || SubqueryExpression.hasCorrelatedSubquery(e)) return false
- val attributes = e.references.toSeq
- val emptyRow = new GenericInternalRow(attributes.length)
- val v = BindReferences.bindReference(e, attributes).eval(emptyRow)
- v == null || v == false
- }
-
- private def buildNewJoinType(filter: Filter, join: Join): JoinType = {
- val conditions = splitConjunctivePredicates(filter.condition) ++ filter.constraints
- val leftConditions = conditions.filter(_.references.subsetOf(join.left.outputSet))
- val rightConditions = conditions.filter(_.references.subsetOf(join.right.outputSet))
-
- val leftHasNonNullPredicate = leftConditions.exists(canFilterOutNull)
- val rightHasNonNullPredicate = rightConditions.exists(canFilterOutNull)
-
- join.joinType match {
- case RightOuter if leftHasNonNullPredicate => Inner
- case LeftOuter if rightHasNonNullPredicate => Inner
- case FullOuter if leftHasNonNullPredicate && rightHasNonNullPredicate => Inner
- case FullOuter if leftHasNonNullPredicate => LeftOuter
- case FullOuter if rightHasNonNullPredicate => RightOuter
- case o => o
- }
- }
-
- def apply(plan: LogicalPlan): LogicalPlan = plan transform {
- case f @ Filter(condition, j @ Join(_, _, RightOuter | LeftOuter | FullOuter, _)) =>
- val newJoinType = buildNewJoinType(f, j)
- if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
- }
-}
-
-/**
* Pushes down [[Filter]] operators where the `condition` can be
* evaluated using only the attributes of the left or right side of a join. Other
* [[Filter]] conditions are moved into the `condition` of the [[Join]].
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
new file mode 100644
index 0000000000..158ad3d91f
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import scala.annotation.tailrec
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules._
+
+
+/**
+ * Reorder the joins and push all the conditions into join, so that the bottom ones have at least
+ * one condition.
+ *
+ * The order of joins will not be changed if all of them already have at least one condition.
+ */
+object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper {
+
+ /**
+ * Join a list of plans together and push down the conditions into them.
+ *
+ * The joined plan are picked from left to right, prefer those has at least one join condition.
+ *
+ * @param input a list of LogicalPlans to join.
+ * @param conditions a list of condition for join.
+ */
+ @tailrec
+ def createOrderedJoin(input: Seq[LogicalPlan], conditions: Seq[Expression]): LogicalPlan = {
+ assert(input.size >= 2)
+ if (input.size == 2) {
+ val (joinConditions, others) = conditions.partition(
+ e => !SubqueryExpression.hasCorrelatedSubquery(e))
+ val join = Join(input(0), input(1), Inner, joinConditions.reduceLeftOption(And))
+ if (others.nonEmpty) {
+ Filter(others.reduceLeft(And), join)
+ } else {
+ join
+ }
+ } else {
+ val left :: rest = input.toList
+ // find out the first join that have at least one join condition
+ val conditionalJoin = rest.find { plan =>
+ val refs = left.outputSet ++ plan.outputSet
+ conditions.filterNot(canEvaluate(_, left)).filterNot(canEvaluate(_, plan))
+ .exists(_.references.subsetOf(refs))
+ }
+ // pick the next one if no condition left
+ val right = conditionalJoin.getOrElse(rest.head)
+
+ val joinedRefs = left.outputSet ++ right.outputSet
+ val (joinConditions, others) = conditions.partition(
+ e => e.references.subsetOf(joinedRefs) && !SubqueryExpression.hasCorrelatedSubquery(e))
+ val joined = Join(left, right, Inner, joinConditions.reduceLeftOption(And))
+
+ // should not have reference to same logical plan
+ createOrderedJoin(Seq(joined) ++ rest.filterNot(_ eq right), others)
+ }
+ }
+
+ def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+ case j @ ExtractFiltersAndInnerJoins(input, conditions)
+ if input.size > 2 && conditions.nonEmpty =>
+ createOrderedJoin(input, conditions)
+ }
+}
+
+
+/**
+ * Elimination of outer joins, if the predicates can restrict the result sets so that
+ * all null-supplying rows are eliminated
+ *
+ * - full outer -> inner if both sides have such predicates
+ * - left outer -> inner if the right side has such predicates
+ * - right outer -> inner if the left side has such predicates
+ * - full outer -> left outer if only the left side has such predicates
+ * - full outer -> right outer if only the right side has such predicates
+ *
+ * This rule should be executed before pushing down the Filter
+ */
+object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
+
+ /**
+ * Returns whether the expression returns null or false when all inputs are nulls.
+ */
+ private def canFilterOutNull(e: Expression): Boolean = {
+ if (!e.deterministic || SubqueryExpression.hasCorrelatedSubquery(e)) return false
+ val attributes = e.references.toSeq
+ val emptyRow = new GenericInternalRow(attributes.length)
+ val v = BindReferences.bindReference(e, attributes).eval(emptyRow)
+ v == null || v == false
+ }
+
+ private def buildNewJoinType(filter: Filter, join: Join): JoinType = {
+ val conditions = splitConjunctivePredicates(filter.condition) ++ filter.constraints
+ val leftConditions = conditions.filter(_.references.subsetOf(join.left.outputSet))
+ val rightConditions = conditions.filter(_.references.subsetOf(join.right.outputSet))
+
+ val leftHasNonNullPredicate = leftConditions.exists(canFilterOutNull)
+ val rightHasNonNullPredicate = rightConditions.exists(canFilterOutNull)
+
+ join.joinType match {
+ case RightOuter if leftHasNonNullPredicate => Inner
+ case LeftOuter if rightHasNonNullPredicate => Inner
+ case FullOuter if leftHasNonNullPredicate && rightHasNonNullPredicate => Inner
+ case FullOuter if leftHasNonNullPredicate => LeftOuter
+ case FullOuter if rightHasNonNullPredicate => RightOuter
+ case o => o
+ }
+ }
+
+ def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+ case f @ Filter(condition, j @ Join(_, _, RightOuter | LeftOuter | FullOuter, _)) =>
+ val newJoinType = buildNewJoinType(f, j)
+ if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
+ }
+}