author     Herman van Hovell <hvanhovell@databricks.com>   2017-03-14 18:52:16 +0100
committer  Herman van Hovell <hvanhovell@databricks.com>   2017-03-14 18:52:16 +0100
commit     e04c05cf41a125b0526f59f9b9e7fdf0b78b8b21 (patch)
tree       247a5b094cdf4ba72d801eb4ad5f7565ad757191 /sql
parent     6325a2f82a95a63bee020122620bc4f5fd25d059 (diff)
[SPARK-19933][SQL] Do not change output of a subquery
## What changes were proposed in this pull request?

The `RemoveRedundantAliases` rule can change the output attributes (the expression IDs, to be precise) of a query by eliminating the redundant aliases producing them. This is no problem for a regular query, but it can cause problems for correlated subqueries: the attributes produced by the subquery are used in the parent plan, and changing them will break the parent plan. This PR fixes the issue by wrapping a subquery in a `Subquery` top-level node when it gets optimized. The `RemoveRedundantAliases` rule now recognizes `Subquery` and makes sure that the output attributes of the `Subquery` node are retained.

## How was this patch tested?

Added a test case to `RemoveRedundantAliasAndProjectSuite` and a regression test to `SubquerySuite`.

Author: Herman van Hovell <hvanhovell@databricks.com>

Closes #17278 from hvanhovell/SPARK-19933.
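The pattern the patch uses can be illustrated with a small, self-contained Scala sketch. The classes below (`Plan`, `Relation`, `Alias`, `Subquery`) and the `removeRedundantAliases` signature are toy stand-ins, not Spark's actual types, which track attributes by expression ID rather than by name; the sketch only shows the idea: wrap the subquery plan in a marker node before optimizing, and have alias removal treat the marker's output as off-limits.

```scala
// Toy sketch of the wrap/unwrap pattern described above (hypothetical classes,
// not Spark's): a Subquery marker protects the plan's output from alias removal.
sealed trait Plan { def output: Seq[String] }
case class Relation(output: Seq[String]) extends Plan
// An alias that renames `from` to `to`; it is redundant when the name does not change.
case class Alias(child: Plan, from: String, to: String) extends Plan {
  def output: Seq[String] = child.output.map(a => if (a == from) to else a)
}
// Marker placed on top of a subquery plan so that transformations can recognize it.
case class Subquery(child: Plan) extends Plan { def output: Seq[String] = child.output }

// Strip aliases that do not change a name, unless the attribute they produce is protected.
def removeRedundantAliases(plan: Plan, keep: Set[String] = Set.empty): Plan = plan match {
  case Subquery(child) =>
    // Keep the subquery's output stable: the parent plan refers to exactly these attributes.
    Subquery(removeRedundantAliases(child, keep ++ child.output))
  case Alias(child, from, to) if from == to && !keep.contains(to) =>
    removeRedundantAliases(child, keep)
  case Alias(child, from, to) =>
    Alias(removeRedundantAliases(child, keep), from, to)
  case other => other
}

// Without the marker the redundant alias is stripped; with it, the alias node is kept,
// so the attributes the subquery exposes to the parent plan do not change:
// removeRedundantAliases(Alias(Relation(Seq("id")), "id", "id"))
//   == Relation(Seq("id"))
// removeRedundantAliases(Subquery(Alias(Relation(Seq("id")), "id", "id")))
//   == Subquery(Alias(Relation(Seq("id")), "id", "id"))
```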
Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 15
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala | 8
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala | 8
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala | 14
4 files changed, 42 insertions, 3 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index e9dbded3d4..c8ed4190a1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -142,7 +142,8 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
object OptimizeSubqueries extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
case s: SubqueryExpression =>
- s.withNewPlan(Optimizer.this.execute(s.plan))
+ val Subquery(newPlan) = Optimizer.this.execute(Subquery(s.plan))
+ s.withNewPlan(newPlan)
}
}
}
@@ -187,7 +188,10 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] {
// If the alias name is different from attribute name, we can't strip it either, or we
// may accidentally change the output schema name of the root plan.
case a @ Alias(attr: Attribute, name)
- if a.metadata == Metadata.empty && name == attr.name && !blacklist.contains(attr) =>
+ if a.metadata == Metadata.empty &&
+ name == attr.name &&
+ !blacklist.contains(attr) &&
+ !blacklist.contains(a) =>
attr
case a => a
}
@@ -195,10 +199,15 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] {
/**
* Remove redundant alias expression from a LogicalPlan and its subtree. A blacklist is used to
* prevent the removal of seemingly redundant aliases used to deduplicate the input for a (self)
- * join.
+ * join or to prevent the removal of top-level subquery attributes.
*/
private def removeRedundantAliases(plan: LogicalPlan, blacklist: AttributeSet): LogicalPlan = {
plan match {
+ // We want to keep the same output attributes for subqueries. This means we cannot remove
+ // the aliases that produce these attributes
+ case Subquery(child) =>
+ Subquery(removeRedundantAliases(child, blacklist ++ child.outputSet))
+
// A join has to be treated differently, because the left and the right side of the join are
// not allowed to use the same attributes. We use a blacklist to prevent us from creating a
// situation in which this happens; the rule will only remove an alias if its child
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 31b6ed48a2..5cbf263d1c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -38,6 +38,14 @@ case class ReturnAnswer(child: LogicalPlan) extends UnaryNode {
override def output: Seq[Attribute] = child.output
}
+/**
+ * This node is inserted at the top of a subquery when it is optimized. This makes sure we can
+ * recognize a subquery as such, and it allows us to write subquery aware transformations.
+ */
+case class Subquery(child: LogicalPlan) extends UnaryNode {
+ override def output: Seq[Attribute] = child.output
+}
+
case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extends UnaryNode {
override def output: Seq[Attribute] = projectList.map(_.toAttribute)
override def maxRows: Option[Long] = child.maxRows
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala
index c01ea01ec6..1973b5abb4 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala
@@ -116,4 +116,12 @@ class RemoveRedundantAliasAndProjectSuite extends PlanTest with PredicateHelper
val expected = relation.window(Seq('b), Seq('a), Seq()).analyze
comparePlans(optimized, expected)
}
+
+ test("do not remove output attributes from a subquery") {
+ val relation = LocalRelation('a.int, 'b.int)
+ val query = Subquery(relation.select('a as "a", 'b as "b").where('b < 10).select('a).analyze)
+ val optimized = Optimize.execute(query)
+ val expected = Subquery(relation.select('a as "a", 'b).where('b < 10).select('a).analyze)
+ comparePlans(optimized, expected)
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index 6f1cd49c08..5fe6667cec 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -830,4 +830,18 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
Row(1) :: Row(0) :: Nil)
}
}
+
+ test("SPARK-19933 Do not eliminate top-level aliases in sub-queries") {
+ withTempView("t1", "t2") {
+ spark.range(4).createOrReplaceTempView("t1")
+ checkAnswer(
+ sql("select * from t1 where id in (select id as id from t1)"),
+ Row(0) :: Row(1) :: Row(2) :: Row(3) :: Nil)
+
+ spark.range(2).createOrReplaceTempView("t2")
+ checkAnswer(
+ sql("select * from t1 where id in (select id as id from t2)"),
+ Row(0) :: Row(1) :: Nil)
+ }
+ }
}