aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst
diff options
context:
space:
mode:
authorHerman van Hovell <hvanhovell@databricks.com>2016-11-28 07:10:52 -0800
committerHerman van Hovell <hvanhovell@databricks.com>2016-11-28 07:10:52 -0800
commit38e29824d9a50464daa397c28e89610ed0aed4b6 (patch)
tree91de58c8327d60437eb6ba3ca2064d6f0e388c52 /sql/catalyst
parent9f273c5173c05017c3009faaf3e10f2f70a842d0 (diff)
downloadspark-38e29824d9a50464daa397c28e89610ed0aed4b6.tar.gz
spark-38e29824d9a50464daa397c28e89610ed0aed4b6.tar.bz2
spark-38e29824d9a50464daa397c28e89610ed0aed4b6.zip
[SPARK-18597][SQL] Do not push-down join conditions to the right side of a LEFT ANTI join
## What changes were proposed in this pull request? We currently push down join conditions of a Left Anti join to both sides of the join. This is similar to Inner, Left Semi and Existence (a specialized left semi) join. The problem is that this changes the semantics of the join; a left anti join filters out rows that matches the join condition. This PR fixes this by only pushing down conditions to the left hand side of the join. This is similar to the behavior of left outer join. ## How was this patch tested? Added tests to `FilterPushdownSuite.scala` and created a SQLQueryTestSuite file for left anti joins with a regression test. Author: Herman van Hovell <hvanhovell@databricks.com> Closes #16026 from hvanhovell/SPARK-18597.
Diffstat (limited to 'sql/catalyst')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala6
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala33
2 files changed, 36 insertions, 3 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 2679e026bb..805cad5cb9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -932,7 +932,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right)
joinType match {
- case _: InnerLike | LeftExistence(_) =>
+ case _: InnerLike | LeftSemi | ExistenceJoin(_) =>
// push down the single side only join filter for both sides sub queries
val newLeft = leftJoinConditions.
reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
@@ -949,14 +949,14 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
val newJoinCond = (rightJoinConditions ++ commonJoinCondition).reduceLeftOption(And)
Join(newLeft, newRight, RightOuter, newJoinCond)
- case LeftOuter =>
+ case LeftOuter | LeftAnti =>
// push down the right side only join filter for right sub query
val newLeft = left
val newRight = rightJoinConditions.
reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
val newJoinCond = (leftJoinConditions ++ commonJoinCondition).reduceLeftOption(And)
- Join(newLeft, newRight, LeftOuter, newJoinCond)
+ Join(newLeft, newRight, joinType, newJoinCond)
case FullOuter => j
case NaturalJoin(_) => sys.error("Untransformed NaturalJoin node")
case UsingJoin(_, _) => sys.error("Untransformed Using join node")
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 019f132d94..3e67282d68 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -514,6 +514,39 @@ class FilterPushdownSuite extends PlanTest {
comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
}
+ test("joins: push down where clause into left anti join") {
+ val x = testRelation.subquery('x)
+ val y = testRelation.subquery('y)
+ val originalQuery =
+ x.join(y, LeftAnti, Some("x.b".attr === "y.b".attr))
+ .where("x.a".attr > 10)
+ .analyze
+ val optimized = Optimize.execute(originalQuery)
+ val correctAnswer =
+ x.where("x.a".attr > 10)
+ .join(y, LeftAnti, Some("x.b".attr === "y.b".attr))
+ .analyze
+ comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
+ }
+
+ test("joins: only push down join conditions to the right of a left anti join") {
+ val x = testRelation.subquery('x)
+ val y = testRelation.subquery('y)
+ val originalQuery =
+ x.join(y,
+ LeftAnti,
+ Some("x.b".attr === "y.b".attr && "y.a".attr > 10 && "x.a".attr > 10)).analyze
+ val optimized = Optimize.execute(originalQuery)
+ val correctAnswer =
+ x.join(
+ y.where("y.a".attr > 10),
+ LeftAnti,
+ Some("x.b".attr === "y.b".attr && "x.a".attr > 10))
+ .analyze
+ comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
+ }
+
+
val testRelationWithArrayType = LocalRelation('a.int, 'b.int, 'c_arr.array(IntegerType))
test("generate: predicate referenced no generated column") {