author    Wenchen Fan <wenchen@databricks.com>  2016-03-11 13:22:34 +0800
committer Wenchen Fan <wenchen@databricks.com>  2016-03-11 13:22:34 +0800
commit    6871cc8f3eb54ad674dea89645599c92a8b94415 (patch)
tree      d0d965b0170145b68270c03d4933ac6471a2cc95 /sql/hive
parent    560489f4e16ff18b5e66e7de1bb84d890369a462 (diff)
[SPARK-12718][SPARK-13720][SQL] SQL generation support for window functions
## What changes were proposed in this pull request?

Add SQL generation support for window functions. The idea is simple: treat the `Window` operator like `Project`, i.e. add a subquery to its child when necessary, generate a `SELECT ... FROM ...` SQL string, and implement the `sql` method for window-related expressions, e.g. `WindowSpecDefinition`, `WindowFrame`, etc.

This PR also fixes SPARK-13720 by improving how extra `SubqueryAlias`es are added (the `RecoverScopingInfo` rule). Before this PR, we updated the qualifiers in the project list while adding the subquery. However, this is incomplete: we also need to update the qualifiers in all ancestors that refer to attributes from here. In this PR, we split `RecoverScopingInfo` into two rules: `AddSubquery` and `UpdateQualifiers`. `AddSubquery` only adds a subquery when necessary, and `UpdateQualifiers` re-propagates and updates the qualifiers bottom up. Ideally the bug fix would go in a separate PR, but the bug also blocks the window work, so both are included here. Many thanks to gatorsmile for the initial discussion and test cases!

## How was this patch tested?

New tests in `LogicalPlanToSQLSuite`.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #11555 from cloud-fan/window.
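To make the approach concrete, here is a minimal usage sketch. The `SQLBuilder(logicalPlan, sqlContext)` constructor matches the class signature in the diff below; the `toSQL` entry point and the helper name are assumptions for illustration, not verbatim API from this patch.

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.SQLBuilder

// A minimal sketch, assuming a toSQL method that returns the regenerated SQL
// string; only the constructor is visible in this diff.
def regenerate(sqlContext: SQLContext, query: String): String = {
  val df = sqlContext.sql(query)
  // With this patch, a Window operator in the analyzed plan is printed as
  // "SELECT <child output>, <window expressions> FROM <child>".
  new SQLBuilder(df.queryExecution.analyzed, sqlContext).toSQL
}

// Example of a query this PR can now convert back to SQL:
//   regenerate(ctx, "SELECT MAX(value) OVER (PARTITION BY key % 3) FROM parquet_t1")
```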
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala                    104
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala         114
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala    1
3 files changed, 189 insertions, 30 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
index 683f738054..bf12982da7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
@@ -42,7 +42,7 @@ case class SubqueryHolder(query: String) extends LeafExpression with Unevaluable
}
/**
- * A builder class used to convert a resolved logical plan into a SQL query string. Note that this
+ * A builder class used to convert a resolved logical plan into a SQL query string. Note that not
 * all resolved logical plans are convertible. They either don't have corresponding SQL
* representations (e.g. logical plans that operate on local Scala collections), or are simply not
* supported by this builder (yet).
@@ -103,6 +103,9 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
case p: Aggregate =>
aggregateToSQL(p)
+ case w: Window =>
+ windowToSQL(w)
+
case Limit(limitExpr, child) =>
s"${toSQL(child)} LIMIT ${limitExpr.sql}"
@@ -123,12 +126,12 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
build(toSQL(p.child), "TABLESAMPLE(" + fraction + " PERCENT)")
}
- case p: Filter =>
- val whereOrHaving = p.child match {
+ case Filter(condition, child) =>
+ val whereOrHaving = child match {
case _: Aggregate => "HAVING"
case _ => "WHERE"
}
- build(toSQL(p.child), whereOrHaving, p.condition.sql)
+ build(toSQL(child), whereOrHaving, condition.sql)
case p @ Distinct(u: Union) if u.children.length > 1 =>
val childrenSql = u.children.map(c => s"(${toSQL(c)})")
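For context on the `Filter` rewrite above: a `Filter` sitting directly on an `Aggregate` originated from a HAVING clause, anything else from WHERE. A self-contained toy rendering of that dispatch (plain strings stand in for the Catalyst types):

```scala
// Toy stand-ins for the Catalyst operators matched above.
sealed trait Op
case object Agg  extends Op
case object Scan extends Op

def filterToSQL(childSql: String, child: Op, conditionSql: String): String = {
  // A Filter directly above an Aggregate came from a HAVING clause.
  val whereOrHaving = child match {
    case Agg => "HAVING"
    case _   => "WHERE"
  }
  s"$childSql $whereOrHaving $conditionSql"
}

// filterToSQL("SELECT key FROM src GROUP BY key", Agg, "max(value) > 0")
//   => "SELECT key FROM src GROUP BY key HAVING max(value) > 0"
// filterToSQL("SELECT key FROM src", Scan, "value > 0")
//   => "SELECT key FROM src WHERE value > 0"
```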
@@ -179,7 +182,7 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
build(
toSQL(p.child),
if (p.global) "ORDER BY" else "SORT BY",
- p.order.map { case SortOrder(e, dir) => s"${e.sql} ${dir.sql}" }.mkString(", ")
+ p.order.map(_.sql).mkString(", ")
)
case p: RepartitionByExpression =>
@@ -268,9 +271,8 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
case attr: Attribute if groupByAttrMap.contains(attr) => groupByAttrMap(attr)
}
}
- val groupingSetSQL =
- "GROUPING SETS(" +
- groupingSet.map(e => s"(${e.map(_.sql).mkString(", ")})").mkString(", ") + ")"
+ val groupingSetSQL = "GROUPING SETS(" +
+ groupingSet.map(e => s"(${e.map(_.sql).mkString(", ")})").mkString(", ") + ")"
val aggExprs = agg.aggregateExpressions.map { case expr =>
expr.transformDown {
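A quick check of the string the reindented `groupingSetSQL` expression builds, with plain strings standing in for each expression's `sql` output:

```scala
// Plain strings stand in for Expression.sql; the shape matches the code above.
val groupingSet: Seq[Seq[String]] = Seq(Seq("a"), Seq("a", "b"), Seq.empty)
val groupingSetSQL = "GROUPING SETS(" +
  groupingSet.map(e => s"(${e.mkString(", ")})").mkString(", ") + ")"
// groupingSetSQL == "GROUPING SETS((a), (a, b), ())"
```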
@@ -297,22 +299,50 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
)
}
+ private def windowToSQL(w: Window): String = {
+ build(
+ "SELECT",
+ (w.child.output ++ w.windowExpressions).map(_.sql).mkString(", "),
+ if (w.child == OneRowRelation) "" else "FROM",
+ toSQL(w.child)
+ )
+ }
+
object Canonicalizer extends RuleExecutor[LogicalPlan] {
override protected def batches: Seq[Batch] = Seq(
- Batch("Canonicalizer", FixedPoint(100),
+ Batch("Collapse Project", FixedPoint(100),
// The `WidenSetOperationTypes` analysis rule may introduce extra `Project`s over
// `Aggregate`s to perform type casting. This rule merges these `Project`s into
// `Aggregate`s.
- CollapseProject,
-
+ CollapseProject),
+ Batch("Recover Scoping Info", Once,
// Used to handle other auxiliary `Project`s added by analyzer (e.g.
// `ResolveAggregateFunctions` rule)
- RecoverScopingInfo
+ AddSubquery,
+ // The previous rule may add extra subqueries; this rule then re-propagates and updates
+ // the qualifiers bottom up. For example:
+ //
+ // Sort
+ // ordering = t1.a
+ // Project
+ // projectList = [t1.a, t1.b]
+ // Subquery gen_subquery
+ // child ...
+ //
+ // will be transformed to:
+ //
+ // Sort
+ // ordering = gen_subquery.a
+ // Project
+ // projectList = [gen_subquery.a, gen_subquery.b]
+ // Subquery gen_subquery
+ // child ...
+ UpdateQualifiers
)
)
- object RecoverScopingInfo extends Rule[LogicalPlan] {
- override def apply(tree: LogicalPlan): LogicalPlan = tree transform {
+ object AddSubquery extends Rule[LogicalPlan] {
+ override def apply(tree: LogicalPlan): LogicalPlan = tree transformUp {
// This branch handles aggregate functions within HAVING clauses. For example:
//
// SELECT key FROM src GROUP BY key HAVING max(value) > "val_255"
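To see what the new `windowToSQL` emits: judging from its call sites in this file, `build` trims its segments and joins the non-empty ones with spaces. Under that assumption, a `Window` whose child outputs `[a, b]` and which defines one window expression prints roughly as follows (alias and backtick style are illustrative):

```scala
// Assumption: build(...) joins non-empty, trimmed segments with single spaces,
// consistent with its call sites in this diff.
def build(segments: String*): String =
  segments.map(_.trim).filter(_.nonEmpty).mkString(" ")

val childOutput = Seq("`a`", "`b`")
val windowExprs = Seq("max(`b`) OVER (PARTITION BY `a`) AS `m`")

val sql = build(
  "SELECT",
  (childOutput ++ windowExprs).mkString(", "),
  "FROM",                                        // "" when the child is OneRowRelation
  "(SELECT `a`, `b` FROM t) AS gen_subquery_0")
// => SELECT `a`, `b`, max(`b`) OVER (PARTITION BY `a`) AS `m`
//    FROM (SELECT `a`, `b` FROM t) AS gen_subquery_0
```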
@@ -324,8 +354,9 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
// +- Filter ...
// +- Aggregate ...
// +- MetastoreRelation default, src, None
- case plan @ Project(_, Filter(_, _: Aggregate)) =>
- wrapChildWithSubquery(plan)
+ case plan @ Project(_, Filter(_, _: Aggregate)) => wrapChildWithSubquery(plan)
+
+ case w @ Window(_, _, _, _, Filter(_, _: Aggregate)) => wrapChildWithSubquery(w)
case plan @ Project(_,
_: SubqueryAlias
@@ -338,20 +369,39 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
| _: Sample
) => plan
- case plan: Project =>
- wrapChildWithSubquery(plan)
+ case plan: Project => wrapChildWithSubquery(plan)
+
+ // We will generate "SELECT ... FROM ..." for the Window operator, so its child operator
+ // must be one we can put in the FROM clause; otherwise we wrap it in a subquery.
+ case w @ Window(_, _, _, _,
+ _: SubqueryAlias
+ | _: Filter
+ | _: Join
+ | _: MetastoreRelation
+ | OneRowRelation
+ | _: LocalLimit
+ | _: GlobalLimit
+ | _: Sample
+ ) => w
+
+ case w: Window => wrapChildWithSubquery(w)
}
- def wrapChildWithSubquery(project: Project): Project = project match {
- case Project(projectList, child) =>
- val alias = SQLBuilder.newSubqueryName
- val childAttributes = child.outputSet
- val aliasedProjectList = projectList.map(_.transform {
- case a: Attribute if childAttributes.contains(a) =>
- a.withQualifiers(alias :: Nil)
- }.asInstanceOf[NamedExpression])
+ private def wrapChildWithSubquery(plan: UnaryNode): LogicalPlan = {
+ val newChild = SubqueryAlias(SQLBuilder.newSubqueryName, plan.child)
+ plan.withNewChildren(Seq(newChild))
+ }
+ }
- Project(aliasedProjectList, SubqueryAlias(alias, child))
+ object UpdateQualifiers extends Rule[LogicalPlan] {
+ override def apply(tree: LogicalPlan): LogicalPlan = tree transformUp {
+ case plan =>
+ val inputAttributes = plan.children.flatMap(_.output)
+ plan transformExpressions {
+ case a: AttributeReference if !plan.producedAttributes.contains(a) =>
+ val qualifier = inputAttributes.find(_ semanticEquals a).map(_.qualifiers)
+ a.withQualifiers(qualifier.getOrElse(Nil))
+ }
}
}
}
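The `UpdateQualifiers` rule above carries the SPARK-13720 fix: once `AddSubquery` has inserted an alias, every ancestor's attribute references must adopt that alias, bottom up. A self-contained toy model of the re-qualification step (not Catalyst; all names are illustrative):

```scala
// Toy attribute: compared by name only, the way the rule above matches
// attributes semantically against its children's output.
case class Attr(name: String, qualifiers: Seq[String] = Nil) {
  def sql: String = (qualifiers :+ name).mkString(".")
}

// Re-qualify one operator's references against its children's output,
// mirroring the transformUp + transformExpressions pattern above.
def updateQualifiers(references: Seq[Attr], inputAttributes: Seq[Attr]): Seq[Attr] =
  references.map { a =>
    val qualifiers = inputAttributes.find(_.name == a.name).map(_.qualifiers)
    a.copy(qualifiers = qualifiers.getOrElse(Nil))
  }

// The Sort/Project/Subquery example from the comment above:
val subqueryOutput = Seq(Attr("a", Seq("gen_subquery")), Attr("b", Seq("gen_subquery")))
val sortOrdering   = Seq(Attr("a", Seq("t1")))
updateQualifiers(sortOrdering, subqueryOutput).map(_.sql)
// => List("gen_subquery.a")
```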
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
index ccce9871e2..198652b355 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
@@ -67,7 +67,7 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
|
|# Resolved query plan:
|${df.queryExecution.analyzed.treeString}
- """.stripMargin)
+ """.stripMargin, e)
}
try {
@@ -84,8 +84,7 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
|
|# Resolved query plan:
|${df.queryExecution.analyzed.treeString}
- """.stripMargin,
- cause)
+ """.stripMargin, cause)
}
}
@@ -331,6 +330,10 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
checkHiveQl("SELECT id FROM parquet_t0 DISTRIBUTE BY id SORT BY id")
}
+ test("SPARK-13720: sort by after having") {
+ checkHiveQl("SELECT COUNT(value) FROM parquet_t1 GROUP BY key HAVING MAX(key) > 0 SORT BY key")
+ }
+
test("distinct aggregation") {
checkHiveQl("SELECT COUNT(DISTINCT id) FROM parquet_t0")
}
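For reference, `checkHiveQl` (judging from the failure messages patched in the first two hunks of this file) runs the original query, regenerates SQL from its analyzed plan, and checks that the regenerated SQL executes to the same rows. A simplified sketch of that shape, not the verbatim helper:

```scala
// Simplified sketch; assumes SQLBuilder#toSQL returns the generated string or
// throws when the plan is not convertible.
def checkHiveQl(hiveQl: String): Unit = {
  val df = sqlContext.sql(hiveQl)
  val generatedSQL = new SQLBuilder(df.queryExecution.analyzed, sqlContext).toSQL
  // The regenerated SQL must parse, execute, and return the same answer.
  checkAnswer(sqlContext.sql(generatedSQL), df.collect())
}
```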
@@ -442,4 +445,109 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
"f1", "b[0].f1", "f1", "c[foo]", "d[0]"
)
}
+
+ test("window basic") {
+ checkHiveQl("SELECT MAX(value) OVER (PARTITION BY key % 3) FROM parquet_t1")
+ checkHiveQl(
+ """
+ |SELECT key, value, ROUND(AVG(key) OVER (), 2)
+ |FROM parquet_t1 ORDER BY key
+ """.stripMargin)
+ checkHiveQl(
+ """
+ |SELECT value, MAX(key + 1) OVER (PARTITION BY key % 5 ORDER BY key % 7) AS max
+ |FROM parquet_t1
+ """.stripMargin)
+ }
+
+ test("multiple window functions in one expression") {
+ checkHiveQl(
+ """
+ |SELECT
+ | MAX(key) OVER (ORDER BY key DESC, value) / MIN(key) OVER (PARTITION BY key % 3)
+ |FROM parquet_t1
+ """.stripMargin)
+ }
+
+ test("regular expressions and window functions in one expression") {
+ checkHiveQl("SELECT MAX(key) OVER (PARTITION BY key % 3) + key FROM parquet_t1")
+ }
+
+ test("aggregate functions and window functions in one expression") {
+ checkHiveQl("SELECT MAX(c) + COUNT(a) OVER () FROM parquet_t2 GROUP BY a, b")
+ }
+
+ test("window with different window specification") {
+ checkHiveQl(
+ """
+ |SELECT key, value,
+ |DENSE_RANK() OVER (ORDER BY key, value) AS dr,
+ |MAX(value) OVER (PARTITION BY key ORDER BY key ASC) AS max
+ |FROM parquet_t1
+ """.stripMargin)
+ }
+
+ test("window with the same window specification with aggregate + having") {
+ checkHiveQl(
+ """
+ |SELECT key, value,
+ |MAX(value) OVER (PARTITION BY key % 5 ORDER BY key DESC) AS max
+ |FROM parquet_t1 GROUP BY key, value HAVING key > 5
+ """.stripMargin)
+ }
+
+ test("window with the same window specification with aggregate functions") {
+ checkHiveQl(
+ """
+ |SELECT key, value,
+ |MAX(value) OVER (PARTITION BY key % 5 ORDER BY key) AS max
+ |FROM parquet_t1 GROUP BY key, value
+ """.stripMargin)
+ }
+
+ test("window with the same window specification with aggregate") {
+ checkHiveQl(
+ """
+ |SELECT key, value,
+ |DENSE_RANK() OVER (DISTRIBUTE BY key SORT BY key, value) AS dr,
+ |COUNT(key)
+ |FROM parquet_t1 GROUP BY key, value
+ """.stripMargin)
+ }
+
+ test("window with the same window specification without aggregate and filter") {
+ checkHiveQl(
+ """
+ |SELECT key, value,
+ |DENSE_RANK() OVER (DISTRIBUTE BY key SORT BY key, value) AS dr,
+ |COUNT(key) OVER(DISTRIBUTE BY key SORT BY key, value) AS ca
+ |FROM parquet_t1
+ """.stripMargin)
+ }
+
+ test("window clause") {
+ checkHiveQl(
+ """
+ |SELECT key, MAX(value) OVER w1 AS MAX, MIN(value) OVER w2 AS min
+ |FROM parquet_t1
+ |WINDOW w1 AS (PARTITION BY key % 5 ORDER BY key), w2 AS (PARTITION BY key % 6)
+ """.stripMargin)
+ }
+
+ test("special window functions") {
+ checkHiveQl(
+ """
+ |SELECT
+ | RANK() OVER w,
+ | PERCENT_RANK() OVER w,
+ | DENSE_RANK() OVER w,
+ | ROW_NUMBER() OVER w,
+ | NTILE(10) OVER w,
+ | CUME_DIST() OVER w,
+ | LAG(key, 2) OVER w,
+ | LEAD(key, 2) OVER w
+ |FROM parquet_t1
+ |WINDOW w AS (PARTITION BY key % 5 ORDER BY key)
+ """.stripMargin)
+ }
}
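One note on the WINDOW-clause tests above: the analyzer inlines named window definitions into each OVER clause before this builder runs, so the regenerated SQL is expected to repeat the expanded specification rather than emit a WINDOW clause; the round trip only has to preserve semantics, not the original surface syntax. A hedged illustration of the `w1` case:

```scala
// After analysis, a named spec is already inlined, so SQL generation sees only
// expanded OVER clauses. Round-trip equivalence is checked on answers, e.g.:
val original = sqlContext.sql(
  """
    |SELECT key, MAX(value) OVER w1 AS max
    |FROM parquet_t1
    |WINDOW w1 AS (PARTITION BY key % 5 ORDER BY key)
  """.stripMargin)
// Plausible regenerated shape (illustrative, not verbatim output):
//   SELECT `key`, max(`value`) OVER (PARTITION BY `key` % 5 ORDER BY `key`) AS `max`
//   FROM parquet_t1
```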
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index afb816211e..1053246fc2 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -449,6 +449,7 @@ abstract class HiveComparisonTest
|Failed to execute query using catalyst:
|Error: ${e.getMessage}
|${stackTraceToString(e)}
+ |$queryString
|$query
|== HIVE - ${hive.size} row(s) ==
|${hive.mkString("\n")}