aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst/src
diff options
context:
space:
mode:
authorgatorsmile <gatorsmile@gmail.com>2016-08-25 14:18:58 +0200
committerHerman van Hovell <hvanhovell@databricks.com>2016-08-25 14:18:58 +0200
commitd2ae6399ee2f0524b88262735adbbcb2035de8fd (patch)
tree2873fa10e507dc123d288e26c6c520932e4be765 /sql/catalyst/src
parent2b0cc4e0dfa4ffb9f21ff4a303015bc9c962d42b (diff)
downloadspark-d2ae6399ee2f0524b88262735adbbcb2035de8fd.tar.gz
spark-d2ae6399ee2f0524b88262735adbbcb2035de8fd.tar.bz2
spark-d2ae6399ee2f0524b88262735adbbcb2035de8fd.zip
[SPARK-16991][SPARK-17099][SPARK-17120][SQL] Fix Outer Join Elimination when Filter's isNotNull Constraints Are Unable to Filter Out All Null-supplying Rows
### What changes were proposed in this pull request? This PR fixes an incorrect outer join elimination that occurs when a filter's `isNotNull` constraint is unable to filter out all null-supplying rows. For example, `isnotnull(coalesce(b#227, c#238))`. Users can hit this error when they try to use `using/natural outer join`, which is converted to a normal outer join with a `coalesce` expression on the `using columns`. For example, ```Scala val a = Seq((1, 2), (2, 3)).toDF("a", "b") val b = Seq((2, 5), (3, 4)).toDF("a", "c") val c = Seq((3, 1)).toDF("a", "d") val ab = a.join(b, Seq("a"), "fullouter") ab.join(c, "a").explain(true) ``` The dataframe `ab` is doing a `using full-outer join`, which is converted to a normal outer join with a `coalesce` expression. Constraint inference generates a `Filter` with the constraint `isnotnull(coalesce(b#227, c#238))`. Then, it triggers a wrong outer join elimination and generates a wrong result. ``` Project [a#251, b#227, c#237, d#247] +- Join Inner, (a#251 = a#246) :- Project [coalesce(a#226, a#236) AS a#251, b#227, c#237] : +- Join FullOuter, (a#226 = a#236) : :- Project [_1#223 AS a#226, _2#224 AS b#227] : : +- LocalRelation [_1#223, _2#224] : +- Project [_1#233 AS a#236, _2#234 AS c#237] : +- LocalRelation [_1#233, _2#234] +- Project [_1#243 AS a#246, _2#244 AS d#247] +- LocalRelation [_1#243, _2#244] == Optimized Logical Plan == Project [a#251, b#227, c#237, d#247] +- Join Inner, (a#251 = a#246) :- Project [coalesce(a#226, a#236) AS a#251, b#227, c#237] : +- Filter isnotnull(coalesce(a#226, a#236)) : +- Join FullOuter, (a#226 = a#236) : :- LocalRelation [a#226, b#227] : +- LocalRelation [a#236, c#237] +- LocalRelation [a#246, d#247] ``` **A note to the `Committer`**, please also give credit to dongjoon-hyun, who submitted another PR for fixing this issue. https://github.com/apache/spark/pull/14580 ### How was this patch tested? 
Added test cases Author: gatorsmile <gatorsmile@gmail.com> Closes #14661 from gatorsmile/fixOuterJoinElimination.
Diffstat (limited to 'sql/catalyst/src')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala18
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala39
2 files changed, 45 insertions, 12 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 9a0ff8a9b3..82ad0fb5ee 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1343,18 +1343,12 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
}
private def buildNewJoinType(filter: Filter, join: Join): JoinType = {
- val splitConjunctiveConditions: Seq[Expression] = splitConjunctivePredicates(filter.condition)
- val leftConditions = splitConjunctiveConditions
- .filter(_.references.subsetOf(join.left.outputSet))
- val rightConditions = splitConjunctiveConditions
- .filter(_.references.subsetOf(join.right.outputSet))
-
- val leftHasNonNullPredicate = leftConditions.exists(canFilterOutNull) ||
- filter.constraints.filter(_.isInstanceOf[IsNotNull])
- .exists(expr => join.left.outputSet.intersect(expr.references).nonEmpty)
- val rightHasNonNullPredicate = rightConditions.exists(canFilterOutNull) ||
- filter.constraints.filter(_.isInstanceOf[IsNotNull])
- .exists(expr => join.right.outputSet.intersect(expr.references).nonEmpty)
+ val conditions = splitConjunctivePredicates(filter.condition) ++ filter.constraints
+ val leftConditions = conditions.filter(_.references.subsetOf(join.left.outputSet))
+ val rightConditions = conditions.filter(_.references.subsetOf(join.right.outputSet))
+
+ val leftHasNonNullPredicate = leftConditions.exists(canFilterOutNull)
+ val rightHasNonNullPredicate = rightConditions.exists(canFilterOutNull)
join.joinType match {
case RightOuter if leftHasNonNullPredicate => Inner
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala
index 41754adef4..c168a55e40 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.optimizer
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.{Coalesce, IsNotNull}
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
@@ -192,4 +193,42 @@ class OuterJoinEliminationSuite extends PlanTest {
comparePlans(optimized, correctAnswer)
}
+
+ test("joins: no outer join elimination if the filter is not NULL eliminated") {
+ val x = testRelation.subquery('x)
+ val y = testRelation1.subquery('y)
+
+ val originalQuery =
+ x.join(y, FullOuter, Option("x.a".attr === "y.d".attr))
+ .where(Coalesce("y.e".attr :: "x.a".attr :: Nil))
+
+ val optimized = Optimize.execute(originalQuery.analyze)
+
+ val left = testRelation
+ val right = testRelation1
+ val correctAnswer =
+ left.join(right, FullOuter, Option("a".attr === "d".attr))
+ .where(Coalesce("e".attr :: "a".attr :: Nil)).analyze
+
+ comparePlans(optimized, correctAnswer)
+ }
+
+ test("joins: no outer join elimination if the filter's constraints are not NULL eliminated") {
+ val x = testRelation.subquery('x)
+ val y = testRelation1.subquery('y)
+
+ val originalQuery =
+ x.join(y, FullOuter, Option("x.a".attr === "y.d".attr))
+ .where(IsNotNull(Coalesce("y.e".attr :: "x.a".attr :: Nil)))
+
+ val optimized = Optimize.execute(originalQuery.analyze)
+
+ val left = testRelation
+ val right = testRelation1
+ val correctAnswer =
+ left.join(right, FullOuter, Option("a".attr === "d".attr))
+ .where(IsNotNull(Coalesce("e".attr :: "a".attr :: Nil))).analyze
+
+ comparePlans(optimized, correctAnswer)
+ }
}