From d76592276f9f66fed8012d876595de8717f516a9 Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Wed, 6 Apr 2016 19:25:10 -0700 Subject: [SPARK-12610][SQL] Left Anti Join ### What changes were proposed in this pull request? This PR adds support for `LEFT ANTI JOIN` to Spark SQL. A `LEFT ANTI JOIN` is the exact opposite of a `LEFT SEMI JOIN` and can be used to identify rows in one dataset that are not in another dataset. Note that `nulls` on the left side of the join cannot match a row on the right hand side of the join; the result is that left anti join will always select a row with a `null` in one or more of its keys. We currently add support for the following SQL join syntax: SELECT * FROM tbl1 A LEFT ANTI JOIN tbl2 B ON A.Id = B.Id Or using a dataframe: tbl1.as("a").join(tbl2.as("b"), $"a.id" === $"b.id", "left_anti) This PR provides serves as the basis for implementing `NOT EXISTS` and `NOT IN (...)` correlated sub-queries. It would also serve as good basis for implementing an more efficient `EXCEPT` operator. The PR has been (losely) based on PR's by both davies (https://github.com/apache/spark/pull/10706) and chenghao-intel (https://github.com/apache/spark/pull/10563); credit should be given where credit is due. This PR adds supports for `LEFT ANTI JOIN` to `BroadcastHashJoin` (including codegeneration), `ShuffledHashJoin` and `BroadcastNestedLoopJoin`. ### How was this patch tested? Added tests to `JoinSuite` and ported `ExistenceJoinSuite` from https://github.com/apache/spark/pull/10563. cc davies chenghao-intel rxin Author: Herman van Hovell Closes #12214 from hvanhovell/SPARK-12610. --- .../org/apache/spark/sql/catalyst/parser/SqlBase.g4 | 2 ++ .../apache/spark/sql/catalyst/analysis/Analyzer.scala | 2 +- .../apache/spark/sql/catalyst/optimizer/Optimizer.scala | 8 ++++---- .../apache/spark/sql/catalyst/parser/AstBuilder.scala | 1 + .../org/apache/spark/sql/catalyst/plans/joinTypes.scala | 17 +++++++++++++++-- .../sql/catalyst/plans/logical/basicOperators.scala | 4 ++-- .../spark/sql/catalyst/parser/PlanParserSuite.scala | 5 ++++- 7 files changed, 29 insertions(+), 10 deletions(-) (limited to 'sql/catalyst') diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index 8a45b4f2e1..85cb585919 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -380,6 +380,7 @@ joinType | LEFT SEMI | RIGHT OUTER? | FULL OUTER? + | LEFT? ANTI ; joinCriteria @@ -878,6 +879,7 @@ INDEX: 'INDEX'; INDEXES: 'INDEXES'; LOCKS: 'LOCKS'; OPTION: 'OPTION'; +ANTI: 'ANTI'; STRING : '\'' ( ~('\''|'\\') | ('\\' .) )* '\'' diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 473c91e69e..bc8cf4e78a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1424,7 +1424,7 @@ class Analyzer( val projectList = joinType match { case LeftOuter => leftKeys ++ lUniqueOutput ++ rUniqueOutput.map(_.withNullability(true)) - case LeftSemi => + case LeftExistence(_) => leftKeys ++ lUniqueOutput case RightOuter => rightKeys ++ lUniqueOutput.map(_.withNullability(true)) ++ rUniqueOutput diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index c085a377ff..f581810c26 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -361,8 +361,8 @@ object ColumnPruning extends Rule[LogicalPlan] { case p @ Project(_, g: Generate) if g.join && p.references.subsetOf(g.generatedSet) => p.copy(child = g.copy(join = false)) - // Eliminate unneeded attributes from right side of a LeftSemiJoin. - case j @ Join(left, right, LeftSemi, condition) => + // Eliminate unneeded attributes from right side of a Left Existence Join. + case j @ Join(left, right, LeftExistence(_), condition) => j.copy(right = prunedChild(right, j.references)) // all the columns will be used to compare, so we can't prune them @@ -1126,7 +1126,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { (leftFilterConditions ++ commonFilterCondition). reduceLeftOption(And).map(Filter(_, newJoin)).getOrElse(newJoin) - case _ @ (LeftOuter | LeftSemi) => + case LeftOuter | LeftExistence(_) => // push down the left side only `where` condition val newLeft = leftFilterConditions. reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) @@ -1147,7 +1147,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right) joinType match { - case _ @ (Inner | LeftSemi) => + case Inner | LeftExistence(_) => // push down the single side only join filter for both sides sub queries val newLeft = leftJoinConditions. reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 5a3aebff09..aa59f3fb2a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -572,6 +572,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging { case null => Inner case jt if jt.FULL != null => FullOuter case jt if jt.SEMI != null => LeftSemi + case jt if jt.ANTI != null => LeftAnti case jt if jt.LEFT != null => LeftOuter case jt if jt.RIGHT != null => RightOuter case _ => Inner diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala index 9ca4f13dd7..13f57c54a5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala @@ -26,13 +26,15 @@ object JoinType { case "leftouter" | "left" => LeftOuter case "rightouter" | "right" => RightOuter case "leftsemi" => LeftSemi + case "leftanti" => LeftAnti case _ => val supported = Seq( "inner", "outer", "full", "fullouter", "leftouter", "left", "rightouter", "right", - "leftsemi") + "leftsemi", + "leftanti") throw new IllegalArgumentException(s"Unsupported join type '$typ'. " + "Supported join types include: " + supported.mkString("'", "', '", "'") + ".") @@ -63,6 +65,10 @@ case object LeftSemi extends JoinType { override def sql: String = "LEFT SEMI" } +case object LeftAnti extends JoinType { + override def sql: String = "LEFT ANTI" +} + case class NaturalJoin(tpe: JoinType) extends JoinType { require(Seq(Inner, LeftOuter, RightOuter, FullOuter).contains(tpe), "Unsupported natural join type " + tpe) @@ -70,7 +76,14 @@ case class NaturalJoin(tpe: JoinType) extends JoinType { } case class UsingJoin(tpe: JoinType, usingColumns: Seq[UnresolvedAttribute]) extends JoinType { - require(Seq(Inner, LeftOuter, LeftSemi, RightOuter, FullOuter).contains(tpe), + require(Seq(Inner, LeftOuter, LeftSemi, RightOuter, FullOuter, LeftAnti).contains(tpe), "Unsupported using join type " + tpe) override def sql: String = "USING " + tpe.sql } + +object LeftExistence { + def unapply(joinType: JoinType): Option[JoinType] = joinType match { + case LeftSemi | LeftAnti => Some(joinType) + case _ => None + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index a18efc90ab..d3353beb09 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -252,7 +252,7 @@ case class Join( override def output: Seq[Attribute] = { joinType match { - case LeftSemi => + case LeftExistence(_) => left.output case LeftOuter => left.output ++ right.output.map(_.withNullability(true)) @@ -276,7 +276,7 @@ case class Join( .union(splitConjunctivePredicates(condition.get).toSet) case Inner => left.constraints.union(right.constraints) - case LeftSemi => + case LeftExistence(_) => left.constraints case LeftOuter => left.constraints diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala index 262537d9c7..411e2372f2 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala @@ -334,7 +334,7 @@ class PlanParserSuite extends PlanTest { table("t").join(table("u"), UsingJoin(jt, Seq('a.attr, 'b.attr)), None).select(star())) } val testAll = Seq(testUnconditionalJoin, testConditionalJoin, testNaturalJoin, testUsingJoin) - + val testExistence = Seq(testUnconditionalJoin, testConditionalJoin, testUsingJoin) def test(sql: String, jt: JoinType, tests: Seq[(String, JoinType) => Unit]): Unit = { tests.foreach(_(sql, jt)) } @@ -348,6 +348,9 @@ class PlanParserSuite extends PlanTest { test("right outer join", RightOuter, testAll) test("full join", FullOuter, testAll) test("full outer join", FullOuter, testAll) + test("left semi join", LeftSemi, testExistence) + test("left anti join", LeftAnti, testExistence) + test("anti join", LeftAnti, testExistence) // Test multiple consecutive joins assertEqual( -- cgit v1.2.3