From 2a105134e9a3efd46b761fab5e563ddebb26575d Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Fri, 12 Aug 2016 19:07:34 +0200 Subject: [SPARK-16771][SQL] WITH clause should not fall into infinite loop. ## What changes were proposed in this pull request? This PR changes the CTE resolving rule to use only **forward-declared** tables in order to prevent infinite loops. More specifically, new logic is like the following. * Resolve CTEs in `WITH` clauses first before replacing the main SQL body. * When resolving CTEs, only forward-declared CTEs or base tables are referenced. - Self-referencing is not allowed any more. - Cross-referencing is not allowed any more. **Reported Error Scenarios** ```scala scala> sql("WITH t AS (SELECT 1 FROM t) SELECT * FROM t") java.lang.StackOverflowError ... scala> sql("WITH t1 AS (SELECT * FROM t2), t2 AS (SELECT 2 FROM t1) SELECT * FROM t1, t2") java.lang.StackOverflowError ... ``` Note that `t`, `t1`, and `t2` are not declared in database. Spark falls into infinite loops before resolving table names. ## How was this patch tested? Pass the Jenkins tests with new two testcases. Author: Dongjoon Hyun Closes #14397 from dongjoon-hyun/SPARK-16771-TREENODE. --- .../spark/sql/catalyst/analysis/Analyzer.scala | 24 +++++++++++----------- .../spark/sql/catalyst/parser/AstBuilder.scala | 2 +- .../plans/logical/basicLogicalOperators.scala | 7 +++---- 3 files changed, 16 insertions(+), 17 deletions(-) (limited to 'sql/catalyst/src/main') diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 14a2a323c8..a2e276e8a2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -125,22 +125,22 @@ class Analyzer( object CTESubstitution extends Rule[LogicalPlan] { // TODO allow subquery to define CTE def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { - case With(child, relations) => substituteCTE(child, relations) + case With(child, relations) => + substituteCTE(child, relations.foldLeft(Seq.empty[(String, LogicalPlan)]) { + case (resolved, (name, relation)) => + resolved :+ name -> ResolveRelations(substituteCTE(relation, resolved)) + }) case other => other } - def substituteCTE(plan: LogicalPlan, cteRelations: Map[String, LogicalPlan]): LogicalPlan = { - plan transform { - // In hive, if there is same table name in database and CTE definition, - // hive will use the table in database, not the CTE one. - // Taking into account the reasonableness and the implementation complexity, - // here use the CTE definition first, check table name only and ignore database name - // see https://github.com/apache/spark/pull/4929#discussion_r27186638 for more info + def substituteCTE(plan: LogicalPlan, cteRelations: Seq[(String, LogicalPlan)]): LogicalPlan = { + plan transformDown { case u : UnresolvedRelation => - val substituted = cteRelations.get(u.tableIdentifier.table).map { relation => - val withAlias = u.alias.map(SubqueryAlias(_, relation)) - withAlias.getOrElse(relation) - } + val substituted = cteRelations.find(x => resolver(x._1, u.tableIdentifier.table)) + .map(_._2).map { relation => + val withAlias = u.alias.map(SubqueryAlias(_, relation)) + withAlias.getOrElse(relation) + } substituted.getOrElse(u) case other => // This cannot be done in ResolveSubquery because ResolveSubquery does not know the CTE. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index c7fdc287d1..25c8445b4d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -97,7 +97,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging { } // Check for duplicate names. checkDuplicateKeys(ctes, ctx) - With(query, ctes.toMap) + With(query, ctes) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index eb612c4c12..2917d8d2a9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -392,11 +392,10 @@ case class InsertIntoTable( * This operator will be removed during analysis and the relations will be substituted into child. * * @param child The final query of this CTE. - * @param cteRelations Queries that this CTE defined, - * key is the alias of the CTE definition, - * value is the CTE definition. + * @param cteRelations A sequence of pair (alias, the CTE definition) that this CTE defined + * Each CTE can see the base tables and the previously defined CTEs only. */ -case class With(child: LogicalPlan, cteRelations: Map[String, SubqueryAlias]) extends UnaryNode { +case class With(child: LogicalPlan, cteRelations: Seq[(String, SubqueryAlias)]) extends UnaryNode { override def output: Seq[Attribute] = child.output } -- cgit v1.2.3