aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorhaiyang <huhaiyang@huawei.com>2015-04-11 18:30:17 -0700
committerMichael Armbrust <michael@databricks.com>2015-04-11 18:30:17 -0700
commit2f53588738e95a2191f9844818e47f0d2ebbfd54 (patch)
tree2cdb58ec4161ee4d5d2e191ea10d8653c6878a8d /sql
parent7dbd37160ff57f80cc7abdcaef95f8c6df20a0f0 (diff)
downloadspark-2f53588738e95a2191f9844818e47f0d2ebbfd54.tar.gz
spark-2f53588738e95a2191f9844818e47f0d2ebbfd54.tar.bz2
spark-2f53588738e95a2191f9844818e47f0d2ebbfd54.zip
[SPARK-6199] [SQL] Support CTE in HiveContext and SQLContext
Author: haiyang <huhaiyang@huawei.com> Closes #4929 from haiyangsea/cte and squashes the following commits: 220b67d [haiyang] add golden files for cte test d3c7681 [haiyang] Merge branch 'master' into cte-repair 0ba2070 [haiyang] modify code style 9ce6b58 [haiyang] fix conflict ff74741 [haiyang] add comment for With plan 0d56af4 [haiyang] code indention 776a440 [haiyang] add comments for resolve relation strategy 2fccd7e [haiyang] add comments for resolve relation strategy 241bbe2 [haiyang] fix cte problem of view e9e1237 [haiyang] fix test case problem 614182f [haiyang] add test cases for CTE feature 32e415b [haiyang] add comment 1cc8c15 [haiyang] support with 03f1097 [haiyang] support with e960099 [haiyang] support with 9aaa874 [haiyang] support with 0566978 [haiyang] support with a99ecd2 [haiyang] support with c3fa4c2 [haiyang] support with 3b6077f [haiyang] support with 5f8abe3 [haiyang] support with 4572b05 [haiyang] support with f801f54 [haiyang] support with
Diffstat (limited to 'sql')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala7
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala31
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala12
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala14
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala27
-rw-r--r--sql/hive/src/test/resources/golden/CTE feature #1-0-eedabbfe6ba8799f7b7782fb47a827683
-rw-r--r--sql/hive/src/test/resources/golden/CTE feature #2-0-aa03d104251f97e36bc52279cb9931c94
-rw-r--r--sql/hive/src/test/resources/golden/CTE feature #3-0-b5d4bf3c0ee92b2fda0ca24f422383f21
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala15
9 files changed, 100 insertions, 14 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index 89f4a19add..ee04cb579d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -111,6 +111,7 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser {
protected val UPPER = Keyword("UPPER")
protected val WHEN = Keyword("WHEN")
protected val WHERE = Keyword("WHERE")
+ protected val WITH = Keyword("WITH")
protected def assignAliases(exprs: Seq[Expression]): Seq[NamedExpression] = {
exprs.zipWithIndex.map {
@@ -127,6 +128,7 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser {
| UNION ~ DISTINCT.? ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) }
)
| insert
+ | cte
)
protected lazy val select: Parser[LogicalPlan] =
@@ -156,6 +158,11 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser {
case o ~ r ~ s => InsertIntoTable(r, Map.empty[String, Option[String]], s, o)
}
+ protected lazy val cte: Parser[LogicalPlan] =
+ WITH ~> rep1sep(ident ~ ( AS ~ "(" ~> start <~ ")"), ",") ~ start ^^ {
+ case r ~ s => With(s, r.map({case n ~ s => (n, Subquery(n, s))}).toMap)
+ }
+
protected lazy val projection: Parser[Expression] =
expression ~ (AS.? ~> ident.?) ^^ {
case e ~ a => a.fold(e)(Alias(e, _)())
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 524c73c31b..b83f18abdd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -169,21 +169,36 @@ class Analyzer(
* Replaces [[UnresolvedRelation]]s with concrete relations from the catalog.
*/
object ResolveRelations extends Rule[LogicalPlan] {
- def getTable(u: UnresolvedRelation): LogicalPlan = {
+ def getTable(u: UnresolvedRelation, cteRelations: Map[String, LogicalPlan]) = {
try {
- catalog.lookupRelation(u.tableIdentifier, u.alias)
+ // In hive, if there is same table name in database and CTE definition,
+ // hive will use the table in database, not the CTE one.
+ // Taking into account the reasonableness and the implementation complexity,
+ // here use the CTE definition first, check table name only and ignore database name
+ cteRelations.get(u.tableIdentifier.last)
+ .map(relation => u.alias.map(Subquery(_, relation)).getOrElse(relation))
+ .getOrElse(catalog.lookupRelation(u.tableIdentifier, u.alias))
} catch {
case _: NoSuchTableException =>
u.failAnalysis(s"no such table ${u.tableName}")
}
}
- def apply(plan: LogicalPlan): LogicalPlan = plan transform {
- case i @ InsertIntoTable(u: UnresolvedRelation, _, _, _) =>
- i.copy(
- table = EliminateSubQueries(getTable(u)))
- case u: UnresolvedRelation =>
- getTable(u)
+ def apply(plan: LogicalPlan): LogicalPlan = {
+ val (realPlan, cteRelations) = plan match {
+ // TODO allow subquery to define CTE
+ // Add cte table to a temp relation map,drop `with` plan and keep its child
+ case With(child, relations) => (child, relations)
+ case other => (other, Map.empty[String, LogicalPlan])
+ }
+
+ realPlan transform {
+ case i@InsertIntoTable(u: UnresolvedRelation, _, _, _) =>
+ i.copy(
+ table = EliminateSubQueries(getTable(u, cteRelations)))
+ case u: UnresolvedRelation =>
+ getTable(u, cteRelations)
+ }
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 8633e06093..3bd5aa5964 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -147,6 +147,18 @@ case class CreateTableAsSelect[T](
override lazy val resolved: Boolean = databaseName != None && childrenResolved
}
+/**
+ * A container for holding named common table expressions (CTEs) and a query plan.
+ * This operator will be removed during analysis and the relations will be substituted into child.
+ * @param child The final query of this CTE.
+ * @param cteRelations Queries that this CTE defined,
+ * key is the alias of the CTE definition,
+ * value is the CTE definition.
+ */
+case class With(child: LogicalPlan, cteRelations: Map[String, Subquery]) extends UnaryNode {
+ override def output = child.output
+}
+
case class WriteToFile(
path: String,
child: LogicalPlan) extends UnaryNode {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 1392b48191..fb8fc6dbd1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -407,6 +407,20 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
mapData.collect().take(1).map(Row.fromTuple).toSeq)
}
+ test("CTE feature") {
+ checkAnswer(
+ sql("with q1 as (select * from testData limit 10) select * from q1"),
+ testData.take(10).toSeq)
+
+ checkAnswer(
+ sql("""
+ |with q1 as (select * from testData where key= '5'),
+ |q2 as (select * from testData where key = '4')
+ |select * from q1 union all select * from q2""".stripMargin),
+ Row(5, "5") :: Row(4, "4") :: Nil)
+
+ }
+
test("date row") {
checkAnswer(sql(
"""select cast("2015-01-28" as date) from testData limit 1"""),
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 0bdaf5f7ef..2fb2e7c4a5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -576,11 +576,23 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
case Token("TOK_QUERY", queryArgs)
if Seq("TOK_FROM", "TOK_INSERT").contains(queryArgs.head.getText) =>
- val (fromClause: Option[ASTNode], insertClauses) = queryArgs match {
- case Token("TOK_FROM", args: Seq[ASTNode]) :: insertClauses =>
- (Some(args.head), insertClauses)
- case Token("TOK_INSERT", _) :: Nil => (None, queryArgs)
- }
+ val (fromClause: Option[ASTNode], insertClauses, cteRelations) =
+ queryArgs match {
+ case Token("TOK_FROM", args: Seq[ASTNode]) :: insertClauses =>
+ // check if has CTE
+ insertClauses.last match {
+ case Token("TOK_CTE", cteClauses) =>
+ val cteRelations = cteClauses.map(node => {
+ val relation = nodeToRelation(node).asInstanceOf[Subquery]
+ (relation.alias, relation)
+ }).toMap
+ (Some(args.head), insertClauses.init, Some(cteRelations))
+
+ case _ => (Some(args.head), insertClauses, None)
+ }
+
+ case Token("TOK_INSERT", _) :: Nil => (None, queryArgs, None)
+ }
// Return one query for each insert clause.
val queries = insertClauses.map { case Token("TOK_INSERT", singleInsert) =>
@@ -794,7 +806,10 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
}
// If there are multiple INSERTS just UNION them together into on query.
- queries.reduceLeft(Union)
+ val query = queries.reduceLeft(Union)
+
+ // return With plan if there is CTE
+ cteRelations.map(With(query, _)).getOrElse(query)
case Token("TOK_UNION", left :: right :: Nil) => Union(nodeToPlan(left), nodeToPlan(right))
diff --git a/sql/hive/src/test/resources/golden/CTE feature #1-0-eedabbfe6ba8799f7b7782fb47a82768 b/sql/hive/src/test/resources/golden/CTE feature #1-0-eedabbfe6ba8799f7b7782fb47a82768
new file mode 100644
index 0000000000..f6ba75da25
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/CTE feature #1-0-eedabbfe6ba8799f7b7782fb47a82768
@@ -0,0 +1,3 @@
+5
+5
+5
diff --git a/sql/hive/src/test/resources/golden/CTE feature #2-0-aa03d104251f97e36bc52279cb9931c9 b/sql/hive/src/test/resources/golden/CTE feature #2-0-aa03d104251f97e36bc52279cb9931c9
new file mode 100644
index 0000000000..ca7b591095
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/CTE feature #2-0-aa03d104251f97e36bc52279cb9931c9
@@ -0,0 +1,4 @@
+val_4
+val_5
+val_5
+val_5
diff --git a/sql/hive/src/test/resources/golden/CTE feature #3-0-b5d4bf3c0ee92b2fda0ca24f422383f2 b/sql/hive/src/test/resources/golden/CTE feature #3-0-b5d4bf3c0ee92b2fda0ca24f422383f2
new file mode 100644
index 0000000000..b8626c4cff
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/CTE feature #3-0-b5d4bf3c0ee92b2fda0ca24f422383f2
@@ -0,0 +1 @@
+4
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index af781a502e..1222fbabd8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -542,6 +542,21 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
createQueryTest("select null from table",
"SELECT null FROM src LIMIT 1")
+ createQueryTest("CTE feature #1",
+ "with q1 as (select key from src) select * from q1 where key = 5")
+
+ createQueryTest("CTE feature #2",
+ """with q1 as (select * from src where key= 5),
+ |q2 as (select * from src s2 where key = 4)
+ |select value from q1 union all select value from q2
+ """.stripMargin)
+
+ createQueryTest("CTE feature #3",
+ """with q1 as (select key from src)
+ |from q1
+ |select * where key = 4
+ """.stripMargin)
+
test("predicates contains an empty AttributeSet() references") {
sql(
"""