aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst
diff options
context:
space:
mode:
authorwangfei <wangfei1@huawei.com>2015-05-07 22:55:42 -0700
committerYin Huai <yhuai@databricks.com>2015-05-07 22:55:42 -0700
commitf496bf3c539a873ffdf3aa803847ef7b50135bd7 (patch)
tree0d7b2762e99612dc0371505d40638d5c65bf2cf3 /sql/catalyst
parent714db2ef52c0fe34418e252e5a6f220337022046 (diff)
downloadspark-f496bf3c539a873ffdf3aa803847ef7b50135bd7.tar.gz
spark-f496bf3c539a873ffdf3aa803847ef7b50135bd7.tar.bz2
spark-f496bf3c539a873ffdf3aa803847ef7b50135bd7.zip
[SPARK-7232] [SQL] Add a Substitution batch for spark sql analyzer
Added a new batch named `Substitution` before the `Resolution` batch. The motivation is that there are cases where we want to perform substitutions on the parsed logical plan before resolving it. Consider these two cases: 1. CTE: for a CTE we first build a raw logical plan ``` 'With Map(q1 -> 'Subquery q1 'Project ['key] 'UnresolvedRelation [src], None) 'Project [*] 'Filter ('key = 5) 'UnresolvedRelation [q1], None ``` The `With` logical plan stores a map (`q1 -> subquery`); we want to first strip off the `With` node and substitute the `UnresolvedRelation` named `q1` with the `subquery`. 2. Window functions: a user may define named windows, and we also need to substitute each window name referenced in the child plan with its concrete window definition. This should also be done in the Substitution batch. Author: wangfei <wangfei1@huawei.com> Closes #5776 from scwf/addbatch and squashes the following commits: d4b962f [wangfei] added WindowsSubstitution 70f6932 [wangfei] Merge branch 'master' of https://github.com/apache/spark into addbatch ecaeafb [wangfei] address yhuai's comments 553005a [wangfei] fix test case 0c54798 [wangfei] address comments 29aaaaf [wangfei] fix compile 1c9a092 [wangfei] added Substitution bastch
Diffstat (limited to 'sql/catalyst')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala98
1 files changed, 60 insertions, 38 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 7e46ad851c..bb7913e186 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -55,6 +55,10 @@ class Analyzer(
val extendedResolutionRules: Seq[Rule[LogicalPlan]] = Nil
lazy val batches: Seq[Batch] = Seq(
+ Batch("Substitution", fixedPoint,
+ CTESubstitution ::
+ WindowsSubstitution ::
+ Nil : _*),
Batch("Resolution", fixedPoint,
ResolveRelations ::
ResolveReferences ::
@@ -72,6 +76,55 @@ class Analyzer(
)
/**
+ * Substitute child plan with cte definitions
+ */
+ object CTESubstitution extends Rule[LogicalPlan] {
+ // TODO allow subquery to define CTE
+ def apply(plan: LogicalPlan): LogicalPlan = plan match {
+ case With(child, relations) => substituteCTE(child, relations)
+ case other => other
+ }
+
+ def substituteCTE(plan: LogicalPlan, cteRelations: Map[String, LogicalPlan]): LogicalPlan = {
+ plan transform {
+ // In hive, if there is same table name in database and CTE definition,
+ // hive will use the table in database, not the CTE one.
+ // Taking into account the reasonableness and the implementation complexity,
+ // here use the CTE definition first, check table name only and ignore database name
+ // see https://github.com/apache/spark/pull/4929#discussion_r27186638 for more info
+ case u : UnresolvedRelation =>
+ val substituted = cteRelations.get(u.tableIdentifier.last).map { relation =>
+ val withAlias = u.alias.map(Subquery(_, relation))
+ withAlias.getOrElse(relation)
+ }
+ substituted.getOrElse(u)
+ }
+ }
+ }
+
+ /**
+ * Substitute child plan with WindowSpecDefinitions.
+ */
+ object WindowsSubstitution extends Rule[LogicalPlan] {
+ def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+ // Lookup WindowSpecDefinitions. This rule works with unresolved children.
+ case WithWindowDefinition(windowDefinitions, child) =>
+ child.transform {
+ case plan => plan.transformExpressions {
+ case UnresolvedWindowExpression(c, WindowSpecReference(windowName)) =>
+ val errorMessage =
+ s"Window specification $windowName is not defined in the WINDOW clause."
+ val windowSpecDefinition =
+ windowDefinitions
+ .get(windowName)
+ .getOrElse(failAnalysis(errorMessage))
+ WindowExpression(c, windowSpecDefinition)
+ }
+ }
+ }
+ }
+
+ /**
* Removes no-op Alias expressions from the plan.
*/
object TrimGroupingAliases extends Rule[LogicalPlan] {
@@ -172,36 +225,20 @@ class Analyzer(
* Replaces [[UnresolvedRelation]]s with concrete relations from the catalog.
*/
object ResolveRelations extends Rule[LogicalPlan] {
- def getTable(u: UnresolvedRelation, cteRelations: Map[String, LogicalPlan]): LogicalPlan = {
+ def getTable(u: UnresolvedRelation): LogicalPlan = {
try {
- // In hive, if there is same table name in database and CTE definition,
- // hive will use the table in database, not the CTE one.
- // Taking into account the reasonableness and the implementation complexity,
- // here use the CTE definition first, check table name only and ignore database name
- cteRelations.get(u.tableIdentifier.last)
- .map(relation => u.alias.map(Subquery(_, relation)).getOrElse(relation))
- .getOrElse(catalog.lookupRelation(u.tableIdentifier, u.alias))
+ catalog.lookupRelation(u.tableIdentifier, u.alias)
} catch {
case _: NoSuchTableException =>
u.failAnalysis(s"no such table ${u.tableName}")
}
}
- def apply(plan: LogicalPlan): LogicalPlan = {
- val (realPlan, cteRelations) = plan match {
- // TODO allow subquery to define CTE
- // Add cte table to a temp relation map,drop `with` plan and keep its child
- case With(child, relations) => (child, relations)
- case other => (other, Map.empty[String, LogicalPlan])
- }
-
- realPlan transform {
- case i@InsertIntoTable(u: UnresolvedRelation, _, _, _, _) =>
- i.copy(
- table = EliminateSubQueries(getTable(u, cteRelations)))
- case u: UnresolvedRelation =>
- getTable(u, cteRelations)
- }
+ def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+ case i@InsertIntoTable(u: UnresolvedRelation, _, _, _, _) =>
+ i.copy(table = EliminateSubQueries(getTable(u)))
+ case u: UnresolvedRelation =>
+ getTable(u)
}
}
@@ -664,21 +701,6 @@ class Analyzer(
// We have to use transformDown at here to make sure the rule of
// "Aggregate with Having clause" will be triggered.
def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
- // Lookup WindowSpecDefinitions. This rule works with unresolved children.
- case WithWindowDefinition(windowDefinitions, child) =>
- child.transform {
- case plan => plan.transformExpressions {
- case UnresolvedWindowExpression(c, WindowSpecReference(windowName)) =>
- val errorMessage =
- s"Window specification $windowName is not defined in the WINDOW clause."
- val windowSpecDefinition =
- windowDefinitions
- .get(windowName)
- .getOrElse(failAnalysis(errorMessage))
- WindowExpression(c, windowSpecDefinition)
- }
- }
-
// Aggregate with Having clause. This rule works with an unresolved Aggregate because
// a resolved Aggregate will not have Window Functions.
case f @ Filter(condition, a @ Aggregate(groupingExprs, aggregateExprs, child))