author    Herman van Hovell <hvanhovell@questtec.nl>  2016-04-29 16:47:56 -0700
committer Davies Liu <davies.liu@gmail.com>           2016-04-29 16:50:12 -0700
commit    83061be697f69f7e39deb9cda45742a323714231 (patch)
tree      b20fad5198d43bbdb83ecf0ea80cd0834f45bcb7 /sql/catalyst/src/test
parent    1eda2f10d9f7add319e5b271488045c44ea30c03 (diff)
[SPARK-14858] [SQL] Enable subquery pushdown
The previous subquery PRs did not include support for pushing down subqueries used in filters (`WHERE`/`HAVING`). This PR adds that support. For example:

```scala
range(0, 10).registerTempTable("a")
range(5, 15).registerTempTable("b")
range(7, 25).registerTempTable("c")
range(3, 12).registerTempTable("d")

val plan = sql("select * from a join b on a.id = b.id left join c on c.id = b.id where a.id in (select id from d)")
plan.explain(true)
```

This leads to the following analyzed and optimized plans:

```
== Parsed Logical Plan ==
...

== Analyzed Logical Plan ==
id: bigint, id: bigint, id: bigint
Project [id#0L,id#4L,id#8L]
+- Filter predicate-subquery#16 [(id#0L = id#12L)]
   :  +- SubqueryAlias predicate-subquery#16 [(id#0L = id#12L)]
   :     +- Project [id#12L]
   :        +- SubqueryAlias d
   :           +- Range 3, 12, 1, 8, [id#12L]
   +- Join LeftOuter, Some((id#8L = id#4L))
      :- Join Inner, Some((id#0L = id#4L))
      :  :- SubqueryAlias a
      :  :  +- Range 0, 10, 1, 8, [id#0L]
      :  +- SubqueryAlias b
      :     +- Range 5, 15, 1, 8, [id#4L]
      +- SubqueryAlias c
         +- Range 7, 25, 1, 8, [id#8L]

== Optimized Logical Plan ==
Join LeftOuter, Some((id#8L = id#4L))
:- Join Inner, Some((id#0L = id#4L))
:  :- Join LeftSemi, Some((id#0L = id#12L))
:  :  :- Range 0, 10, 1, 8, [id#0L]
:  :  +- Range 3, 12, 1, 8, [id#12L]
:  +- Range 5, 15, 1, 8, [id#4L]
+- Range 7, 25, 1, 8, [id#8L]

== Physical Plan ==
...
```

I have also taken the opportunity to move quite a bit of code around:

- Rewriting subqueries and pulling correlated predicates out of subqueries has been moved into the analyzer. The analyzer transforms `Exists` and `InSubQuery` into `PredicateSubquery` expressions. A `PredicateSubquery` exposes the 'join' expressions and the proper references, which makes things like type coercion, optimization, and planning easier.
- I have added support for `Aggregate` plans in subqueries: any correlated expressions are added to the grouping expressions. I have removed support for `Union` plans, since pulling an outer reference out from beneath a `Union` has no value (a filtered value could easily be part of another `Union` child).
- Resolution of subqueries is now done using `OuterReference`s. These wrap any outer reference, which makes identifying such references easier and also simplifies dealing with duplicate attributes in the outer and inner plans. Resolution of subqueries initially used a resolution loop that alternated between calling the analyzer and trying to resolve the outer references; we now use a dedicated analyzer with a special rule for outer reference resolution.

These changes are a stepping stone towards enabling correlated scalar subqueries, enabling all Hive tests, and allowing us to use predicate subqueries anywhere.

Tested with the current tests and the test cases added to FilterPushdownSuite.

Author: Herman van Hovell <hvanhovell@questtec.nl>

Closes #12720 from hvanhovell/SPARK-14858.
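To make the rewrite concrete, here is a minimal, self-contained Scala sketch of the Filter-to-LeftSemi transformation that the optimized plan above illustrates. The case classes and the `RewritePredicateSubquery` object are simplified stand-ins for illustration, not the actual Catalyst types or rule:

```scala
// Simplified stand-ins for Catalyst plan nodes (illustrative, not the real API).
sealed trait Plan
case class Relation(name: String) extends Plan
case class Join(left: Plan, right: Plan, joinType: String, cond: String) extends Plan
// A filter whose condition is a predicate subquery exposing its 'join' condition,
// loosely mirroring the PredicateSubquery expression described above.
case class FilterInSubquery(subquery: Plan, joinCond: String, child: Plan) extends Plan

object RewritePredicateSubquery {
  // Rewrite `WHERE x IN (subquery)` into a LEFT SEMI join, as the optimizer
  // does for `a.id in (select id from d)` in the example plan.
  def apply(plan: Plan): Plan = plan match {
    case FilterInSubquery(sub, cond, child) =>
      Join(apply(child), apply(sub), "LeftSemi", cond)
    case Join(l, r, t, c) => Join(apply(l), apply(r), t, c)
    case r: Relation      => r
  }
}

object RewriteDemo extends App {
  val plan = FilterInSubquery(Relation("d"), "a.id = d.id", Relation("a"))
  println(RewritePredicateSubquery(plan))
  // Join(Relation(a),Relation(d),LeftSemi,a.id = d.id)
}
```

Once the filter has become an ordinary left-semi join, the existing filter-pushdown rules can move it below other joins, which is what the new FilterPushdownSuite tests below verify.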
Diffstat (limited to 'sql/catalyst/src/test')
-rw-r--r-- sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala   | 24
-rw-r--r-- sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala | 39
-rw-r--r-- sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ExpressionParserSuite.scala  |  2
-rw-r--r-- sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala                | 10
4 files changed, 59 insertions(+), 16 deletions(-)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
index a90636d278..1b08913ddd 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Complete, Count}
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.{Inner, LeftOuter, RightOuter}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData, MapData}
@@ -449,7 +450,7 @@ class AnalysisErrorSuite extends AnalysisTest {
val a = AttributeReference("a", IntegerType)()
val b = AttributeReference("b", IntegerType)()
val plan = Project(
- Seq(a, Alias(InSubQuery(a, LocalRelation(b)), "c")()),
+ Seq(a, Alias(In(a, Seq(ListQuery(LocalRelation(b)))), "c")()),
LocalRelation(a))
assertAnalysisError(plan, "Predicate sub-queries can only be used in a Filter" :: Nil)
}
@@ -458,10 +459,10 @@ class AnalysisErrorSuite extends AnalysisTest {
val a = AttributeReference("a", IntegerType)()
val b = AttributeReference("b", IntegerType)()
val c = AttributeReference("c", BooleanType)()
- val plan1 = Filter(Cast(InSubQuery(a, LocalRelation(b)), BooleanType), LocalRelation(a))
+ val plan1 = Filter(Cast(In(a, Seq(ListQuery(LocalRelation(b)))), BooleanType), LocalRelation(a))
assertAnalysisError(plan1, "Predicate sub-queries cannot be used in nested conditions" :: Nil)
- val plan2 = Filter(Or(InSubQuery(a, LocalRelation(b)), c), LocalRelation(a, c))
+ val plan2 = Filter(Or(In(a, Seq(ListQuery(LocalRelation(b)))), c), LocalRelation(a, c))
assertAnalysisError(plan2, "Predicate sub-queries cannot be used in nested conditions" :: Nil)
}
@@ -474,7 +475,7 @@ class AnalysisErrorSuite extends AnalysisTest {
Exists(
Join(
LocalRelation(b),
- Filter(EqualTo(a, c), LocalRelation(c)),
+ Filter(EqualTo(OuterReference(a), c), LocalRelation(c)),
LeftOuter,
Option(EqualTo(b, c)))),
LocalRelation(a))
@@ -483,7 +484,7 @@ class AnalysisErrorSuite extends AnalysisTest {
val plan2 = Filter(
Exists(
Join(
- Filter(EqualTo(a, c), LocalRelation(c)),
+ Filter(EqualTo(OuterReference(a), c), LocalRelation(c)),
LocalRelation(b),
RightOuter,
Option(EqualTo(b, c)))),
@@ -491,13 +492,16 @@ class AnalysisErrorSuite extends AnalysisTest {
assertAnalysisError(plan2, "Accessing outer query column is not allowed in" :: Nil)
val plan3 = Filter(
- Exists(Aggregate(Seq.empty, Seq.empty, Filter(EqualTo(a, c), LocalRelation(c)))),
+ Exists(Union(LocalRelation(b), Filter(EqualTo(OuterReference(a), c), LocalRelation(c)))),
LocalRelation(a))
assertAnalysisError(plan3, "Accessing outer query column is not allowed in" :: Nil)
+ }
- val plan4 = Filter(
- Exists(Union(LocalRelation(b), Filter(EqualTo(a, c), LocalRelation(c)))),
- LocalRelation(a))
- assertAnalysisError(plan4, "Accessing outer query column is not allowed in" :: Nil)
+ test("Correlated Scalar Subquery") {
+ val a = AttributeReference("a", IntegerType)()
+ val b = AttributeReference("b", IntegerType)()
+ val sub = Project(Seq(b), Filter(EqualTo(UnresolvedAttribute("a"), b), LocalRelation(b)))
+ val plan = Project(Seq(a, Alias(ScalarSubquery(sub), "b")()), LocalRelation(a))
+ assertAnalysisError(plan, "Correlated scalar subqueries are not supported." :: Nil)
}
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index e9b4bb002b..fcc14a803b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.{LeftOuter, LeftSemi, PlanTest, RightOuter}
+import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.types.IntegerType
@@ -725,6 +725,43 @@ class FilterPushdownSuite extends PlanTest {
comparePlans(optimized, correctedAnswer)
}
+ test("predicate subquery: push down simple") {
+ val x = testRelation.subquery('x)
+ val y = testRelation.subquery('y)
+ val z = LocalRelation('a.int, 'b.int, 'c.int).subquery('z)
+
+ val query = x
+ .join(y, Inner, Option("x.a".attr === "y.a".attr))
+ .where(Exists(z.where("x.a".attr === "z.a".attr)))
+ .analyze
+ val answer = x
+ .where(Exists(z.where("x.a".attr === "z.a".attr)))
+ .join(y, Inner, Option("x.a".attr === "y.a".attr))
+ .analyze
+ val optimized = Optimize.execute(Optimize.execute(query))
+ comparePlans(optimized, answer)
+ }
+
+ test("predicate subquery: push down complex") {
+ val w = testRelation.subquery('w)
+ val x = testRelation.subquery('x)
+ val y = testRelation.subquery('y)
+ val z = LocalRelation('a.int, 'b.int, 'c.int).subquery('z)
+
+ val query = w
+ .join(x, Inner, Option("w.a".attr === "x.a".attr))
+ .join(y, LeftOuter, Option("x.a".attr === "y.a".attr))
+ .where(Exists(z.where("w.a".attr === "z.a".attr)))
+ .analyze
+ val answer = w
+ .where(Exists(z.where("w.a".attr === "z.a".attr)))
+ .join(x, Inner, Option("w.a".attr === "x.a".attr))
+ .join(y, LeftOuter, Option("x.a".attr === "y.a".attr))
+ .analyze
+ val optimized = Optimize.execute(Optimize.execute(query))
+ comparePlans(optimized, answer)
+ }
+
test("Window: predicate push down -- basic") {
val winExpr = windowExpr(count('b), windowSpec('a :: Nil, 'b.asc :: Nil, UnspecifiedFrame))
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ExpressionParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ExpressionParserSuite.scala
index 5af3ea9c7a..e73592c7af 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ExpressionParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ExpressionParserSuite.scala
@@ -146,7 +146,7 @@ class ExpressionParserSuite extends PlanTest {
test("in sub-query") {
assertEqual(
"a in (select b from c)",
- InSubQuery('a, table("c").select('b)))
+ In('a, Seq(ListQuery(table("c").select('b)))))
}
test("like expressions") {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
index f5439d70ad..6310f0c2bc 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
@@ -34,11 +34,13 @@ abstract class PlanTest extends SparkFunSuite with PredicateHelper {
protected def normalizeExprIds(plan: LogicalPlan) = {
plan transformAllExpressions {
case s: ScalarSubquery =>
- ScalarSubquery(s.query, ExprId(0))
- case s: InSubQuery =>
- InSubQuery(s.value, s.query, ExprId(0))
+ s.copy(exprId = ExprId(0))
case e: Exists =>
- Exists(e.query, ExprId(0))
+ e.copy(exprId = ExprId(0))
+ case l: ListQuery =>
+ l.copy(exprId = ExprId(0))
+ case p: PredicateSubquery =>
+ p.copy(exprId = ExprId(0))
case a: AttributeReference =>
AttributeReference(a.name, a.dataType, a.nullable)(exprId = ExprId(0))
case a: Alias =>
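
As context for the PlanTest change above: normalizeExprIds rewrites every expression ID to ExprId(0) so that comparePlans can match plans whose attributes were assigned different IDs by separate analysis runs. A minimal sketch of that idea, using simplified stand-in types rather than the real Catalyst classes:

```scala
// Stand-ins for Catalyst's ExprId / AttributeReference (illustrative only).
case class ExprId(id: Long)
case class Attr(name: String, exprId: ExprId)

object NormalizeDemo extends App {
  // The same logical attributes, assigned different IDs by two analysis runs.
  val fromQuery  = List(Attr("a", ExprId(7)), Attr("b", ExprId(8)))
  val fromAnswer = List(Attr("a", ExprId(3)), Attr("b", ExprId(4)))

  // Rewriting every ID to ExprId(0) makes structurally equal plans compare equal.
  def normalize(attrs: List[Attr]): List[Attr] =
    attrs.map(_.copy(exprId = ExprId(0)))

  assert(normalize(fromQuery) == normalize(fromAnswer))
  println("attributes compare equal after ID normalization")
}
```

The diff extends this normalization to the subquery expressions introduced by this patch (`ListQuery`, `PredicateSubquery`), using `copy(exprId = ExprId(0))` rather than reconstructing each node.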