diff options
-rw-r--r-- | sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 98 |
1 files changed, 60 insertions, 38 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 7e46ad851c..bb7913e186 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -55,6 +55,10 @@ class Analyzer( val extendedResolutionRules: Seq[Rule[LogicalPlan]] = Nil lazy val batches: Seq[Batch] = Seq( + Batch("Substitution", fixedPoint, + CTESubstitution :: + WindowsSubstitution :: + Nil : _*), Batch("Resolution", fixedPoint, ResolveRelations :: ResolveReferences :: @@ -72,6 +76,55 @@ class Analyzer( ) /** + * Substitute child plan with cte definitions + */ + object CTESubstitution extends Rule[LogicalPlan] { + // TODO allow subquery to define CTE + def apply(plan: LogicalPlan): LogicalPlan = plan match { + case With(child, relations) => substituteCTE(child, relations) + case other => other + } + + def substituteCTE(plan: LogicalPlan, cteRelations: Map[String, LogicalPlan]): LogicalPlan = { + plan transform { + // In hive, if there is same table name in database and CTE definition, + // hive will use the table in database, not the CTE one. + // Taking into account the reasonableness and the implementation complexity, + // here use the CTE definition first, check table name only and ignore database name + // see https://github.com/apache/spark/pull/4929#discussion_r27186638 for more info + case u : UnresolvedRelation => + val substituted = cteRelations.get(u.tableIdentifier.last).map { relation => + val withAlias = u.alias.map(Subquery(_, relation)) + withAlias.getOrElse(relation) + } + substituted.getOrElse(u) + } + } + } + + /** + * Substitute child plan with WindowSpecDefinitions. + */ + object WindowsSubstitution extends Rule[LogicalPlan] { + def apply(plan: LogicalPlan): LogicalPlan = plan transform { + // Lookup WindowSpecDefinitions. This rule works with unresolved children. + case WithWindowDefinition(windowDefinitions, child) => + child.transform { + case plan => plan.transformExpressions { + case UnresolvedWindowExpression(c, WindowSpecReference(windowName)) => + val errorMessage = + s"Window specification $windowName is not defined in the WINDOW clause." + val windowSpecDefinition = + windowDefinitions + .get(windowName) + .getOrElse(failAnalysis(errorMessage)) + WindowExpression(c, windowSpecDefinition) + } + } + } + } + + /** * Removes no-op Alias expressions from the plan. */ object TrimGroupingAliases extends Rule[LogicalPlan] { @@ -172,36 +225,20 @@ class Analyzer( * Replaces [[UnresolvedRelation]]s with concrete relations from the catalog. */ object ResolveRelations extends Rule[LogicalPlan] { - def getTable(u: UnresolvedRelation, cteRelations: Map[String, LogicalPlan]): LogicalPlan = { + def getTable(u: UnresolvedRelation): LogicalPlan = { try { - // In hive, if there is same table name in database and CTE definition, - // hive will use the table in database, not the CTE one. - // Taking into account the reasonableness and the implementation complexity, - // here use the CTE definition first, check table name only and ignore database name - cteRelations.get(u.tableIdentifier.last) - .map(relation => u.alias.map(Subquery(_, relation)).getOrElse(relation)) - .getOrElse(catalog.lookupRelation(u.tableIdentifier, u.alias)) + catalog.lookupRelation(u.tableIdentifier, u.alias) } catch { case _: NoSuchTableException => u.failAnalysis(s"no such table ${u.tableName}") } } - def apply(plan: LogicalPlan): LogicalPlan = { - val (realPlan, cteRelations) = plan match { - // TODO allow subquery to define CTE - // Add cte table to a temp relation map,drop `with` plan and keep its child - case With(child, relations) => (child, relations) - case other => (other, Map.empty[String, LogicalPlan]) - } - - realPlan transform { - case i@InsertIntoTable(u: UnresolvedRelation, _, _, _, _) => - i.copy( - table = EliminateSubQueries(getTable(u, cteRelations))) - case u: UnresolvedRelation => - getTable(u, cteRelations) - } + def apply(plan: LogicalPlan): LogicalPlan = plan transform { + case i@InsertIntoTable(u: UnresolvedRelation, _, _, _, _) => + i.copy(table = EliminateSubQueries(getTable(u))) + case u: UnresolvedRelation => + getTable(u) } } @@ -664,21 +701,6 @@ class Analyzer( // We have to use transformDown at here to make sure the rule of // "Aggregate with Having clause" will be triggered. def apply(plan: LogicalPlan): LogicalPlan = plan transformDown { - // Lookup WindowSpecDefinitions. This rule works with unresolved children. - case WithWindowDefinition(windowDefinitions, child) => - child.transform { - case plan => plan.transformExpressions { - case UnresolvedWindowExpression(c, WindowSpecReference(windowName)) => - val errorMessage = - s"Window specification $windowName is not defined in the WINDOW clause." - val windowSpecDefinition = - windowDefinitions - .get(windowName) - .getOrElse(failAnalysis(errorMessage)) - WindowExpression(c, windowSpecDefinition) - } - } - // Aggregate with Having clause. This rule works with an unresolved Aggregate because // a resolved Aggregate will not have Window Functions. case f @ Filter(condition, a @ Aggregate(groupingExprs, aggregateExprs, child)) |