diff options
author | wangfei <wangfei1@huawei.com> | 2015-04-30 18:18:54 -0700 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2015-04-30 18:18:54 -0700 |
commit | a0d8a61ab198b8c0ddbb3072bbe1d0e1dabc3e45 (patch) | |
tree | 6f13e7be0196fb2c6059058a54f9588b422a313c /sql/catalyst | |
parent | 079733817f02c61ef814f5d9c0c8227498ff0058 (diff) | |
download | spark-a0d8a61ab198b8c0ddbb3072bbe1d0e1dabc3e45.tar.gz spark-a0d8a61ab198b8c0ddbb3072bbe1d0e1dabc3e45.tar.bz2 spark-a0d8a61ab198b8c0ddbb3072bbe1d0e1dabc3e45.zip |
[SPARK-7109] [SQL] Push down left side filter for left semi join
Now in spark sql optimizer we only push down right side filter for left semi join, actually we can push down left side filter because left semi join is doing filter on left table essentially.
Author: wangfei <wangfei1@huawei.com>
Author: scwf <wangfei1@huawei.com>
Closes #5677 from scwf/leftsemi and squashes the following commits:
483d205 [wangfei] update with master to fix compile issue
82df0e1 [wangfei] Merge branch 'master' of https://github.com/apache/spark into leftsemi
d68a053 [wangfei] added apply
8f48a3d [scwf] added test
ebadaa9 [wangfei] left filter push down for left semi join
Diffstat (limited to 'sql/catalyst')
2 files changed, 24 insertions, 5 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 63669e970b..709f7d672d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -571,7 +571,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right) joinType match { - case Inner => + case _ @ (Inner | LeftSemi) => // push down the single side only join filter for both sides sub queries val newLeft = leftJoinConditions. reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) @@ -579,7 +579,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) val newJoinCond = commonJoinCondition.reduceLeftOption(And) - Join(newLeft, newRight, Inner, newJoinCond) + Join(newLeft, newRight, joinType, newJoinCond) case RightOuter => // push down the left side only join filter for left side sub query val newLeft = leftJoinConditions. @@ -588,14 +588,14 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { val newJoinCond = (rightJoinConditions ++ commonJoinCondition).reduceLeftOption(And) Join(newLeft, newRight, RightOuter, newJoinCond) - case _ @ (LeftOuter | LeftSemi) => + case LeftOuter => // push down the right side only join filter for right sub query val newLeft = left val newRight = rightJoinConditions. reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) val newJoinCond = (leftJoinConditions ++ commonJoinCondition).reduceLeftOption(And) - Join(newLeft, newRight, joinType, newJoinCond) + Join(newLeft, newRight, LeftOuter, newJoinCond) case FullOuter => f } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index 2ad73941ab..58d415d901 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -21,7 +21,7 @@ import org.apache.spark.sql.catalyst.analysis import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries import org.apache.spark.sql.catalyst.expressions.{Count, Explode} import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.catalyst.plans.{PlanTest, LeftOuter, RightOuter} +import org.apache.spark.sql.catalyst.plans.{LeftSemi, PlanTest, LeftOuter, RightOuter} import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.dsl.expressions._ @@ -43,6 +43,8 @@ class FilterPushdownSuite extends PlanTest { val testRelation = LocalRelation('a.int, 'b.int, 'c.int) + val testRelation1 = LocalRelation('d.int) + // This test already passes. test("eliminate subqueries") { val originalQuery = @@ -213,6 +215,23 @@ class FilterPushdownSuite extends PlanTest { comparePlans(optimized, correctAnswer) } + test("joins: push down left semi join") { + val x = testRelation.subquery('x) + val y = testRelation1.subquery('y) + + val originalQuery = { + x.join(y, LeftSemi, Option("x.a".attr === "y.d".attr && "x.b".attr >= 1 && "y.d".attr >= 2)) + } + + val optimized = Optimize.execute(originalQuery.analyze) + val left = testRelation.where('b >= 1) + val right = testRelation1.where('d >= 2) + val correctAnswer = + left.join(right, LeftSemi, Option("a".attr === "d".attr)).analyze + + comparePlans(optimized, correctAnswer) + } + test("joins: push down left outer join #1") { val x = testRelation.subquery('x) val y = testRelation.subquery('y) |