diff options
4 files changed, 302 insertions, 2 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 55c168d552..b7d8d932ed 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -21,8 +21,8 @@ import scala.collection.immutable.HashSet import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, EliminateSubQueries} import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral} import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral} import org.apache.spark.sql.catalyst.planning.{ExtractFiltersAndInnerJoins, Unions} import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ @@ -62,6 +62,7 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] { SetOperationPushDown, SamplePushDown, ReorderJoin, + OuterJoinElimination, PushPredicateThroughJoin, PushPredicateThroughProject, PushPredicateThroughGenerate, @@ -932,6 +933,62 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper { } /** + * Elimination of outer joins, if the predicates can restrict the result sets so that + * all null-supplying rows are eliminated + * + * - full outer -> inner if both sides have such predicates + * - left outer -> inner if the right side has such predicates + * - right outer -> inner if the left side has such predicates + * - full outer -> left outer if only the left side has such predicates + * - full outer -> right outer if only the right side has such predicates + * + * This rule should be executed before pushing down the Filter + */ +object OuterJoinElimination extends Rule[LogicalPlan] with PredicateHelper { + + /** + * Returns whether the expression returns null or false when all inputs are nulls. + */ + private def canFilterOutNull(e: Expression): Boolean = { + if (!e.deterministic) return false + val attributes = e.references.toSeq + val emptyRow = new GenericInternalRow(attributes.length) + val v = BindReferences.bindReference(e, attributes).eval(emptyRow) + v == null || v == false + } + + private def buildNewJoinType(filter: Filter, join: Join): JoinType = { + val splitConjunctiveConditions: Seq[Expression] = splitConjunctivePredicates(filter.condition) + val leftConditions = splitConjunctiveConditions + .filter(_.references.subsetOf(join.left.outputSet)) + val rightConditions = splitConjunctiveConditions + .filter(_.references.subsetOf(join.right.outputSet)) + + val leftHasNonNullPredicate = leftConditions.exists(canFilterOutNull) || + filter.constraints.filter(_.isInstanceOf[IsNotNull]) + .exists(expr => join.left.outputSet.intersect(expr.references).nonEmpty) + val rightHasNonNullPredicate = rightConditions.exists(canFilterOutNull) || + filter.constraints.filter(_.isInstanceOf[IsNotNull]) + .exists(expr => join.right.outputSet.intersect(expr.references).nonEmpty) + + join.joinType match { + case RightOuter if leftHasNonNullPredicate => Inner + case LeftOuter if rightHasNonNullPredicate => Inner + case FullOuter if leftHasNonNullPredicate && rightHasNonNullPredicate => Inner + case FullOuter if leftHasNonNullPredicate => LeftOuter + case FullOuter if rightHasNonNullPredicate => RightOuter + case o => o + } + } + + def apply(plan: LogicalPlan): LogicalPlan = plan transform { + case f @ Filter(condition, j @ Join(_, _, RightOuter | LeftOuter | FullOuter, _)) => + val newJoinType = buildNewJoinType(f, j) + if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType)) + } +} + +/** * Pushes down [[Filter]] operators where the `condition` can be * evaluated using only the attributes of the left or right side of a join. Other * [[Filter]] conditions are moved into the `condition` of the [[Join]]. diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala new file mode 100644 index 0000000000..a1dc836a5f --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.rules._ + +class OuterJoinEliminationSuite extends PlanTest { + object Optimize extends RuleExecutor[LogicalPlan] { + val batches = + Batch("Subqueries", Once, + EliminateSubQueries) :: + Batch("Outer Join Elimination", Once, + OuterJoinElimination, + PushPredicateThroughJoin) :: Nil + } + + val testRelation = LocalRelation('a.int, 'b.int, 'c.int) + val testRelation1 = LocalRelation('d.int, 'e.int, 'f.int) + + test("joins: full outer to inner") { + val x = testRelation.subquery('x) + val y = testRelation1.subquery('y) + + val originalQuery = + x.join(y, FullOuter, Option("x.a".attr === "y.d".attr)) + .where("x.b".attr >= 1 && "y.d".attr >= 2) + + val optimized = Optimize.execute(originalQuery.analyze) + val left = testRelation.where('b >= 1) + val right = testRelation1.where('d >= 2) + val correctAnswer = + left.join(right, Inner, Option("a".attr === "d".attr)).analyze + + comparePlans(optimized, correctAnswer) + } + + test("joins: full outer to right") { + val x = testRelation.subquery('x) + val y = testRelation1.subquery('y) + + val originalQuery = + x.join(y, FullOuter, Option("x.a".attr === "y.d".attr)).where("y.d".attr > 2) + + val optimized = Optimize.execute(originalQuery.analyze) + val left = testRelation + val right = testRelation1.where('d > 2) + val correctAnswer = + left.join(right, RightOuter, Option("a".attr === "d".attr)).analyze + + comparePlans(optimized, correctAnswer) + } + + test("joins: full outer to left") { + val x = testRelation.subquery('x) + val y = testRelation1.subquery('y) + + val originalQuery = + x.join(y, FullOuter, Option("x.a".attr === "y.d".attr)).where("x.a".attr <=> 2) + + val optimized = Optimize.execute(originalQuery.analyze) + val left = testRelation.where('a <=> 2) + val right = testRelation1 + val correctAnswer = + left.join(right, LeftOuter, Option("a".attr === "d".attr)).analyze + + comparePlans(optimized, correctAnswer) + } + + test("joins: right to inner") { + val x = testRelation.subquery('x) + val y = testRelation1.subquery('y) + + val originalQuery = + x.join(y, RightOuter, Option("x.a".attr === "y.d".attr)).where("x.b".attr > 2) + + val optimized = Optimize.execute(originalQuery.analyze) + val left = testRelation.where('b > 2) + val right = testRelation1 + val correctAnswer = + left.join(right, Inner, Option("a".attr === "d".attr)).analyze + + comparePlans(optimized, correctAnswer) + } + + test("joins: left to inner") { + val x = testRelation.subquery('x) + val y = testRelation1.subquery('y) + + val originalQuery = + x.join(y, LeftOuter, Option("x.a".attr === "y.d".attr)) + .where("y.e".attr.isNotNull) + + val optimized = Optimize.execute(originalQuery.analyze) + val left = testRelation + val right = testRelation1.where('e.isNotNull) + val correctAnswer = + left.join(right, Inner, Option("a".attr === "d".attr)).analyze + + comparePlans(optimized, correctAnswer) + } + + // evaluating if mixed OR and NOT expressions can eliminate all null-supplying rows + test("joins: left to inner with complicated filter predicates #1") { + val x = testRelation.subquery('x) + val y = testRelation1.subquery('y) + + val originalQuery = + x.join(y, LeftOuter, Option("x.a".attr === "y.d".attr)) + .where(!'e.isNull || ('d.isNotNull && 'f.isNull)) + + val optimized = Optimize.execute(originalQuery.analyze) + val left = testRelation + val right = testRelation1.where(!'e.isNull || ('d.isNotNull && 'f.isNull)) + val correctAnswer = + left.join(right, Inner, Option("a".attr === "d".attr)).analyze + + comparePlans(optimized, correctAnswer) + } + + // eval(emptyRow) of 'e.in(1, 2) will return null instead of false + test("joins: left to inner with complicated filter predicates #2") { + val x = testRelation.subquery('x) + val y = testRelation1.subquery('y) + + val originalQuery = + x.join(y, LeftOuter, Option("x.a".attr === "y.d".attr)) + .where('e.in(1, 2)) + + val optimized = Optimize.execute(originalQuery.analyze) + val left = testRelation + val right = testRelation1.where('e.in(1, 2)) + val correctAnswer = + left.join(right, Inner, Option("a".attr === "d".attr)).analyze + + comparePlans(optimized, correctAnswer) + } + + // evaluating if mixed OR and AND expressions can eliminate all null-supplying rows + test("joins: left to inner with complicated filter predicates #3") { + val x = testRelation.subquery('x) + val y = testRelation1.subquery('y) + + val originalQuery = + x.join(y, LeftOuter, Option("x.a".attr === "y.d".attr)) + .where((!'e.isNull || ('d.isNotNull && 'f.isNull)) && 'e.isNull) + + val optimized = Optimize.execute(originalQuery.analyze) + val left = testRelation + val right = testRelation1.where((!'e.isNull || ('d.isNotNull && 'f.isNull)) && 'e.isNull) + val correctAnswer = + left.join(right, Inner, Option("a".attr === "d".attr)).analyze + + comparePlans(optimized, correctAnswer) + } + + // evaluating if the expressions that have both left and right attributes + // can eliminate all null-supplying rows + // FULL OUTER => INNER + test("joins: left to inner with complicated filter predicates #4") { + val x = testRelation.subquery('x) + val y = testRelation1.subquery('y) + + val originalQuery = + x.join(y, FullOuter, Option("x.a".attr === "y.d".attr)) + .where("x.b".attr + 3 === "y.e".attr) + + val optimized = Optimize.execute(originalQuery.analyze) + val left = testRelation + val right = testRelation1 + val correctAnswer = + left.join(right, Inner, Option("b".attr + 3 === "e".attr && "a".attr === "d".attr)).analyze + + comparePlans(optimized, correctAnswer) + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala index a5e5f15642..067a62d011 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql +import org.apache.spark.sql.catalyst.plans.{Inner, LeftOuter, RightOuter} +import org.apache.spark.sql.catalyst.plans.logical.Join import org.apache.spark.sql.execution.joins.BroadcastHashJoin import org.apache.spark.sql.functions._ import org.apache.spark.sql.test.SharedSQLContext @@ -156,4 +158,50 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext { assert(df1.join(broadcast(pf1)).count() === 4) } } + + test("join - outer join conversion") { + val df = Seq((1, 2, "1"), (3, 4, "3")).toDF("int", "int2", "str").as("a") + val df2 = Seq((1, 3, "1"), (5, 6, "5")).toDF("int", "int2", "str").as("b") + + // outer -> left + val outerJoin2Left = df.join(df2, $"a.int" === $"b.int", "outer").where($"a.int" === 3) + assert(outerJoin2Left.queryExecution.optimizedPlan.collect { + case j @ Join(_, _, LeftOuter, _) => j }.size === 1) + checkAnswer( + outerJoin2Left, + Row(3, 4, "3", null, null, null) :: Nil) + + // outer -> right + val outerJoin2Right = df.join(df2, $"a.int" === $"b.int", "outer").where($"b.int" === 5) + assert(outerJoin2Right.queryExecution.optimizedPlan.collect { + case j @ Join(_, _, RightOuter, _) => j }.size === 1) + checkAnswer( + outerJoin2Right, + Row(null, null, null, 5, 6, "5") :: Nil) + + // outer -> inner + val outerJoin2Inner = df.join(df2, $"a.int" === $"b.int", "outer"). + where($"a.int" === 1 && $"b.int2" === 3) + assert(outerJoin2Inner.queryExecution.optimizedPlan.collect { + case j @ Join(_, _, Inner, _) => j }.size === 1) + checkAnswer( + outerJoin2Inner, + Row(1, 2, "1", 1, 3, "1") :: Nil) + + // right -> inner + val rightJoin2Inner = df.join(df2, $"a.int" === $"b.int", "right").where($"a.int" === 1) + assert(rightJoin2Inner.queryExecution.optimizedPlan.collect { + case j @ Join(_, _, Inner, _) => j }.size === 1) + checkAnswer( + rightJoin2Inner, + Row(1, 2, "1", 1, 3, "1") :: Nil) + + // left -> inner + val leftJoin2Inner = df.join(df2, $"a.int" === $"b.int", "left").where($"b.int2" === 3) + assert(leftJoin2Inner.queryExecution.optimizedPlan.collect { + case j @ Join(_, _, Inner, _) => j }.size === 1) + checkAnswer( + leftJoin2Inner, + Row(1, 2, "1", 1, 3, "1") :: Nil) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala index 92ff7e73fa..8f2a0c0351 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -81,7 +81,7 @@ class JoinSuite extends QueryTest with SharedSQLContext { ("SELECT * FROM testData JOIN testData2 ON key = a where key = 2", classOf[SortMergeJoin]), ("SELECT * FROM testData LEFT JOIN testData2 ON key = a", classOf[SortMergeOuterJoin]), ("SELECT * FROM testData RIGHT JOIN testData2 ON key = a where key = 2", - classOf[SortMergeOuterJoin]), + classOf[SortMergeJoin]), // converted from Right Outer to Inner ("SELECT * FROM testData right join testData2 ON key = a and key = 2", classOf[SortMergeOuterJoin]), ("SELECT * FROM testData full outer join testData2 ON key = a", |