aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala98
1 files changed, 60 insertions, 38 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 7e46ad851c..bb7913e186 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -55,6 +55,10 @@ class Analyzer(
val extendedResolutionRules: Seq[Rule[LogicalPlan]] = Nil
lazy val batches: Seq[Batch] = Seq(
+ Batch("Substitution", fixedPoint,
+ CTESubstitution ::
+ WindowsSubstitution ::
+ Nil : _*),
Batch("Resolution", fixedPoint,
ResolveRelations ::
ResolveReferences ::
@@ -72,6 +76,55 @@ class Analyzer(
)
/**
+ * Substitute child plan with cte definitions
+ */
+ object CTESubstitution extends Rule[LogicalPlan] {
+ // TODO allow subquery to define CTE
+ def apply(plan: LogicalPlan): LogicalPlan = plan match {
+ case With(child, relations) => substituteCTE(child, relations)
+ case other => other
+ }
+
+ def substituteCTE(plan: LogicalPlan, cteRelations: Map[String, LogicalPlan]): LogicalPlan = {
+ plan transform {
+ // In hive, if there is same table name in database and CTE definition,
+ // hive will use the table in database, not the CTE one.
+ // Taking into account the reasonableness and the implementation complexity,
+ // here use the CTE definition first, check table name only and ignore database name
+ // see https://github.com/apache/spark/pull/4929#discussion_r27186638 for more info
+ case u : UnresolvedRelation =>
+ val substituted = cteRelations.get(u.tableIdentifier.last).map { relation =>
+ val withAlias = u.alias.map(Subquery(_, relation))
+ withAlias.getOrElse(relation)
+ }
+ substituted.getOrElse(u)
+ }
+ }
+ }
+
+ /**
+ * Substitute child plan with WindowSpecDefinitions.
+ */
+ object WindowsSubstitution extends Rule[LogicalPlan] {
+ def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+ // Lookup WindowSpecDefinitions. This rule works with unresolved children.
+ case WithWindowDefinition(windowDefinitions, child) =>
+ child.transform {
+ case plan => plan.transformExpressions {
+ case UnresolvedWindowExpression(c, WindowSpecReference(windowName)) =>
+ val errorMessage =
+ s"Window specification $windowName is not defined in the WINDOW clause."
+ val windowSpecDefinition =
+ windowDefinitions
+ .get(windowName)
+ .getOrElse(failAnalysis(errorMessage))
+ WindowExpression(c, windowSpecDefinition)
+ }
+ }
+ }
+ }
+
+ /**
* Removes no-op Alias expressions from the plan.
*/
object TrimGroupingAliases extends Rule[LogicalPlan] {
@@ -172,36 +225,20 @@ class Analyzer(
* Replaces [[UnresolvedRelation]]s with concrete relations from the catalog.
*/
object ResolveRelations extends Rule[LogicalPlan] {
- def getTable(u: UnresolvedRelation, cteRelations: Map[String, LogicalPlan]): LogicalPlan = {
+ def getTable(u: UnresolvedRelation): LogicalPlan = {
try {
- // In hive, if there is same table name in database and CTE definition,
- // hive will use the table in database, not the CTE one.
- // Taking into account the reasonableness and the implementation complexity,
- // here use the CTE definition first, check table name only and ignore database name
- cteRelations.get(u.tableIdentifier.last)
- .map(relation => u.alias.map(Subquery(_, relation)).getOrElse(relation))
- .getOrElse(catalog.lookupRelation(u.tableIdentifier, u.alias))
+ catalog.lookupRelation(u.tableIdentifier, u.alias)
} catch {
case _: NoSuchTableException =>
u.failAnalysis(s"no such table ${u.tableName}")
}
}
- def apply(plan: LogicalPlan): LogicalPlan = {
- val (realPlan, cteRelations) = plan match {
- // TODO allow subquery to define CTE
- // Add cte table to a temp relation map,drop `with` plan and keep its child
- case With(child, relations) => (child, relations)
- case other => (other, Map.empty[String, LogicalPlan])
- }
-
- realPlan transform {
- case i@InsertIntoTable(u: UnresolvedRelation, _, _, _, _) =>
- i.copy(
- table = EliminateSubQueries(getTable(u, cteRelations)))
- case u: UnresolvedRelation =>
- getTable(u, cteRelations)
- }
+ def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+ case i@InsertIntoTable(u: UnresolvedRelation, _, _, _, _) =>
+ i.copy(table = EliminateSubQueries(getTable(u)))
+ case u: UnresolvedRelation =>
+ getTable(u)
}
}
@@ -664,21 +701,6 @@ class Analyzer(
// We have to use transformDown at here to make sure the rule of
// "Aggregate with Having clause" will be triggered.
def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
- // Lookup WindowSpecDefinitions. This rule works with unresolved children.
- case WithWindowDefinition(windowDefinitions, child) =>
- child.transform {
- case plan => plan.transformExpressions {
- case UnresolvedWindowExpression(c, WindowSpecReference(windowName)) =>
- val errorMessage =
- s"Window specification $windowName is not defined in the WINDOW clause."
- val windowSpecDefinition =
- windowDefinitions
- .get(windowName)
- .getOrElse(failAnalysis(errorMessage))
- WindowExpression(c, windowSpecDefinition)
- }
- }
-
// Aggregate with Having clause. This rule works with an unresolved Aggregate because
// a resolved Aggregate will not have Window Functions.
case f @ Filter(condition, a @ Aggregate(groupingExprs, aggregateExprs, child))