aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst/src/main
diff options
context:
space:
mode:
authorhaiyang <huhaiyang@huawei.com>2015-04-11 18:30:17 -0700
committerMichael Armbrust <michael@databricks.com>2015-04-11 18:30:17 -0700
commit2f53588738e95a2191f9844818e47f0d2ebbfd54 (patch)
tree2cdb58ec4161ee4d5d2e191ea10d8653c6878a8d /sql/catalyst/src/main
parent7dbd37160ff57f80cc7abdcaef95f8c6df20a0f0 (diff)
downloadspark-2f53588738e95a2191f9844818e47f0d2ebbfd54.tar.gz
spark-2f53588738e95a2191f9844818e47f0d2ebbfd54.tar.bz2
spark-2f53588738e95a2191f9844818e47f0d2ebbfd54.zip
[SPARK-6199] [SQL] Support CTE in HiveContext and SQLContext
Author: haiyang <huhaiyang@huawei.com> Closes #4929 from haiyangsea/cte and squashes the following commits: 220b67d [haiyang] add golden files for cte test d3c7681 [haiyang] Merge branch 'master' into cte-repair 0ba2070 [haiyang] modify code style 9ce6b58 [haiyang] fix conflict ff74741 [haiyang] add comment for With plan 0d56af4 [haiyang] code indention 776a440 [haiyang] add comments for resolve relation strategy 2fccd7e [haiyang] add comments for resolve relation strategy 241bbe2 [haiyang] fix cte problem of view e9e1237 [haiyang] fix test case problem 614182f [haiyang] add test cases for CTE feature 32e415b [haiyang] add comment 1cc8c15 [haiyang] support with 03f1097 [haiyang] support with e960099 [haiyang] support with 9aaa874 [haiyang] support with 0566978 [haiyang] support with a99ecd2 [haiyang] support with c3fa4c2 [haiyang] support with 3b6077f [haiyang] support with 5f8abe3 [haiyang] support with 4572b05 [haiyang] support with f801f54 [haiyang] support with
Diffstat (limited to 'sql/catalyst/src/main')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala7
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala31
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala12
3 files changed, 42 insertions, 8 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index 89f4a19add..ee04cb579d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -111,6 +111,7 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser {
protected val UPPER = Keyword("UPPER")
protected val WHEN = Keyword("WHEN")
protected val WHERE = Keyword("WHERE")
+ protected val WITH = Keyword("WITH")
protected def assignAliases(exprs: Seq[Expression]): Seq[NamedExpression] = {
exprs.zipWithIndex.map {
@@ -127,6 +128,7 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser {
| UNION ~ DISTINCT.? ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) }
)
| insert
+ | cte
)
protected lazy val select: Parser[LogicalPlan] =
@@ -156,6 +158,11 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser {
case o ~ r ~ s => InsertIntoTable(r, Map.empty[String, Option[String]], s, o)
}
+ protected lazy val cte: Parser[LogicalPlan] =
+ WITH ~> rep1sep(ident ~ ( AS ~ "(" ~> start <~ ")"), ",") ~ start ^^ {
+ case r ~ s => With(s, r.map({case n ~ s => (n, Subquery(n, s))}).toMap)
+ }
+
protected lazy val projection: Parser[Expression] =
expression ~ (AS.? ~> ident.?) ^^ {
case e ~ a => a.fold(e)(Alias(e, _)())
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 524c73c31b..b83f18abdd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -169,21 +169,36 @@ class Analyzer(
* Replaces [[UnresolvedRelation]]s with concrete relations from the catalog.
*/
object ResolveRelations extends Rule[LogicalPlan] {
- def getTable(u: UnresolvedRelation): LogicalPlan = {
+ def getTable(u: UnresolvedRelation, cteRelations: Map[String, LogicalPlan]) = {
try {
- catalog.lookupRelation(u.tableIdentifier, u.alias)
+ // In hive, if there is same table name in database and CTE definition,
+ // hive will use the table in database, not the CTE one.
+ // Taking into account the reasonableness and the implementation complexity,
+ // here use the CTE definition first, check table name only and ignore database name
+ cteRelations.get(u.tableIdentifier.last)
+ .map(relation => u.alias.map(Subquery(_, relation)).getOrElse(relation))
+ .getOrElse(catalog.lookupRelation(u.tableIdentifier, u.alias))
} catch {
case _: NoSuchTableException =>
u.failAnalysis(s"no such table ${u.tableName}")
}
}
- def apply(plan: LogicalPlan): LogicalPlan = plan transform {
- case i @ InsertIntoTable(u: UnresolvedRelation, _, _, _) =>
- i.copy(
- table = EliminateSubQueries(getTable(u)))
- case u: UnresolvedRelation =>
- getTable(u)
+ def apply(plan: LogicalPlan): LogicalPlan = {
+ val (realPlan, cteRelations) = plan match {
+ // TODO allow subquery to define CTE
+ // Add cte table to a temp relation map,drop `with` plan and keep its child
+ case With(child, relations) => (child, relations)
+ case other => (other, Map.empty[String, LogicalPlan])
+ }
+
+ realPlan transform {
+ case i@InsertIntoTable(u: UnresolvedRelation, _, _, _) =>
+ i.copy(
+ table = EliminateSubQueries(getTable(u, cteRelations)))
+ case u: UnresolvedRelation =>
+ getTable(u, cteRelations)
+ }
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 8633e06093..3bd5aa5964 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -147,6 +147,18 @@ case class CreateTableAsSelect[T](
override lazy val resolved: Boolean = databaseName != None && childrenResolved
}
+/**
+ * A container for holding named common table expressions (CTEs) and a query plan.
+ * This operator will be removed during analysis and the relations will be substituted into child.
+ * @param child The final query of this CTE.
+ * @param cteRelations Queries that this CTE defined,
+ * key is the alias of the CTE definition,
+ * value is the CTE definition.
+ */
+case class With(child: LogicalPlan, cteRelations: Map[String, Subquery]) extends UnaryNode {
+ override def output = child.output
+}
+
case class WriteToFile(
path: String,
child: LogicalPlan) extends UnaryNode {