aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst
diff options
context:
space:
mode:
authorStan Zhai <zhaishidan@haizhi.com>2017-03-01 07:52:35 -0800
committerXiao Li <gatorsmile@gmail.com>2017-03-01 07:52:35 -0800
commit5502a9cf883b2058209904c152e5d2c2a106b072 (patch)
treed23f88fb04419a6c08e41f9b3531b62f0f9b3a0c /sql/catalyst
parent38e7835347a2e1803b1df5e73cf8b749951b11b2 (diff)
downloadspark-5502a9cf883b2058209904c152e5d2c2a106b072.tar.gz
spark-5502a9cf883b2058209904c152e5d2c2a106b072.tar.bz2
spark-5502a9cf883b2058209904c152e5d2c2a106b072.zip
[SPARK-19766][SQL] Constant alias columns in INNER JOIN should not be folded by FoldablePropagation rule
## What changes were proposed in this pull request? This PR fixes the code in Optimizer phase where the constant alias columns of a `INNER JOIN` query are folded in Rule `FoldablePropagation`. For the following query(): ``` val sqlA = """ |create temporary view ta as |select a, 'a' as tag from t1 union all |select a, 'b' as tag from t2 """.stripMargin val sqlB = """ |create temporary view tb as |select a, 'a' as tag from t3 union all |select a, 'b' as tag from t4 """.stripMargin val sql = """ |select tb.* from ta inner join tb on |ta.a = tb.a and |ta.tag = tb.tag """.stripMargin ``` The tag column is an constant alias column, it's folded by `FoldablePropagation` like this: ``` TRACE SparkOptimizer: === Applying Rule org.apache.spark.sql.catalyst.optimizer.FoldablePropagation === Project [a#4, tag#14] Project [a#4, tag#14] !+- Join Inner, ((a#0 = a#4) && (tag#8 = tag#14)) +- Join Inner, ((a#0 = a#4) && (a = a)) :- Union :- Union : :- Project [a#0, a AS tag#8] : :- Project [a#0, a AS tag#8] : : +- LocalRelation [a#0] : : +- LocalRelation [a#0] : +- Project [a#2, b AS tag#9] : +- Project [a#2, b AS tag#9] : +- LocalRelation [a#2] : +- LocalRelation [a#2] +- Union +- Union :- Project [a#4, a AS tag#14] :- Project [a#4, a AS tag#14] : +- LocalRelation [a#4] : +- LocalRelation [a#4] +- Project [a#6, b AS tag#15] +- Project [a#6, b AS tag#15] +- LocalRelation [a#6] +- LocalRelation [a#6] ``` Finally the Result of Batch Operator Optimizations is: ``` Project [a#4, tag#14] Project [a#4, tag#14] !+- Join Inner, ((a#0 = a#4) && (tag#8 = tag#14)) +- Join Inner, (a#0 = a#4) ! :- SubqueryAlias ta, `ta` :- Union ! : +- Union : :- LocalRelation [a#0] ! : :- Project [a#0, a AS tag#8] : +- LocalRelation [a#2] ! : : +- SubqueryAlias t1, `t1` +- Union ! : : +- Project [a#0] :- LocalRelation [a#4, tag#14] ! : : +- SubqueryAlias grouping +- LocalRelation [a#6, tag#15] ! : : +- LocalRelation [a#0] ! : +- Project [a#2, b AS tag#9] ! : +- SubqueryAlias t2, `t2` ! : +- Project [a#2] ! : +- SubqueryAlias grouping ! : +- LocalRelation [a#2] ! +- SubqueryAlias tb, `tb` ! +- Union ! :- Project [a#4, a AS tag#14] ! : +- SubqueryAlias t3, `t3` ! : +- Project [a#4] ! : +- SubqueryAlias grouping ! : +- LocalRelation [a#4] ! +- Project [a#6, b AS tag#15] ! +- SubqueryAlias t4, `t4` ! +- Project [a#6] ! +- SubqueryAlias grouping ! +- LocalRelation [a#6] ``` The condition `tag#8 = tag#14` of INNER JOIN has been removed. This leads to the data of inner join being wrong. After fix: ``` === Result of Batch LocalRelation === GlobalLimit 21 GlobalLimit 21 +- LocalLimit 21 +- LocalLimit 21 +- Project [a#4, tag#11] +- Project [a#4, tag#11] +- Join Inner, ((a#0 = a#4) && (tag#8 = tag#11)) +- Join Inner, ((a#0 = a#4) && (tag#8 = tag#11)) ! :- SubqueryAlias ta :- Union ! : +- Union : :- LocalRelation [a#0, tag#8] ! : :- Project [a#0, a AS tag#8] : +- LocalRelation [a#2, tag#9] ! : : +- SubqueryAlias t1 +- Union ! : : +- Project [a#0] :- LocalRelation [a#4, tag#11] ! : : +- SubqueryAlias grouping +- LocalRelation [a#6, tag#12] ! : : +- LocalRelation [a#0] ! : +- Project [a#2, b AS tag#9] ! : +- SubqueryAlias t2 ! : +- Project [a#2] ! : +- SubqueryAlias grouping ! : +- LocalRelation [a#2] ! +- SubqueryAlias tb ! +- Union ! :- Project [a#4, a AS tag#11] ! : +- SubqueryAlias t3 ! : +- Project [a#4] ! : +- SubqueryAlias grouping ! : +- LocalRelation [a#4] ! +- Project [a#6, b AS tag#12] ! +- SubqueryAlias t4 ! +- Project [a#6] ! +- SubqueryAlias grouping ! +- LocalRelation [a#6] ``` ## How was this patch tested? add sql-tests/inputs/inner-join.sql All tests passed. Author: Stan Zhai <zhaishidan@haizhi.com> Closes #17099 from stanzhai/fix-inner-join.
Diffstat (limited to 'sql/catalyst')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala2
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FoldablePropagationSuite.scala14
2 files changed, 15 insertions, 1 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
index 4f593c894a..21d1cd5932 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
@@ -457,7 +457,7 @@ object FoldablePropagation extends Rule[LogicalPlan] {
// join is not always picked from its children, but can also be null.
// TODO(cloud-fan): It seems more reasonable to use new attributes as the output attributes
// of outer join.
- case j @ Join(_, _, Inner, _) =>
+ case j @ Join(_, _, Inner, _) if !stop =>
j.transformExpressions(replaceFoldable)
// We can fold the projections an expand holds. However expand changes the output columns
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FoldablePropagationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FoldablePropagationSuite.scala
index 82756f545a..d128315b68 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FoldablePropagationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FoldablePropagationSuite.scala
@@ -130,6 +130,20 @@ class FoldablePropagationSuite extends PlanTest {
comparePlans(optimized, correctAnswer)
}
+ test("Propagate in inner join") {
+ val ta = testRelation.select('a, Literal(1).as('tag))
+ .union(testRelation.select('a, Literal(2).as('tag)))
+ .subquery('ta)
+ val tb = testRelation.select('a, Literal(1).as('tag))
+ .union(testRelation.select('a, Literal(2).as('tag)))
+ .subquery('tb)
+ val query = ta.join(tb, Inner,
+ Some("ta.a".attr === "tb.a".attr && "ta.tag".attr === "tb.tag".attr))
+ val optimized = Optimize.execute(query.analyze)
+ val correctAnswer = query.analyze
+ comparePlans(optimized, correctAnswer)
+ }
+
test("Propagate in expand") {
val c1 = Literal(1).as('a)
val c2 = Literal(2).as('b)