author     wangfei <wangfei1@huawei.com>                 2015-04-30 18:18:54 -0700
committer  Michael Armbrust <michael@databricks.com>     2015-04-30 18:18:54 -0700
commit     a0d8a61ab198b8c0ddbb3072bbe1d0e1dabc3e45 (patch)
tree       6f13e7be0196fb2c6059058a54f9588b422a313c /sql
parent     079733817f02c61ef814f5d9c0c8227498ff0058 (diff)
[SPARK-7109] [SQL] Push down left side filter for left semi join
Currently the Spark SQL optimizer pushes down only the right-side filter for a left semi join. We can push down the left-side filter as well, because a left semi join is essentially a filter on the left table.

Author: wangfei <wangfei1@huawei.com>
Author: scwf <wangfei1@huawei.com>

Closes #5677 from scwf/leftsemi and squashes the following commits:

483d205 [wangfei] update with master to fix compile issue
82df0e1 [wangfei] Merge branch 'master' of https://github.com/apache/spark into leftsemi
d68a053 [wangfei] added apply
8f48a3d [scwf] added test
ebadaa9 [wangfei] left filter push down for left semi join
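For illustration, here is a minimal sketch of the rewrite this enables, written with the Catalyst test DSL; the relations and column names are hypothetical, mirroring the test added below:

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.plans.LeftSemi
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

val left  = LocalRelation('a.int, 'b.int)  // hypothetical left table
val right = LocalRelation('d.int)          // hypothetical right table

// Before: the left-side-only predicate 'b >= 1 lives in the join
// condition, so it is evaluated as part of the join.
val before = left.join(right, LeftSemi,
  Option("a".attr === "d".attr && "b".attr >= 1))

// After this patch the optimizer can evaluate it below the join
// instead, producing a plan equivalent to:
val after = left.where('b >= 1)
  .join(right, LeftSemi, Option("a".attr === "d".attr))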
Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala          |  8
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala | 21
2 files changed, 24 insertions, 5 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 63669e970b..709f7d672d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -571,7 +571,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right)
joinType match {
- case Inner =>
+ case _ @ (Inner | LeftSemi) =>
// push down the single side only join filter for both sides sub queries
val newLeft = leftJoinConditions.
reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
@@ -579,7 +579,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
val newJoinCond = commonJoinCondition.reduceLeftOption(And)
- Join(newLeft, newRight, Inner, newJoinCond)
+ Join(newLeft, newRight, joinType, newJoinCond)
case RightOuter =>
// push down the left side only join filter for left side sub query
val newLeft = leftJoinConditions.
@@ -588,14 +588,14 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
val newJoinCond = (rightJoinConditions ++ commonJoinCondition).reduceLeftOption(And)
Join(newLeft, newRight, RightOuter, newJoinCond)
- case _ @ (LeftOuter | LeftSemi) =>
+ case LeftOuter =>
// push down the right side only join filter for right sub query
val newLeft = left
val newRight = rightJoinConditions.
reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
val newJoinCond = (leftJoinConditions ++ commonJoinCondition).reduceLeftOption(And)
- Join(newLeft, newRight, joinType, newJoinCond)
+ Join(newLeft, newRight, LeftOuter, newJoinCond)
case FullOuter => f
}
}
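As a reading aid, here is a self-contained Scala sketch of the decision table that PushPredicateThroughJoin implements for single-side join-condition predicates after this change; the JoinType objects below are stand-ins for Catalyst's, not the real classes:

sealed trait JoinType
case object Inner extends JoinType
case object LeftSemi extends JoinType
case object LeftOuter extends JoinType
case object RightOuter extends JoinType
case object FullOuter extends JoinType

// Which side's join-condition predicates may be evaluated below the
// join: (canPushToLeftChild, canPushToRightChild)
def pushableSides(t: JoinType): (Boolean, Boolean) = t match {
  case Inner | LeftSemi => (true, true)   // LeftSemi now joins Inner here
  case RightOuter       => (true, false)  // left-side predicates only
  case LeftOuter        => (false, true)  // right-side predicates only
  case FullOuter        => (false, false) // nothing can be pushed down
}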
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 2ad73941ab..58d415d901 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
import org.apache.spark.sql.catalyst.expressions.{Count, Explode}
import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.plans.{PlanTest, LeftOuter, RightOuter}
+import org.apache.spark.sql.catalyst.plans.{LeftSemi, PlanTest, LeftOuter, RightOuter}
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.dsl.expressions._
@@ -43,6 +43,8 @@ class FilterPushdownSuite extends PlanTest {
val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
+ val testRelation1 = LocalRelation('d.int)
+
// This test already passes.
test("eliminate subqueries") {
val originalQuery =
@@ -213,6 +215,23 @@ class FilterPushdownSuite extends PlanTest {
comparePlans(optimized, correctAnswer)
}
+ test("joins: push down left semi join") {
+ val x = testRelation.subquery('x)
+ val y = testRelation1.subquery('y)
+
+ val originalQuery = {
+ x.join(y, LeftSemi, Option("x.a".attr === "y.d".attr && "x.b".attr >= 1 && "y.d".attr >= 2))
+ }
+
+ val optimized = Optimize.execute(originalQuery.analyze)
+ val left = testRelation.where('b >= 1)
+ val right = testRelation1.where('d >= 2)
+ val correctAnswer =
+ left.join(right, LeftSemi, Option("a".attr === "d".attr)).analyze
+
+ comparePlans(optimized, correctAnswer)
+ }
+
test("joins: push down left outer join #1") {
val x = testRelation.subquery('x)
val y = testRelation.subquery('y)