diff options
4 files changed, 302 insertions, 2 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 55c168d552..b7d8d932ed 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -21,8 +21,8 @@ import scala.collection.immutable.HashSet
import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, EliminateSubQueries}
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral}
import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral}
import org.apache.spark.sql.catalyst.planning.{ExtractFiltersAndInnerJoins, Unions}
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
@@ -62,6 +62,7 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
+ OuterJoinElimination,
@@ -932,6 +933,62 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper {
+ * Elimination of outer joins, if the predicates can restrict the result sets so that
+ * all null-supplying rows are eliminated
+ *
+ * - full outer -> inner if both sides have such predicates
+ * - left outer -> inner if the right side has such predicates
+ * - right outer -> inner if the left side has such predicates
+ * - full outer -> left outer if only the left side has such predicates
+ * - full outer -> right outer if only the right side has such predicates
+ *
+ * This rule should be executed before pushing down the Filter
+ */
+object OuterJoinElimination extends Rule[LogicalPlan] with PredicateHelper {
+ /**
+ * Returns whether the expression returns null or false when all inputs are nulls.
+ */
+ private def canFilterOutNull(e: Expression): Boolean = {
+ if (!e.deterministic) return false
+ val attributes = e.references.toSeq
+ val emptyRow = new GenericInternalRow(attributes.length)
+ val v = BindReferences.bindReference(e, attributes).eval(emptyRow)
+ v == null || v == false
+ }
+ private def buildNewJoinType(filter: Filter, join: Join): JoinType = {
+ val splitConjunctiveConditions: Seq[Expression] = splitConjunctivePredicates(filter.condition)
+ val leftConditions = splitConjunctiveConditions
+ .filter(_.references.subsetOf(join.left.outputSet))
+ val rightConditions = splitConjunctiveConditions
+ .filter(_.references.subsetOf(join.right.outputSet))
+ val leftHasNonNullPredicate = leftConditions.exists(canFilterOutNull) ||
+ filter.constraints.filter(_.isInstanceOf[IsNotNull])
+ .exists(expr => join.left.outputSet.intersect(expr.references).nonEmpty)
+ val rightHasNonNullPredicate = rightConditions.exists(canFilterOutNull) ||
+ filter.constraints.filter(_.isInstanceOf[IsNotNull])
+ .exists(expr => join.right.outputSet.intersect(expr.references).nonEmpty)
+ join.joinType match {
+ case RightOuter if leftHasNonNullPredicate => Inner
+ case LeftOuter if rightHasNonNullPredicate => Inner
+ case FullOuter if leftHasNonNullPredicate && rightHasNonNullPredicate => Inner
+ case FullOuter if leftHasNonNullPredicate => LeftOuter
+ case FullOuter if rightHasNonNullPredicate => RightOuter
+ case o => o
+ }
+ }
+ def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+ case f @ Filter(condition, j @ Join(_, _, RightOuter | LeftOuter | FullOuter, _)) =>
+ val newJoinType = buildNewJoinType(f, j)
+ if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
+ }
* Pushes down [[Filter]] operators where the `condition` can be
* evaluated using only the attributes of the left or right side of a join. Other
* [[Filter]] conditions are moved into the `condition` of the [[Join]].
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala
new file mode 100644
index 0000000000..a1dc836a5f
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala
@@ -0,0 +1,195 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.optimizer
+import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules._
+class OuterJoinEliminationSuite extends PlanTest {
+ object Optimize extends RuleExecutor[LogicalPlan] {
+ val batches =
+ Batch("Subqueries", Once,
+ EliminateSubQueries) ::
+ Batch("Outer Join Elimination", Once,
+ OuterJoinElimination,
+ PushPredicateThroughJoin) :: Nil
+ }
+ val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
+ val testRelation1 = LocalRelation('d.int, 'e.int, 'f.int)
+ test("joins: full outer to inner") {
+ val x = testRelation.subquery('x)
+ val y = testRelation1.subquery('y)
+ val originalQuery =
+ x.join(y, FullOuter, Option("x.a".attr === "y.d".attr))
+ .where("x.b".attr >= 1 && "y.d".attr >= 2)
+ val optimized = Optimize.execute(originalQuery.analyze)
+ val left = testRelation.where('b >= 1)
+ val right = testRelation1.where('d >= 2)
+ val correctAnswer =
+ left.join(right, Inner, Option("a".attr === "d".attr)).analyze
+ comparePlans(optimized, correctAnswer)
+ }
+ test("joins: full outer to right") {
+ val x = testRelation.subquery('x)
+ val y = testRelation1.subquery('y)
+ val originalQuery =
+ x.join(y, FullOuter, Option("x.a".attr === "y.d".attr)).where("y.d".attr > 2)
+ val optimized = Optimize.execute(originalQuery.analyze)
+ val left = testRelation
+ val right = testRelation1.where('d > 2)
+ val correctAnswer =
+ left.join(right, RightOuter, Option("a".attr === "d".attr)).analyze
+ comparePlans(optimized, correctAnswer)
+ }
+ test("joins: full outer to left") {
+ val x = testRelation.subquery('x)
+ val y = testRelation1.subquery('y)
+ val originalQuery =
+ x.join(y, FullOuter, Option("x.a".attr === "y.d".attr)).where("x.a".attr <=> 2)
+ val optimized = Optimize.execute(originalQuery.analyze)
+ val left = testRelation.where('a <=> 2)
+ val right = testRelation1
+ val correctAnswer =
+ left.join(right, LeftOuter, Option("a".attr === "d".attr)).analyze
+ comparePlans(optimized, correctAnswer)
+ }
+ test("joins: right to inner") {
+ val x = testRelation.subquery('x)
+ val y = testRelation1.subquery('y)
+ val originalQuery =
+ x.join(y, RightOuter, Option("x.a".attr === "y.d".attr)).where("x.b".attr > 2)
+ val optimized = Optimize.execute(originalQuery.analyze)
+ val left = testRelation.where('b > 2)
+ val right = testRelation1
+ val correctAnswer =
+ left.join(right, Inner, Option("a".attr === "d".attr)).analyze
+ comparePlans(optimized, correctAnswer)
+ }
+ test("joins: left to inner") {
+ val x = testRelation.subquery('x)
+ val y = testRelation1.subquery('y)
+ val originalQuery =
+ x.join(y, LeftOuter, Option("x.a".attr === "y.d".attr))
+ .where("y.e".attr.isNotNull)
+ val optimized = Optimize.execute(originalQuery.analyze)
+ val left = testRelation
+ val right = testRelation1.where('e.isNotNull)
+ val correctAnswer =
+ left.join(right, Inner, Option("a".attr === "d".attr)).analyze
+ comparePlans(optimized, correctAnswer)
+ }
+ // evaluating if mixed OR and NOT expressions can eliminate all null-supplying rows
+ test("joins: left to inner with complicated filter predicates #1") {
+ val x = testRelation.subquery('x)
+ val y = testRelation1.subquery('y)
+ val originalQuery =
+ x.join(y, LeftOuter, Option("x.a".attr === "y.d".attr))
+ .where(!'e.isNull || ('d.isNotNull && 'f.isNull))
+ val optimized = Optimize.execute(originalQuery.analyze)
+ val left = testRelation
+ val right = testRelation1.where(!'e.isNull || ('d.isNotNull && 'f.isNull))
+ val correctAnswer =
+ left.join(right, Inner, Option("a".attr === "d".attr)).analyze
+ comparePlans(optimized, correctAnswer)
+ }
+ // eval(emptyRow) of 'e.in(1, 2) will return null instead of false
+ test("joins: left to inner with complicated filter predicates #2") {
+ val x = testRelation.subquery('x)
+ val y = testRelation1.subquery('y)
+ val originalQuery =
+ x.join(y, LeftOuter, Option("x.a".attr === "y.d".attr))
+ .where('e.in(1, 2))
+ val optimized = Optimize.execute(originalQuery.analyze)
+ val left = testRelation
+ val right = testRelation1.where('e.in(1, 2))
+ val correctAnswer =
+ left.join(right, Inner, Option("a".attr === "d".attr)).analyze
+ comparePlans(optimized, correctAnswer)
+ }
+ // evaluating if mixed OR and AND expressions can eliminate all null-supplying rows
+ test("joins: left to inner with complicated filter predicates #3") {
+ val x = testRelation.subquery('x)
+ val y = testRelation1.subquery('y)
+ val originalQuery =
+ x.join(y, LeftOuter, Option("x.a".attr === "y.d".attr))
+ .where((!'e.isNull || ('d.isNotNull && 'f.isNull)) && 'e.isNull)
+ val optimized = Optimize.execute(originalQuery.analyze)
+ val left = testRelation
+ val right = testRelation1.where((!'e.isNull || ('d.isNotNull && 'f.isNull)) && 'e.isNull)
+ val correctAnswer =
+ left.join(right, Inner, Option("a".attr === "d".attr)).analyze
+ comparePlans(optimized, correctAnswer)
+ }
+ // evaluating if the expressions that have both left and right attributes
+ // can eliminate all null-supplying rows
+ test("joins: left to inner with complicated filter predicates #4") {
+ val x = testRelation.subquery('x)
+ val y = testRelation1.subquery('y)
+ val originalQuery =
+ x.join(y, FullOuter, Option("x.a".attr === "y.d".attr))
+ .where("x.b".attr + 3 === "y.e".attr)
+ val optimized = Optimize.execute(originalQuery.analyze)
+ val left = testRelation
+ val right = testRelation1
+ val correctAnswer =
+ left.join(right, Inner, Option("b".attr + 3 === "e".attr && "a".attr === "d".attr)).analyze
+ comparePlans(optimized, correctAnswer)
+ }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index a5e5f15642..067a62d011 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql
+import org.apache.spark.sql.catalyst.plans.{Inner, LeftOuter, RightOuter}
+import org.apache.spark.sql.catalyst.plans.logical.Join
import org.apache.spark.sql.execution.joins.BroadcastHashJoin
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.SharedSQLContext
@@ -156,4 +158,50 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
assert(df1.join(broadcast(pf1)).count() === 4)
+ test("join - outer join conversion") {
+ val df = Seq((1, 2, "1"), (3, 4, "3")).toDF("int", "int2", "str").as("a")
+ val df2 = Seq((1, 3, "1"), (5, 6, "5")).toDF("int", "int2", "str").as("b")
+ // outer -> left
+ val outerJoin2Left = df.join(df2, $"a.int" === $"b.int", "outer").where($"a.int" === 3)
+ assert(outerJoin2Left.queryExecution.optimizedPlan.collect {
+ case j @ Join(_, _, LeftOuter, _) => j }.size === 1)
+ checkAnswer(
+ outerJoin2Left,
+ Row(3, 4, "3", null, null, null) :: Nil)
+ // outer -> right
+ val outerJoin2Right = df.join(df2, $"a.int" === $"b.int", "outer").where($"b.int" === 5)
+ assert(outerJoin2Right.queryExecution.optimizedPlan.collect {
+ case j @ Join(_, _, RightOuter, _) => j }.size === 1)
+ checkAnswer(
+ outerJoin2Right,
+ Row(null, null, null, 5, 6, "5") :: Nil)
+ // outer -> inner
+ val outerJoin2Inner = df.join(df2, $"a.int" === $"b.int", "outer").
+ where($"a.int" === 1 && $"b.int2" === 3)
+ assert(outerJoin2Inner.queryExecution.optimizedPlan.collect {
+ case j @ Join(_, _, Inner, _) => j }.size === 1)
+ checkAnswer(
+ outerJoin2Inner,
+ Row(1, 2, "1", 1, 3, "1") :: Nil)
+ // right -> inner
+ val rightJoin2Inner = df.join(df2, $"a.int" === $"b.int", "right").where($"a.int" === 1)
+ assert(rightJoin2Inner.queryExecution.optimizedPlan.collect {
+ case j @ Join(_, _, Inner, _) => j }.size === 1)
+ checkAnswer(
+ rightJoin2Inner,
+ Row(1, 2, "1", 1, 3, "1") :: Nil)
+ // left -> inner
+ val leftJoin2Inner = df.join(df2, $"a.int" === $"b.int", "left").where($"b.int2" === 3)
+ assert(leftJoin2Inner.queryExecution.optimizedPlan.collect {
+ case j @ Join(_, _, Inner, _) => j }.size === 1)
+ checkAnswer(
+ leftJoin2Inner,
+ Row(1, 2, "1", 1, 3, "1") :: Nil)
+ }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 92ff7e73fa..8f2a0c0351 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -81,7 +81,7 @@ class JoinSuite extends QueryTest with SharedSQLContext {
("SELECT * FROM testData JOIN testData2 ON key = a where key = 2", classOf[SortMergeJoin]),
("SELECT * FROM testData LEFT JOIN testData2 ON key = a", classOf[SortMergeOuterJoin]),
("SELECT * FROM testData RIGHT JOIN testData2 ON key = a where key = 2",
- classOf[SortMergeOuterJoin]),
+ classOf[SortMergeJoin]), // converted from Right Outer to Inner
("SELECT * FROM testData right join testData2 ON key = a and key = 2",
("SELECT * FROM testData full outer join testData2 ON key = a",