aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala18
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala39
-rw-r--r--sql/core/src/test/resources/sql-tests/inputs/outer-join.sql36
-rw-r--r--sql/core/src/test/resources/sql-tests/results/outer-join.sql.out72
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala8
5 files changed, 161 insertions, 12 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 9a0ff8a9b3..82ad0fb5ee 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1343,18 +1343,12 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
}
private def buildNewJoinType(filter: Filter, join: Join): JoinType = {
- val splitConjunctiveConditions: Seq[Expression] = splitConjunctivePredicates(filter.condition)
- val leftConditions = splitConjunctiveConditions
- .filter(_.references.subsetOf(join.left.outputSet))
- val rightConditions = splitConjunctiveConditions
- .filter(_.references.subsetOf(join.right.outputSet))
-
- val leftHasNonNullPredicate = leftConditions.exists(canFilterOutNull) ||
- filter.constraints.filter(_.isInstanceOf[IsNotNull])
- .exists(expr => join.left.outputSet.intersect(expr.references).nonEmpty)
- val rightHasNonNullPredicate = rightConditions.exists(canFilterOutNull) ||
- filter.constraints.filter(_.isInstanceOf[IsNotNull])
- .exists(expr => join.right.outputSet.intersect(expr.references).nonEmpty)
+ val conditions = splitConjunctivePredicates(filter.condition) ++ filter.constraints
+ val leftConditions = conditions.filter(_.references.subsetOf(join.left.outputSet))
+ val rightConditions = conditions.filter(_.references.subsetOf(join.right.outputSet))
+
+ val leftHasNonNullPredicate = leftConditions.exists(canFilterOutNull)
+ val rightHasNonNullPredicate = rightConditions.exists(canFilterOutNull)
join.joinType match {
case RightOuter if leftHasNonNullPredicate => Inner
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala
index 41754adef4..c168a55e40 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.optimizer
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.{Coalesce, IsNotNull}
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
@@ -192,4 +193,42 @@ class OuterJoinEliminationSuite extends PlanTest {
comparePlans(optimized, correctAnswer)
}
+
+ test("joins: no outer join elimination if the filter is not NULL eliminated") {
+ val x = testRelation.subquery('x)
+ val y = testRelation1.subquery('y)
+
+ val originalQuery =
+ x.join(y, FullOuter, Option("x.a".attr === "y.d".attr))
+ .where(Coalesce("y.e".attr :: "x.a".attr :: Nil))
+
+ val optimized = Optimize.execute(originalQuery.analyze)
+
+ val left = testRelation
+ val right = testRelation1
+ val correctAnswer =
+ left.join(right, FullOuter, Option("a".attr === "d".attr))
+ .where(Coalesce("e".attr :: "a".attr :: Nil)).analyze
+
+ comparePlans(optimized, correctAnswer)
+ }
+
+ test("joins: no outer join elimination if the filter's constraints are not NULL eliminated") {
+ val x = testRelation.subquery('x)
+ val y = testRelation1.subquery('y)
+
+ val originalQuery =
+ x.join(y, FullOuter, Option("x.a".attr === "y.d".attr))
+ .where(IsNotNull(Coalesce("y.e".attr :: "x.a".attr :: Nil)))
+
+ val optimized = Optimize.execute(originalQuery.analyze)
+
+ val left = testRelation
+ val right = testRelation1
+ val correctAnswer =
+ left.join(right, FullOuter, Option("a".attr === "d".attr))
+ .where(IsNotNull(Coalesce("e".attr :: "a".attr :: Nil))).analyze
+
+ comparePlans(optimized, correctAnswer)
+ }
}
diff --git a/sql/core/src/test/resources/sql-tests/inputs/outer-join.sql b/sql/core/src/test/resources/sql-tests/inputs/outer-join.sql
new file mode 100644
index 0000000000..f50f1ebad9
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/inputs/outer-join.sql
@@ -0,0 +1,36 @@
+-- SPARK-17099: Incorrect result when HAVING clause is added to group by query
+CREATE OR REPLACE TEMPORARY VIEW t1 AS SELECT * FROM VALUES
+(-234), (145), (367), (975), (298)
+as t1(int_col1);
+
+CREATE OR REPLACE TEMPORARY VIEW t2 AS SELECT * FROM VALUES
+(-769, -244), (-800, -409), (940, 86), (-507, 304), (-367, 158)
+as t2(int_col0, int_col1);
+
+SELECT
+ (SUM(COALESCE(t1.int_col1, t2.int_col0))),
+ ((COALESCE(t1.int_col1, t2.int_col0)) * 2)
+FROM t1
+RIGHT JOIN t2
+ ON (t2.int_col0) = (t1.int_col1)
+GROUP BY GREATEST(COALESCE(t2.int_col1, 109), COALESCE(t1.int_col1, -449)),
+ COALESCE(t1.int_col1, t2.int_col0)
+HAVING (SUM(COALESCE(t1.int_col1, t2.int_col0)))
+ > ((COALESCE(t1.int_col1, t2.int_col0)) * 2);
+
+
+-- SPARK-17120: Analyzer incorrectly optimizes plan to empty LocalRelation
+CREATE OR REPLACE TEMPORARY VIEW t1 AS SELECT * FROM VALUES (97) as t1(int_col1);
+
+CREATE OR REPLACE TEMPORARY VIEW t2 AS SELECT * FROM VALUES (0) as t2(int_col1);
+
+SELECT *
+FROM (
+SELECT
+ COALESCE(t2.int_col1, t1.int_col1) AS int_col
+ FROM t1
+ LEFT JOIN t2 ON false
+) t where (t.int_col) is not null;
+
+
+
diff --git a/sql/core/src/test/resources/sql-tests/results/outer-join.sql.out b/sql/core/src/test/resources/sql-tests/results/outer-join.sql.out
new file mode 100644
index 0000000000..b39fdb0e58
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/results/outer-join.sql.out
@@ -0,0 +1,72 @@
+-- Automatically generated by SQLQueryTestSuite
+-- Number of queries: 6
+
+
+-- !query 0
+CREATE OR REPLACE TEMPORARY VIEW t1 AS SELECT * FROM VALUES
+(-234), (145), (367), (975), (298)
+as t1(int_col1)
+-- !query 0 schema
+struct<>
+-- !query 0 output
+
+
+
+-- !query 1
+CREATE OR REPLACE TEMPORARY VIEW t2 AS SELECT * FROM VALUES
+(-769, -244), (-800, -409), (940, 86), (-507, 304), (-367, 158)
+as t2(int_col0, int_col1)
+-- !query 1 schema
+struct<>
+-- !query 1 output
+
+
+
+-- !query 2
+SELECT
+ (SUM(COALESCE(t1.int_col1, t2.int_col0))),
+ ((COALESCE(t1.int_col1, t2.int_col0)) * 2)
+FROM t1
+RIGHT JOIN t2
+ ON (t2.int_col0) = (t1.int_col1)
+GROUP BY GREATEST(COALESCE(t2.int_col1, 109), COALESCE(t1.int_col1, -449)),
+ COALESCE(t1.int_col1, t2.int_col0)
+HAVING (SUM(COALESCE(t1.int_col1, t2.int_col0)))
+ > ((COALESCE(t1.int_col1, t2.int_col0)) * 2)
+-- !query 2 schema
+struct<sum(coalesce(int_col1, int_col0)):bigint,(coalesce(int_col1, int_col0) * 2):int>
+-- !query 2 output
+-367 -734
+-507 -1014
+-769 -1538
+-800 -1600
+
+
+-- !query 3
+CREATE OR REPLACE TEMPORARY VIEW t1 AS SELECT * FROM VALUES (97) as t1(int_col1)
+-- !query 3 schema
+struct<>
+-- !query 3 output
+
+
+
+-- !query 4
+CREATE OR REPLACE TEMPORARY VIEW t2 AS SELECT * FROM VALUES (0) as t2(int_col1)
+-- !query 4 schema
+struct<>
+-- !query 4 output
+
+
+
+-- !query 5
+SELECT *
+FROM (
+SELECT
+ COALESCE(t2.int_col1, t1.int_col1) AS int_col
+ FROM t1
+ LEFT JOIN t2 ON false
+) t where (t.int_col) is not null
+-- !query 5 schema
+struct<int_col:int>
+-- !query 5 output
+97
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index 4342c039ae..4abf5e42b9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -225,4 +225,12 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
Row(1, null) :: Row(null, 2) :: Nil
)
}
+
+ test("SPARK-16991: Full outer join followed by inner join produces wrong results") {
+ val a = Seq((1, 2), (2, 3)).toDF("a", "b")
+ val b = Seq((2, 5), (3, 4)).toDF("a", "c")
+ val c = Seq((3, 1)).toDF("a", "d")
+ val ab = a.join(b, Seq("a"), "fullouter")
+ checkAnswer(ab.join(c, "a"), Row(3, null, 4, 1) :: Nil)
+ }
}