From 2f53588738e95a2191f9844818e47f0d2ebbfd54 Mon Sep 17 00:00:00 2001 From: haiyang Date: Sat, 11 Apr 2015 18:30:17 -0700 Subject: [SPARK-6199] [SQL] Support CTE in HiveContext and SQLContext Author: haiyang Closes #4929 from haiyangsea/cte and squashes the following commits: 220b67d [haiyang] add golden files for cte test d3c7681 [haiyang] Merge branch 'master' into cte-repair 0ba2070 [haiyang] modify code style 9ce6b58 [haiyang] fix conflict ff74741 [haiyang] add comment for With plan 0d56af4 [haiyang] code indention 776a440 [haiyang] add comments for resolve relation strategy 2fccd7e [haiyang] add comments for resolve relation strategy 241bbe2 [haiyang] fix cte problem of view e9e1237 [haiyang] fix test case problem 614182f [haiyang] add test cases for CTE feature 32e415b [haiyang] add comment 1cc8c15 [haiyang] support with 03f1097 [haiyang] support with e960099 [haiyang] support with 9aaa874 [haiyang] support with 0566978 [haiyang] support with a99ecd2 [haiyang] support with c3fa4c2 [haiyang] support with 3b6077f [haiyang] support with 5f8abe3 [haiyang] support with 4572b05 [haiyang] support with f801f54 [haiyang] support with --- .../org/apache/spark/sql/catalyst/SqlParser.scala | 7 +++++ .../spark/sql/catalyst/analysis/Analyzer.scala | 31 ++++++++++++++++------ .../catalyst/plans/logical/basicOperators.scala | 12 +++++++++ .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 14 ++++++++++ .../scala/org/apache/spark/sql/hive/HiveQl.scala | 27 ++++++++++++++----- ...E feature #1-0-eedabbfe6ba8799f7b7782fb47a82768 | 3 +++ ...E feature #2-0-aa03d104251f97e36bc52279cb9931c9 | 4 +++ ...E feature #3-0-b5d4bf3c0ee92b2fda0ca24f422383f2 | 1 + .../spark/sql/hive/execution/HiveQuerySuite.scala | 15 +++++++++++ 9 files changed, 100 insertions(+), 14 deletions(-) create mode 100644 sql/hive/src/test/resources/golden/CTE feature #1-0-eedabbfe6ba8799f7b7782fb47a82768 create mode 100644 sql/hive/src/test/resources/golden/CTE feature #2-0-aa03d104251f97e36bc52279cb9931c9 create mode 100644 sql/hive/src/test/resources/golden/CTE feature #3-0-b5d4bf3c0ee92b2fda0ca24f422383f2 (limited to 'sql') diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala index 89f4a19add..ee04cb579d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala @@ -111,6 +111,7 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser { protected val UPPER = Keyword("UPPER") protected val WHEN = Keyword("WHEN") protected val WHERE = Keyword("WHERE") + protected val WITH = Keyword("WITH") protected def assignAliases(exprs: Seq[Expression]): Seq[NamedExpression] = { exprs.zipWithIndex.map { @@ -127,6 +128,7 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser { | UNION ~ DISTINCT.? ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) } ) | insert + | cte ) protected lazy val select: Parser[LogicalPlan] = @@ -156,6 +158,11 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser { case o ~ r ~ s => InsertIntoTable(r, Map.empty[String, Option[String]], s, o) } + protected lazy val cte: Parser[LogicalPlan] = + WITH ~> rep1sep(ident ~ ( AS ~ "(" ~> start <~ ")"), ",") ~ start ^^ { + case r ~ s => With(s, r.map({case n ~ s => (n, Subquery(n, s))}).toMap) + } + protected lazy val projection: Parser[Expression] = expression ~ (AS.? ~> ident.?) ^^ { case e ~ a => a.fold(e)(Alias(e, _)()) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 524c73c31b..b83f18abdd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -169,21 +169,36 @@ class Analyzer( * Replaces [[UnresolvedRelation]]s with concrete relations from the catalog. */ object ResolveRelations extends Rule[LogicalPlan] { - def getTable(u: UnresolvedRelation): LogicalPlan = { + def getTable(u: UnresolvedRelation, cteRelations: Map[String, LogicalPlan]) = { try { - catalog.lookupRelation(u.tableIdentifier, u.alias) + // In hive, if there is same table name in database and CTE definition, + // hive will use the table in database, not the CTE one. + // Taking into account the reasonableness and the implementation complexity, + // here use the CTE definition first, check table name only and ignore database name + cteRelations.get(u.tableIdentifier.last) + .map(relation => u.alias.map(Subquery(_, relation)).getOrElse(relation)) + .getOrElse(catalog.lookupRelation(u.tableIdentifier, u.alias)) } catch { case _: NoSuchTableException => u.failAnalysis(s"no such table ${u.tableName}") } } - def apply(plan: LogicalPlan): LogicalPlan = plan transform { - case i @ InsertIntoTable(u: UnresolvedRelation, _, _, _) => - i.copy( - table = EliminateSubQueries(getTable(u))) - case u: UnresolvedRelation => - getTable(u) + def apply(plan: LogicalPlan): LogicalPlan = { + val (realPlan, cteRelations) = plan match { + // TODO allow subquery to define CTE + // Add cte table to a temp relation map,drop `with` plan and keep its child + case With(child, relations) => (child, relations) + case other => (other, Map.empty[String, LogicalPlan]) + } + + realPlan transform { + case i@InsertIntoTable(u: UnresolvedRelation, _, _, _) => + i.copy( + table = EliminateSubQueries(getTable(u, cteRelations))) + case u: UnresolvedRelation => + getTable(u, cteRelations) + } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index 8633e06093..3bd5aa5964 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -147,6 +147,18 @@ case class CreateTableAsSelect[T]( override lazy val resolved: Boolean = databaseName != None && childrenResolved } +/** + * A container for holding named common table expressions (CTEs) and a query plan. + * This operator will be removed during analysis and the relations will be substituted into child. + * @param child The final query of this CTE. + * @param cteRelations Queries that this CTE defined, + * key is the alias of the CTE definition, + * value is the CTE definition. + */ +case class With(child: LogicalPlan, cteRelations: Map[String, Subquery]) extends UnaryNode { + override def output = child.output +} + case class WriteToFile( path: String, child: LogicalPlan) extends UnaryNode { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 1392b48191..fb8fc6dbd1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -407,6 +407,20 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll { mapData.collect().take(1).map(Row.fromTuple).toSeq) } + test("CTE feature") { + checkAnswer( + sql("with q1 as (select * from testData limit 10) select * from q1"), + testData.take(10).toSeq) + + checkAnswer( + sql(""" + |with q1 as (select * from testData where key= '5'), + |q2 as (select * from testData where key = '4') + |select * from q1 union all select * from q2""".stripMargin), + Row(5, "5") :: Row(4, "4") :: Nil) + + } + test("date row") { checkAnswer(sql( """select cast("2015-01-28" as date) from testData limit 1"""), diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index 0bdaf5f7ef..2fb2e7c4a5 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -576,11 +576,23 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C case Token("TOK_QUERY", queryArgs) if Seq("TOK_FROM", "TOK_INSERT").contains(queryArgs.head.getText) => - val (fromClause: Option[ASTNode], insertClauses) = queryArgs match { - case Token("TOK_FROM", args: Seq[ASTNode]) :: insertClauses => - (Some(args.head), insertClauses) - case Token("TOK_INSERT", _) :: Nil => (None, queryArgs) - } + val (fromClause: Option[ASTNode], insertClauses, cteRelations) = + queryArgs match { + case Token("TOK_FROM", args: Seq[ASTNode]) :: insertClauses => + // check if has CTE + insertClauses.last match { + case Token("TOK_CTE", cteClauses) => + val cteRelations = cteClauses.map(node => { + val relation = nodeToRelation(node).asInstanceOf[Subquery] + (relation.alias, relation) + }).toMap + (Some(args.head), insertClauses.init, Some(cteRelations)) + + case _ => (Some(args.head), insertClauses, None) + } + + case Token("TOK_INSERT", _) :: Nil => (None, queryArgs, None) + } // Return one query for each insert clause. val queries = insertClauses.map { case Token("TOK_INSERT", singleInsert) => @@ -794,7 +806,10 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C } // If there are multiple INSERTS just UNION them together into on query. - queries.reduceLeft(Union) + val query = queries.reduceLeft(Union) + + // return With plan if there is CTE + cteRelations.map(With(query, _)).getOrElse(query) case Token("TOK_UNION", left :: right :: Nil) => Union(nodeToPlan(left), nodeToPlan(right)) diff --git a/sql/hive/src/test/resources/golden/CTE feature #1-0-eedabbfe6ba8799f7b7782fb47a82768 b/sql/hive/src/test/resources/golden/CTE feature #1-0-eedabbfe6ba8799f7b7782fb47a82768 new file mode 100644 index 0000000000..f6ba75da25 --- /dev/null +++ b/sql/hive/src/test/resources/golden/CTE feature #1-0-eedabbfe6ba8799f7b7782fb47a82768 @@ -0,0 +1,3 @@ +5 +5 +5 diff --git a/sql/hive/src/test/resources/golden/CTE feature #2-0-aa03d104251f97e36bc52279cb9931c9 b/sql/hive/src/test/resources/golden/CTE feature #2-0-aa03d104251f97e36bc52279cb9931c9 new file mode 100644 index 0000000000..ca7b591095 --- /dev/null +++ b/sql/hive/src/test/resources/golden/CTE feature #2-0-aa03d104251f97e36bc52279cb9931c9 @@ -0,0 +1,4 @@ +val_4 +val_5 +val_5 +val_5 diff --git a/sql/hive/src/test/resources/golden/CTE feature #3-0-b5d4bf3c0ee92b2fda0ca24f422383f2 b/sql/hive/src/test/resources/golden/CTE feature #3-0-b5d4bf3c0ee92b2fda0ca24f422383f2 new file mode 100644 index 0000000000..b8626c4cff --- /dev/null +++ b/sql/hive/src/test/resources/golden/CTE feature #3-0-b5d4bf3c0ee92b2fda0ca24f422383f2 @@ -0,0 +1 @@ +4 diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index af781a502e..1222fbabd8 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -542,6 +542,21 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { createQueryTest("select null from table", "SELECT null FROM src LIMIT 1") + createQueryTest("CTE feature #1", + "with q1 as (select key from src) select * from q1 where key = 5") + + createQueryTest("CTE feature #2", + """with q1 as (select * from src where key= 5), + |q2 as (select * from src s2 where key = 4) + |select value from q1 union all select value from q2 + """.stripMargin) + + createQueryTest("CTE feature #3", + """with q1 as (select key from src) + |from q1 + |select * where key = 4 + """.stripMargin) + test("predicates contains an empty AttributeSet() references") { sql( """ -- cgit v1.2.3