Diffstat (limited to 'sql/catalyst/src/main')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala                    |  7
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala            | 31
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala | 12
3 files changed, 42 insertions, 8 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index 89f4a19add..ee04cb579d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -111,6 +111,7 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser {
protected val UPPER = Keyword("UPPER")
protected val WHEN = Keyword("WHEN")
protected val WHERE = Keyword("WHERE")
+ protected val WITH = Keyword("WITH")
protected def assignAliases(exprs: Seq[Expression]): Seq[NamedExpression] = {
exprs.zipWithIndex.map {
@@ -127,6 +128,7 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser {
| UNION ~ DISTINCT.? ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) }
)
| insert
+ | cte
)
protected lazy val select: Parser[LogicalPlan] =
@@ -156,6 +158,11 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser {
case o ~ r ~ s => InsertIntoTable(r, Map.empty[String, Option[String]], s, o)
}
+ protected lazy val cte: Parser[LogicalPlan] =
+ WITH ~> rep1sep(ident ~ ( AS ~ "(" ~> start <~ ")"), ",") ~ start ^^ {
+ case r ~ s => With(s, r.map({case n ~ s => (n, Subquery(n, s))}).toMap)
+ }
+
protected lazy val projection: Parser[Expression] =
expression ~ (AS.? ~> ident.?) ^^ {
case e ~ a => a.fold(e)(Alias(e, _)())
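For context, a rough sketch (not part of the change) of the plan shape the new `cte` production builds; the table names `t`/`src` and the exact Project/UnresolvedRelation constructors are assumptions about this branch's catalyst API:

    import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation}
    import org.apache.spark.sql.catalyst.plans.logical.{Project, Subquery, With}

    // "WITH t AS (SELECT x FROM src) SELECT x FROM t" should parse into roughly:
    val cteBody = Project(Seq(UnresolvedAttribute("x")), UnresolvedRelation(Seq("src"), None))
    val query   = Project(Seq(UnresolvedAttribute("x")), UnresolvedRelation(Seq("t"), None))
    val plan    = With(query, Map("t" -> Subquery("t", cteBody)))

Each named CTE body is wrapped in a Subquery keyed by its alias, and the trailing `start` becomes the child of the With node.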
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 524c73c31b..b83f18abdd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -169,21 +169,36 @@ class Analyzer(
* Replaces [[UnresolvedRelation]]s with concrete relations from the catalog.
*/
object ResolveRelations extends Rule[LogicalPlan] {
- def getTable(u: UnresolvedRelation): LogicalPlan = {
+ def getTable(u: UnresolvedRelation, cteRelations: Map[String, LogicalPlan]) = {
try {
- catalog.lookupRelation(u.tableIdentifier, u.alias)
+ // In Hive, if a table in the database and a CTE definition have the same name,
+ // Hive uses the table from the database, not the CTE.
+ // Weighing usability against implementation complexity, we prefer the CTE definition
+ // here, matching on the table name only and ignoring the database name.
+ cteRelations.get(u.tableIdentifier.last)
+ .map(relation => u.alias.map(Subquery(_, relation)).getOrElse(relation))
+ .getOrElse(catalog.lookupRelation(u.tableIdentifier, u.alias))
} catch {
case _: NoSuchTableException =>
u.failAnalysis(s"no such table ${u.tableName}")
}
}
- def apply(plan: LogicalPlan): LogicalPlan = plan transform {
- case i @ InsertIntoTable(u: UnresolvedRelation, _, _, _) =>
- i.copy(
- table = EliminateSubQueries(getTable(u)))
- case u: UnresolvedRelation =>
- getTable(u)
+ def apply(plan: LogicalPlan): LogicalPlan = {
+ val (realPlan, cteRelations) = plan match {
+ // TODO allow subquery to define CTE
+ // Add the CTE relations to a temporary map, drop the `With` plan, and keep its child
+ case With(child, relations) => (child, relations)
+ case other => (other, Map.empty[String, LogicalPlan])
+ }
+
+ realPlan transform {
+ case i @ InsertIntoTable(u: UnresolvedRelation, _, _, _) =>
+ i.copy(
+ table = EliminateSubQueries(getTable(u, cteRelations)))
+ case u: UnresolvedRelation =>
+ getTable(u, cteRelations)
+ }
}
}
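To make the resolution order above concrete, a hypothetical sketch (lookupInCatalog is a made-up stand-in for catalog.lookupRelation, and the table names are illustrative):

    import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation}
    import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Subquery}

    // Stand-in for catalog.lookupRelation; never reached in this example.
    def lookupInCatalog(tableIdentifier: Seq[String]): LogicalPlan = ???

    val cteBody = Project(Seq(UnresolvedAttribute("x")), UnresolvedRelation(Seq("base"), None))
    val cteRelations: Map[String, LogicalPlan] = Map("src" -> Subquery("src", cteBody))

    // Both `FROM src` and `FROM db.src` check only the last part of the identifier,
    // so they resolve to the CTE definition and shadow any catalog table named `src`.
    val resolved = cteRelations.get(Seq("db", "src").last)
      .getOrElse(lookupInCatalog(Seq("db", "src")))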
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 8633e06093..3bd5aa5964 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -147,6 +147,18 @@ case class CreateTableAsSelect[T](
override lazy val resolved: Boolean = databaseName != None && childrenResolved
}
+/**
+ * A container for holding named common table expressions (CTEs) and a query plan.
+ * This operator will be removed during analysis, and the relations will be substituted into the child.
+ * @param child The final query of this CTE.
+ * @param cteRelations Queries defined by this CTE, keyed by the alias of each CTE definition;
+ *                     the value is the corresponding CTE definition, wrapped in a [[Subquery]].
+ */
+case class With(child: LogicalPlan, cteRelations: Map[String, Subquery]) extends UnaryNode {
+ override def output = child.output
+}
+
case class WriteToFile(
path: String,
child: LogicalPlan) extends UnaryNode {
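A brief usage sketch of the new operator (illustrative only; afterResolveRelations is hand-written to show the shape produced by the Analyzer change above, not the result of an actual analyzer run):

    import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation}
    import org.apache.spark.sql.catalyst.plans.logical.{Project, Subquery, With}

    val cteDef = Subquery("t", Project(Seq(UnresolvedAttribute("x")), UnresolvedRelation(Seq("base"), None)))
    val unresolved = With(
      Project(Seq(UnresolvedAttribute("x")), UnresolvedRelation(Seq("t"), None)),
      Map("t" -> cteDef))

    // ResolveRelations drops the With wrapper and substitutes the CTE definition
    // for the UnresolvedRelation over `t`, yielding roughly:
    val afterResolveRelations = Project(Seq(UnresolvedAttribute("x")), cteDef)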