aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorHerman van Hovell <hvanhovell@databricks.com>2017-02-07 22:28:59 +0100
committerHerman van Hovell <hvanhovell@databricks.com>2017-02-07 22:28:59 +0100
commit73ee73945e369a862480ef4ac64e55c797bd7d90 (patch)
treede712e38633bacba3e3c5f94b9c51bcd4259ba90 /sql
parentb7277e03d1038e2a19495c0ef7707e2d77937ccf (diff)
downloadspark-73ee73945e369a862480ef4ac64e55c797bd7d90.tar.gz
spark-73ee73945e369a862480ef4ac64e55c797bd7d90.tar.bz2
spark-73ee73945e369a862480ef4ac64e55c797bd7d90.zip
[SPARK-18609][SPARK-18841][SQL] Fix redundant Alias removal in the optimizer
## What changes were proposed in this pull request? The optimizer tries to remove redundant alias only projections from the query plan using the `RemoveAliasOnlyProject` rule. The current rule identifies removes such a project and rewrites the project's attributes in the **entire** tree. This causes problems when parts of the tree are duplicated (for instance a self join on a temporary view/CTE) and the duplicated part contains the alias only project, in this case the rewrite will break the tree. This PR fixes these problems by using a blacklist for attributes that are not to be moved, and by making sure that attribute remapping is only done for the parent tree, and not for unrelated parts of the query plan. The current tree transformation infrastructure works very well if the transformation at hand requires little or a global contextual information. In this case we need to know both the attributes that were not to be moved, and we also needed to know which child attributes were modified. This cannot be done easily using the current infrastructure, and solutions typically involves transversing the query plan multiple times (which is super slow). I have moved around some code in `TreeNode`, `QueryPlan` and `LogicalPlan`to make this much more straightforward; this basically allows you to manually traverse the tree. This PR subsumes the following PRs by windpiger: Closes https://github.com/apache/spark/pull/16267 Closes https://github.com/apache/spark/pull/16255 ## How was this patch tested? I have added unit tests to `RemoveRedundantAliasAndProjectSuite` and I have added integration tests to the `SQLQueryTestSuite.union` and `SQLQueryTestSuite.cte` test cases. Author: Herman van Hovell <hvanhovell@databricks.com> Closes #16757 from hvanhovell/SPARK-18609.
Diffstat (limited to 'sql')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala125
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala42
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala2
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala46
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala (renamed from sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveAliasOnlyProjectSuite.scala)52
-rw-r--r--sql/core/src/test/resources/sql-tests/inputs/cte.sql15
-rw-r--r--sql/core/src/test/resources/sql-tests/inputs/union.sql16
-rw-r--r--sql/core/src/test/resources/sql-tests/results/cte.sql.out49
-rw-r--r--sql/core/src/test/resources/sql-tests/results/union.sql.out70
9 files changed, 302 insertions, 115 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index d8e9d3043c..0c13e3e93a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -110,7 +110,8 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
SimplifyCaseConversionExpressions,
RewriteCorrelatedScalarSubquery,
EliminateSerialization,
- RemoveAliasOnlyProject,
+ RemoveRedundantAliases,
+ RemoveRedundantProject,
SimplifyCreateStructOps,
SimplifyCreateArrayOps,
SimplifyCreateMapOps) ::
@@ -157,56 +158,98 @@ class SimpleTestOptimizer extends Optimizer(
new SimpleCatalystConf(caseSensitiveAnalysis = true))
/**
- * Removes the Project only conducting Alias of its child node.
- * It is created mainly for removing extra Project added in EliminateSerialization rule,
- * but can also benefit other operators.
+ * Remove redundant aliases from a query plan. A redundant alias is an alias that does not change
+ * the name or metadata of a column, and does not deduplicate it.
*/
-object RemoveAliasOnlyProject extends Rule[LogicalPlan] {
+object RemoveRedundantAliases extends Rule[LogicalPlan] {
+
/**
- * Returns true if the project list is semantically same as child output, after strip alias on
- * attribute.
+ * Create an attribute mapping from the old to the new attributes. This function will only
+ * return the attribute pairs that have changed.
*/
- private def isAliasOnly(
- projectList: Seq[NamedExpression],
- childOutput: Seq[Attribute]): Boolean = {
- if (projectList.length != childOutput.length) {
- false
- } else {
- stripAliasOnAttribute(projectList).zip(childOutput).forall {
- case (a: Attribute, o) if a semanticEquals o => true
- case _ => false
- }
+ private def createAttributeMapping(current: LogicalPlan, next: LogicalPlan)
+ : Seq[(Attribute, Attribute)] = {
+ current.output.zip(next.output).filterNot {
+ case (a1, a2) => a1.semanticEquals(a2)
}
}
- private def stripAliasOnAttribute(projectList: Seq[NamedExpression]) = {
- projectList.map {
- // Alias with metadata can not be stripped, or the metadata will be lost.
- // If the alias name is different from attribute name, we can't strip it either, or we may
- // accidentally change the output schema name of the root plan.
- case a @ Alias(attr: Attribute, name) if a.metadata == Metadata.empty && name == attr.name =>
- attr
- case other => other
- }
+ /**
+ * Remove the top-level alias from an expression when it is redundant.
+ */
+ private def removeRedundantAlias(e: Expression, blacklist: AttributeSet): Expression = e match {
+ // Alias with metadata can not be stripped, or the metadata will be lost.
+ // If the alias name is different from attribute name, we can't strip it either, or we
+ // may accidentally change the output schema name of the root plan.
+ case a @ Alias(attr: Attribute, name)
+ if a.metadata == Metadata.empty && name == attr.name && !blacklist.contains(attr) =>
+ attr
+ case a => a
}
- def apply(plan: LogicalPlan): LogicalPlan = {
- val aliasOnlyProject = plan.collectFirst {
- case p @ Project(pList, child) if isAliasOnly(pList, child.output) => p
- }
+ /**
+ * Remove redundant alias expression from a LogicalPlan and its subtree. A blacklist is used to
+ * prevent the removal of seemingly redundant aliases used to deduplicate the input for a (self)
+ * join.
+ */
+ private def removeRedundantAliases(plan: LogicalPlan, blacklist: AttributeSet): LogicalPlan = {
+ plan match {
+ // A join has to be treated differently, because the left and the right side of the join are
+ // not allowed to use the same attributes. We use a blacklist to prevent us from creating a
+ // situation in which this happens; the rule will only remove an alias if its child
+ // attribute is not on the black list.
+ case Join(left, right, joinType, condition) =>
+ val newLeft = removeRedundantAliases(left, blacklist ++ right.outputSet)
+ val newRight = removeRedundantAliases(right, blacklist ++ newLeft.outputSet)
+ val mapping = AttributeMap(
+ createAttributeMapping(left, newLeft) ++
+ createAttributeMapping(right, newRight))
+ val newCondition = condition.map(_.transform {
+ case a: Attribute => mapping.getOrElse(a, a)
+ })
+ Join(newLeft, newRight, joinType, newCondition)
+
+ case _ =>
+ // Remove redundant aliases in the subtree(s).
+ val currentNextAttrPairs = mutable.Buffer.empty[(Attribute, Attribute)]
+ val newNode = plan.mapChildren { child =>
+ val newChild = removeRedundantAliases(child, blacklist)
+ currentNextAttrPairs ++= createAttributeMapping(child, newChild)
+ newChild
+ }
- aliasOnlyProject.map { case proj =>
- val attributesToReplace = proj.output.zip(proj.child.output).filterNot {
- case (a1, a2) => a1 semanticEquals a2
- }
- val attrMap = AttributeMap(attributesToReplace)
- plan transform {
- case plan: Project if plan eq proj => plan.child
- case plan => plan transformExpressions {
- case a: Attribute if attrMap.contains(a) => attrMap(a)
+ // Create the attribute mapping. Note that the currentNextAttrPairs can contain duplicate
+ // keys in case of Union (this is caused by the PushProjectionThroughUnion rule); in this
+ // case we use the the first mapping (which should be provided by the first child).
+ val mapping = AttributeMap(currentNextAttrPairs)
+
+ // Create a an expression cleaning function for nodes that can actually produce redundant
+ // aliases, use identity otherwise.
+ val clean: Expression => Expression = plan match {
+ case _: Project => removeRedundantAlias(_, blacklist)
+ case _: Aggregate => removeRedundantAlias(_, blacklist)
+ case _: Window => removeRedundantAlias(_, blacklist)
+ case _ => identity[Expression]
}
- }
- }.getOrElse(plan)
+
+ // Transform the expressions.
+ newNode.mapExpressions { expr =>
+ clean(expr.transform {
+ case a: Attribute => mapping.getOrElse(a, a)
+ })
+ }
+ }
+ }
+
+ def apply(plan: LogicalPlan): LogicalPlan = removeRedundantAliases(plan, AttributeSet.empty)
+}
+
+/**
+ * Remove projections from the query plan that do not make any modifications.
+ */
+object RemoveRedundantProject extends Rule[LogicalPlan] {
+ def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+ case p @ Project(_, child) if p.output == child.output => child
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index b108017c4c..a5761703fd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -242,31 +242,7 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
* @param rule the rule to be applied to every expression in this operator.
*/
def transformExpressionsDown(rule: PartialFunction[Expression, Expression]): this.type = {
- var changed = false
-
- @inline def transformExpressionDown(e: Expression): Expression = {
- val newE = e.transformDown(rule)
- if (newE.fastEquals(e)) {
- e
- } else {
- changed = true
- newE
- }
- }
-
- def recursiveTransform(arg: Any): AnyRef = arg match {
- case e: Expression => transformExpressionDown(e)
- case Some(e: Expression) => Some(transformExpressionDown(e))
- case m: Map[_, _] => m
- case d: DataType => d // Avoid unpacking Structs
- case seq: Traversable[_] => seq.map(recursiveTransform)
- case other: AnyRef => other
- case null => null
- }
-
- val newArgs = mapProductIterator(recursiveTransform)
-
- if (changed) makeCopy(newArgs).asInstanceOf[this.type] else this
+ mapExpressions(_.transformDown(rule))
}
/**
@@ -276,10 +252,18 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
* @return
*/
def transformExpressionsUp(rule: PartialFunction[Expression, Expression]): this.type = {
+ mapExpressions(_.transformUp(rule))
+ }
+
+ /**
+ * Apply a map function to each expression present in this query operator, and return a new
+ * query operator based on the mapped expressions.
+ */
+ def mapExpressions(f: Expression => Expression): this.type = {
var changed = false
- @inline def transformExpressionUp(e: Expression): Expression = {
- val newE = e.transformUp(rule)
+ @inline def transformExpression(e: Expression): Expression = {
+ val newE = f(e)
if (newE.fastEquals(e)) {
e
} else {
@@ -289,8 +273,8 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
}
def recursiveTransform(arg: Any): AnyRef = arg match {
- case e: Expression => transformExpressionUp(e)
- case Some(e: Expression) => Some(transformExpressionUp(e))
+ case e: Expression => transformExpression(e)
+ case Some(e: Expression) => Some(transformExpression(e))
case m: Map[_, _] => m
case d: DataType => d // Avoid unpacking Structs
case seq: Traversable[_] => seq.map(recursiveTransform)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 93550e1fc3..0937825e27 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -56,7 +56,7 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
*/
def resolveOperators(rule: PartialFunction[LogicalPlan, LogicalPlan]): LogicalPlan = {
if (!analyzed) {
- val afterRuleOnChildren = transformChildren(rule, (t, r) => t.resolveOperators(r))
+ val afterRuleOnChildren = mapChildren(_.resolveOperators(rule))
if (this fastEquals afterRuleOnChildren) {
CurrentOrigin.withOrigin(origin) {
rule.applyOrElse(this, identity[LogicalPlan])
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index 8fec9dd9b4..f37661c315 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -191,26 +191,6 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
}
/**
- * Returns a copy of this node where `f` has been applied to all the nodes children.
- */
- def mapChildren(f: BaseType => BaseType): BaseType = {
- var changed = false
- val newArgs = mapProductIterator {
- case arg: TreeNode[_] if containsChild(arg) =>
- val newChild = f(arg.asInstanceOf[BaseType])
- if (newChild fastEquals arg) {
- arg
- } else {
- changed = true
- newChild
- }
- case nonChild: AnyRef => nonChild
- case null => null
- }
- if (changed) makeCopy(newArgs) else this
- }
-
- /**
* Returns a copy of this node with the children replaced.
* TODO: Validate somewhere (in debug mode?) that children are ordered correctly.
*/
@@ -289,9 +269,9 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
// Check if unchanged and then possibly return old copy to avoid gc churn.
if (this fastEquals afterRule) {
- transformChildren(rule, (t, r) => t.transformDown(r))
+ mapChildren(_.transformDown(rule))
} else {
- afterRule.transformChildren(rule, (t, r) => t.transformDown(r))
+ afterRule.mapChildren(_.transformDown(rule))
}
}
@@ -303,7 +283,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
* @param rule the function use to transform this nodes children
*/
def transformUp(rule: PartialFunction[BaseType, BaseType]): BaseType = {
- val afterRuleOnChildren = transformChildren(rule, (t, r) => t.transformUp(r))
+ val afterRuleOnChildren = mapChildren(_.transformUp(rule))
if (this fastEquals afterRuleOnChildren) {
CurrentOrigin.withOrigin(origin) {
rule.applyOrElse(this, identity[BaseType])
@@ -316,18 +296,14 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
}
/**
- * Returns a copy of this node where `rule` has been recursively applied to all the children of
- * this node. When `rule` does not apply to a given node it is left unchanged.
- * @param rule the function used to transform this nodes children
+ * Returns a copy of this node where `f` has been applied to all the nodes children.
*/
- protected def transformChildren(
- rule: PartialFunction[BaseType, BaseType],
- nextOperation: (BaseType, PartialFunction[BaseType, BaseType]) => BaseType): BaseType = {
+ def mapChildren(f: BaseType => BaseType): BaseType = {
if (children.nonEmpty) {
var changed = false
val newArgs = mapProductIterator {
case arg: TreeNode[_] if containsChild(arg) =>
- val newChild = nextOperation(arg.asInstanceOf[BaseType], rule)
+ val newChild = f(arg.asInstanceOf[BaseType])
if (!(newChild fastEquals arg)) {
changed = true
newChild
@@ -335,7 +311,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
arg
}
case Some(arg: TreeNode[_]) if containsChild(arg) =>
- val newChild = nextOperation(arg.asInstanceOf[BaseType], rule)
+ val newChild = f(arg.asInstanceOf[BaseType])
if (!(newChild fastEquals arg)) {
changed = true
Some(newChild)
@@ -344,7 +320,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
}
case m: Map[_, _] => m.mapValues {
case arg: TreeNode[_] if containsChild(arg) =>
- val newChild = nextOperation(arg.asInstanceOf[BaseType], rule)
+ val newChild = f(arg.asInstanceOf[BaseType])
if (!(newChild fastEquals arg)) {
changed = true
newChild
@@ -356,7 +332,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
case d: DataType => d // Avoid unpacking Structs
case args: Traversable[_] => args.map {
case arg: TreeNode[_] if containsChild(arg) =>
- val newChild = nextOperation(arg.asInstanceOf[BaseType], rule)
+ val newChild = f(arg.asInstanceOf[BaseType])
if (!(newChild fastEquals arg)) {
changed = true
newChild
@@ -364,8 +340,8 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
arg
}
case tuple@(arg1: TreeNode[_], arg2: TreeNode[_]) =>
- val newChild1 = nextOperation(arg1.asInstanceOf[BaseType], rule)
- val newChild2 = nextOperation(arg2.asInstanceOf[BaseType], rule)
+ val newChild1 = f(arg1.asInstanceOf[BaseType])
+ val newChild2 = f(arg2.asInstanceOf[BaseType])
if (!(newChild1 fastEquals arg1) || !(newChild2 fastEquals arg2)) {
changed = true
(newChild1, newChild2)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveAliasOnlyProjectSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala
index 7c26cb5598..c01ea01ec6 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveAliasOnlyProjectSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala
@@ -25,10 +25,15 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.types.MetadataBuilder
-class RemoveAliasOnlyProjectSuite extends PlanTest with PredicateHelper {
+class RemoveRedundantAliasAndProjectSuite extends PlanTest with PredicateHelper {
object Optimize extends RuleExecutor[LogicalPlan] {
- val batches = Batch("RemoveAliasOnlyProject", FixedPoint(50), RemoveAliasOnlyProject) :: Nil
+ val batches = Batch(
+ "RemoveAliasOnlyProject",
+ FixedPoint(50),
+ PushProjectionThroughUnion,
+ RemoveRedundantAliases,
+ RemoveRedundantProject) :: Nil
}
test("all expressions in project list are aliased child output") {
@@ -42,7 +47,8 @@ class RemoveAliasOnlyProjectSuite extends PlanTest with PredicateHelper {
val relation = LocalRelation('a.int, 'b.int)
val query = relation.select('b as 'b, 'a as 'a).analyze
val optimized = Optimize.execute(query)
- comparePlans(optimized, query)
+ val expected = relation.select('b, 'a).analyze
+ comparePlans(optimized, expected)
}
test("some expressions in project list are aliased child output") {
@@ -56,14 +62,16 @@ class RemoveAliasOnlyProjectSuite extends PlanTest with PredicateHelper {
val relation = LocalRelation('a.int, 'b.int)
val query = relation.select('b as 'b, 'a).analyze
val optimized = Optimize.execute(query)
- comparePlans(optimized, query)
+ val expected = relation.select('b, 'a).analyze
+ comparePlans(optimized, expected)
}
test("some expressions in project list are not Alias or Attribute") {
val relation = LocalRelation('a.int, 'b.int)
val query = relation.select('a as 'a, 'b + 1).analyze
val optimized = Optimize.execute(query)
- comparePlans(optimized, query)
+ val expected = relation.select('a, 'b + 1).analyze
+ comparePlans(optimized, expected)
}
test("some expressions in project list are aliased child output but with metadata") {
@@ -74,4 +82,38 @@ class RemoveAliasOnlyProjectSuite extends PlanTest with PredicateHelper {
val optimized = Optimize.execute(query)
comparePlans(optimized, query)
}
+
+ test("retain deduplicating alias in self-join") {
+ val relation = LocalRelation('a.int)
+ val fragment = relation.select('a as 'a)
+ val query = fragment.select('a as 'a).join(fragment.select('a as 'a)).analyze
+ val optimized = Optimize.execute(query)
+ val expected = relation.join(relation.select('a as 'a)).analyze
+ comparePlans(optimized, expected)
+ }
+
+ test("alias removal should not break after push project through union") {
+ val r1 = LocalRelation('a.int)
+ val r2 = LocalRelation('b.int)
+ val query = r1.select('a as 'a).union(r2.select('b as 'b)).select('a).analyze
+ val optimized = Optimize.execute(query)
+ val expected = r1.union(r2)
+ comparePlans(optimized, expected)
+ }
+
+ test("remove redundant alias from aggregate") {
+ val relation = LocalRelation('a.int, 'b.int)
+ val query = relation.groupBy('a as 'a)('a as 'a, sum('b)).analyze
+ val optimized = Optimize.execute(query)
+ val expected = relation.groupBy('a)('a, sum('b)).analyze
+ comparePlans(optimized, expected)
+ }
+
+ test("remove redundant alias from window") {
+ val relation = LocalRelation('a.int, 'b.int)
+ val query = relation.window(Seq('b as 'b), Seq('a as 'a), Seq()).analyze
+ val optimized = Optimize.execute(query)
+ val expected = relation.window(Seq('b), Seq('a), Seq()).analyze
+ comparePlans(optimized, expected)
+ }
}
diff --git a/sql/core/src/test/resources/sql-tests/inputs/cte.sql b/sql/core/src/test/resources/sql-tests/inputs/cte.sql
index 3914db2691..d34d89f235 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/cte.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/cte.sql
@@ -12,3 +12,18 @@ WITH s1 AS (SELECT 1 FROM s2), s2 AS (SELECT 1 FROM s1) SELECT * FROM s1, s2;
-- WITH clause should reference the previous CTE
WITH t1 AS (SELECT * FROM t2), t2 AS (SELECT 2 FROM t1) SELECT * FROM t1 cross join t2;
+
+-- SPARK-18609 CTE with self-join
+WITH CTE1 AS (
+ SELECT b.id AS id
+ FROM T2 a
+ CROSS JOIN (SELECT id AS id FROM T2) b
+)
+SELECT t1.id AS c1,
+ t2.id AS c2
+FROM CTE1 t1
+ CROSS JOIN CTE1 t2;
+
+-- Clean up
+DROP VIEW IF EXISTS t;
+DROP VIEW IF EXISTS t2;
diff --git a/sql/core/src/test/resources/sql-tests/inputs/union.sql b/sql/core/src/test/resources/sql-tests/inputs/union.sql
index 1f4780abde..e57d69eaad 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/union.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/union.sql
@@ -22,6 +22,22 @@ FROM (SELECT 0 a, 0 b
SELECT SUM(1) a, CAST(0 AS BIGINT) b
UNION ALL SELECT 0 a, 0 b) T;
+-- Regression test for SPARK-18841 Push project through union should not be broken by redundant alias removal.
+CREATE OR REPLACE TEMPORARY VIEW p1 AS VALUES 1 T(col);
+CREATE OR REPLACE TEMPORARY VIEW p2 AS VALUES 1 T(col);
+CREATE OR REPLACE TEMPORARY VIEW p3 AS VALUES 1 T(col);
+SELECT 1 AS x,
+ col
+FROM (SELECT col AS col
+ FROM (SELECT p1.col AS col
+ FROM p1 CROSS JOIN p2
+ UNION ALL
+ SELECT col
+ FROM p3) T1) T2;
+
-- Clean-up
DROP VIEW IF EXISTS t1;
DROP VIEW IF EXISTS t2;
+DROP VIEW IF EXISTS p1;
+DROP VIEW IF EXISTS p2;
+DROP VIEW IF EXISTS p3;
diff --git a/sql/core/src/test/resources/sql-tests/results/cte.sql.out b/sql/core/src/test/resources/sql-tests/results/cte.sql.out
index 9fbad8f380..a446c2cd18 100644
--- a/sql/core/src/test/resources/sql-tests/results/cte.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/cte.sql.out
@@ -1,5 +1,5 @@
-- Automatically generated by SQLQueryTestSuite
--- Number of queries: 6
+-- Number of queries: 9
-- !query 0
@@ -55,3 +55,50 @@ struct<id:int,2:int>
0 2
1 2
1 2
+
+
+-- !query 6
+WITH CTE1 AS (
+ SELECT b.id AS id
+ FROM T2 a
+ CROSS JOIN (SELECT id AS id FROM T2) b
+)
+SELECT t1.id AS c1,
+ t2.id AS c2
+FROM CTE1 t1
+ CROSS JOIN CTE1 t2
+-- !query 6 schema
+struct<c1:int,c2:int>
+-- !query 6 output
+0 0
+0 0
+0 0
+0 0
+0 1
+0 1
+0 1
+0 1
+1 0
+1 0
+1 0
+1 0
+1 1
+1 1
+1 1
+1 1
+
+
+-- !query 7
+DROP VIEW IF EXISTS t
+-- !query 7 schema
+struct<>
+-- !query 7 output
+
+
+
+-- !query 8
+DROP VIEW IF EXISTS t2
+-- !query 8 schema
+struct<>
+-- !query 8 output
+
diff --git a/sql/core/src/test/resources/sql-tests/results/union.sql.out b/sql/core/src/test/resources/sql-tests/results/union.sql.out
index c57028cabe..d123b7fdbe 100644
--- a/sql/core/src/test/resources/sql-tests/results/union.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/union.sql.out
@@ -1,5 +1,5 @@
-- Automatically generated by SQLQueryTestSuite
--- Number of queries: 7
+-- Number of queries: 14
-- !query 0
@@ -65,7 +65,7 @@ struct<a:bigint>
-- !query 5
-DROP VIEW IF EXISTS t1
+CREATE OR REPLACE TEMPORARY VIEW p1 AS VALUES 1 T(col)
-- !query 5 schema
struct<>
-- !query 5 output
@@ -73,8 +73,72 @@ struct<>
-- !query 6
-DROP VIEW IF EXISTS t2
+CREATE OR REPLACE TEMPORARY VIEW p2 AS VALUES 1 T(col)
-- !query 6 schema
struct<>
-- !query 6 output
+
+
+-- !query 7
+CREATE OR REPLACE TEMPORARY VIEW p3 AS VALUES 1 T(col)
+-- !query 7 schema
+struct<>
+-- !query 7 output
+
+
+
+-- !query 8
+SELECT 1 AS x,
+ col
+FROM (SELECT col AS col
+ FROM (SELECT p1.col AS col
+ FROM p1 CROSS JOIN p2
+ UNION ALL
+ SELECT col
+ FROM p3) T1) T2
+-- !query 8 schema
+struct<x:int,col:int>
+-- !query 8 output
+1 1
+1 1
+
+
+-- !query 9
+DROP VIEW IF EXISTS t1
+-- !query 9 schema
+struct<>
+-- !query 9 output
+
+
+
+-- !query 10
+DROP VIEW IF EXISTS t2
+-- !query 10 schema
+struct<>
+-- !query 10 output
+
+
+
+-- !query 11
+DROP VIEW IF EXISTS p1
+-- !query 11 schema
+struct<>
+-- !query 11 output
+
+
+
+-- !query 12
+DROP VIEW IF EXISTS p2
+-- !query 12 schema
+struct<>
+-- !query 12 output
+
+
+
+-- !query 13
+DROP VIEW IF EXISTS p3
+-- !query 13 schema
+struct<>
+-- !query 13 output
+