diff options
author | Ioana Delaney <ioanamdelaney@gmail.com> | 2016-06-12 14:26:29 -0700 |
---|---|---|
committer | Herman van Hovell <hvanhovell@databricks.com> | 2016-06-12 14:26:29 -0700 |
commit | 0ff8a68b9faefecf90c9ceef49580b2909beb19e (patch) | |
tree | 7e368aa577510ae41a24dedb12ce5a607d83038f /sql/core | |
parent | 9770f6ee60f6834e4e1200234109120427a5cc0d (diff) | |
download | spark-0ff8a68b9faefecf90c9ceef49580b2909beb19e.tar.gz spark-0ff8a68b9faefecf90c9ceef49580b2909beb19e.tar.bz2 spark-0ff8a68b9faefecf90c9ceef49580b2909beb19e.zip |
[SPARK-15832][SQL] Embedded IN/EXISTS predicate subquery throws TreeNodeException
## What changes were proposed in this pull request?
Queries with embedded existential sub-query predicates throws exception when building the physical plan.
Example failing query:
```SQL
scala> Seq((1, 1), (2, 2)).toDF("c1", "c2").createOrReplaceTempView("t1")
scala> Seq((1, 1), (2, 2)).toDF("c1", "c2").createOrReplaceTempView("t2")
scala> sql("select c1 from t1 where (case when c2 in (select c2 from t2) then 2 else 3 end) IN (select c2 from t1)").show()
Binding attribute, tree: c2#239
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: c2#239
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:50)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:88)
...
at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:87)
at org.apache.spark.sql.execution.joins.HashJoin$$anonfun$4.apply(HashJoin.scala:66)
at org.apache.spark.sql.execution.joins.HashJoin$$anonfun$4.apply(HashJoin.scala:66)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at org.apache.spark.sql.execution.joins.HashJoin$class.org$apache$spark$sql$execution$joins$HashJoin$$x$8(HashJoin.scala:66)
at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.org$apache$spark$sql$execution$joins$HashJoin$$x$8$lzycompute(BroadcastHashJoinExec.scala:38)
at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.org$apache$spark$sql$execution$joins$HashJoin$$x$8(BroadcastHashJoinExec.scala:38)
at org.apache.spark.sql.execution.joins.HashJoin$class.buildKeys(HashJoin.scala:63)
at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.buildKeys$lzycompute(BroadcastHashJoinExec.scala:38)
at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.buildKeys(BroadcastHashJoinExec.scala:38)
at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.requiredChildDistribution(BroadcastHashJoinExec.scala:52)
```
**Problem description:**
When the left hand side expression of an existential sub-query predicate contains another embedded sub-query predicate, the RewritePredicateSubquery optimizer rule does not resolve the embedded sub-query expressions into existential joins.For example, the above query has the following optimized plan, which fails during physical plan build.
```SQL
== Optimized Logical Plan ==
Project [_1#224 AS c1#227]
+- Join LeftSemi, (CASE WHEN predicate-subquery#255 [(_2#225 = c2#239)] THEN 2 ELSE 3 END = c2#228#262)
: +- SubqueryAlias predicate-subquery#255 [(_2#225 = c2#239)]
: +- LocalRelation [c2#239]
:- LocalRelation [_1#224, _2#225]
+- LocalRelation [c2#228#262]
== Physical Plan ==
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: c2#239
```
**Solution:**
In RewritePredicateSubquery, before rewriting the outermost predicate sub-query, resolve any embedded existential sub-queries. The Optimized plan for the above query after the changes looks like below.
```SQL
== Optimized Logical Plan ==
Project [_1#224 AS c1#227]
+- Join LeftSemi, (CASE WHEN exists#285 THEN 2 ELSE 3 END = c2#228#284)
:- Join ExistenceJoin(exists#285), (_2#225 = c2#239)
: :- LocalRelation [_1#224, _2#225]
: +- LocalRelation [c2#239]
+- LocalRelation [c2#228#284]
== Physical Plan ==
*Project [_1#224 AS c1#227]
+- *BroadcastHashJoin [CASE WHEN exists#285 THEN 2 ELSE 3 END], [c2#228#284], LeftSemi, BuildRight
:- *BroadcastHashJoin [_2#225], [c2#239], ExistenceJoin(exists#285), BuildRight
: :- LocalTableScan [_1#224, _2#225]
: +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
: +- LocalTableScan [c2#239]
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
+- LocalTableScan [c2#228#284]
+- LocalTableScan [c222#36], [[111],[222]]
```
## How was this patch tested?
Added new test cases in SubquerySuite.scala
Author: Ioana Delaney <ioanamdelaney@gmail.com>
Closes #13570 from ioana-delaney/fixEmbedSubPredV1.
Diffstat (limited to 'sql/core')
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala | 166 |
1 files changed, 166 insertions, 0 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index 06ced99974..1d9ff21dbf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -266,6 +266,172 @@ class SubquerySuite extends QueryTest with SharedSQLContext { Row(null) :: Row(1) :: Row(3) :: Nil) } + test("SPARK-15832: Test embedded existential predicate sub-queries") { + withTempTable("t1", "t2", "t3", "t4", "t5") { + Seq((1, 1), (2, 2)).toDF("c1", "c2").createOrReplaceTempView("t1") + Seq((1, 1), (2, 2)).toDF("c1", "c2").createOrReplaceTempView("t2") + Seq((1, 1), (2, 2), (1, 2)).toDF("c1", "c2").createOrReplaceTempView("t3") + + checkAnswer( + sql( + """ + | select c1 from t1 + | where c2 IN (select c2 from t2) + | + """.stripMargin), + Row(1) :: Row(2) :: Nil) + + checkAnswer( + sql( + """ + | select c1 from t1 + | where c2 NOT IN (select c2 from t2) + | + """.stripMargin), + Nil) + + checkAnswer( + sql( + """ + | select c1 from t1 + | where EXISTS (select c2 from t2) + | + """.stripMargin), + Row(1) :: Row(2) :: Nil) + + checkAnswer( + sql( + """ + | select c1 from t1 + | where NOT EXISTS (select c2 from t2) + | + """.stripMargin), + Nil) + + checkAnswer( + sql( + """ + | select c1 from t1 + | where NOT EXISTS (select c2 from t2) and + | c2 IN (select c2 from t3) + | + """.stripMargin), + Nil) + + checkAnswer( + sql( + """ + | select c1 from t1 + | where (case when c2 IN (select 1 as one) then 1 + | else 2 end) = c1 + | + """.stripMargin), + Row(1) :: Row(2) :: Nil) + + checkAnswer( + sql( + """ + | select c1 from t1 + | where (case when c2 IN (select 1 as one) then 1 + | else 2 end) + | IN (select c2 from t2) + | + """.stripMargin), + Row(1) :: Row(2) :: Nil) + + checkAnswer( + sql( + """ + | select c1 from t1 + | where (case when c2 IN (select c2 from t2) then 1 + | else 2 end) + | IN (select c2 from t3) + | + """.stripMargin), + Row(1) :: Row(2) :: Nil) + + checkAnswer( + sql( + """ + | select c1 from t1 + | where (case when c2 IN (select c2 from t2) then 1 + | when c2 IN (select c2 from t3) then 2 + | else 3 end) + | IN (select c2 from t1) + | + """.stripMargin), + Row(1) :: Row(2) :: Nil) + + checkAnswer( + sql( + """ + | select c1 from t1 + | where (c1, (case when c2 IN (select c2 from t2) then 1 + | when c2 IN (select c2 from t3) then 2 + | else 3 end)) + | IN (select c1, c2 from t1) + | + """.stripMargin), + Row(1) :: Nil) + + checkAnswer( + sql( + """ + | select c1 from t3 + | where ((case when c2 IN (select c2 from t2) then 1 else 2 end), + | (case when c2 IN (select c2 from t3) then 2 else 3 end)) + | IN (select c1, c2 from t3) + | + """.stripMargin), + Row(1) :: Row(2) :: Row(1) :: Nil) + + checkAnswer( + sql( + """ + | select c1 from t1 + | where ((case when EXISTS (select c2 from t2) then 1 else 2 end), + | (case when c2 IN (select c2 from t3) then 2 else 3 end)) + | IN (select c1, c2 from t3) + | + """.stripMargin), + Row(1) :: Row(2) :: Nil) + + checkAnswer( + sql( + """ + | select c1 from t1 + | where (case when c2 IN (select c2 from t2) then 3 + | else 2 end) + | NOT IN (select c2 from t3) + | + """.stripMargin), + Row(1) :: Row(2) :: Nil) + + checkAnswer( + sql( + """ + | select c1 from t1 + | where ((case when c2 IN (select c2 from t2) then 1 else 2 end), + | (case when NOT EXISTS (select c2 from t3) then 2 + | when EXISTS (select c2 from t2) then 3 + | else 3 end)) + | NOT IN (select c1, c2 from t3) + | + """.stripMargin), + Row(1) :: Row(2) :: Nil) + + checkAnswer( + sql( + """ + | select c1 from t1 + | where (select max(c1) from t2 where c2 IN (select c2 from t3)) + | IN (select c2 from t2) + | + """.stripMargin), + Row(1) :: Row(2) :: Nil) + } + } + test("correlated scalar subquery in where") { checkAnswer( sql("select * from l where b < (select max(d) from r where a = c)"), |