author     Wenchen Fan <wenchen@databricks.com>    2016-03-11 13:22:34 +0800
committer  Wenchen Fan <wenchen@databricks.com>    2016-03-11 13:22:34 +0800
commit     6871cc8f3eb54ad674dea89645599c92a8b94415 (patch)
tree       d0d965b0170145b68270c03d4933ac6471a2cc95 /sql/catalyst
parent     560489f4e16ff18b5e66e7de1bb84d890369a462 (diff)
download   spark-6871cc8f3eb54ad674dea89645599c92a8b94415.tar.gz
           spark-6871cc8f3eb54ad674dea89645599c92a8b94415.tar.bz2
           spark-6871cc8f3eb54ad674dea89645599c92a8b94415.zip
[SPARK-12718][SPARK-13720][SQL] SQL generation support for window functions
## What changes were proposed in this pull request?

Add SQL generation support for window functions. The idea is simple: treat the `Window` operator like `Project`, i.e. add a subquery to its child when necessary, generate a `SELECT ... FROM ...` SQL string, and implement the `sql` method for window-related expressions, e.g. `WindowSpecDefinition`, `WindowFrame`, etc.

This PR also fixes SPARK-13720 by improving the process of adding extra `SubqueryAlias` (the `RecoverScopingInfo` rule). Before this PR, we updated the qualifiers in the project list while adding the subquery. However, this is incomplete, as we also need to update the qualifiers in all ancestors that refer to attributes here. In this PR, we split `RecoverScopingInfo` into two rules: `AddSubQuery` and `UpdateQualifier`. `AddSubQuery` only adds a subquery when necessary, and `UpdateQualifier` re-propagates and updates qualifiers bottom up. Ideally the bug-fix part would go in an individual PR, but this bug also blocks the window work, so they are put together here.

Many thanks to gatorsmile for the initial discussion and test cases!

## How was this patch tested?

New tests in `LogicalPlanToSQLSuite`.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #11555 from cloud-fan/window.
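As a rough, expression-level sketch of what the new `sql` methods enable (editorial illustration, not code from this patch; the attribute names and the exact backtick quoting of the output are assumptions):

```scala
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.IntegerType

// Hypothetical resolved attributes, for illustration only.
val a = AttributeReference("a", IntegerType)()
val b = AttributeReference("b", IntegerType)()

val spec = WindowSpecDefinition(
  partitionSpec = Seq(a),
  orderSpec = Seq(SortOrder(b, Ascending)),
  frameSpecification = SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))

// With this patch, a window expression can render itself back to SQL:
WindowExpression(RowNumber(), spec).sql
// ≈ "ROW_NUMBER() OVER (PARTITION BY `a` ORDER BY `b` ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)"
```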
Diffstat (limited to 'sql/catalyst')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala             |  6
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala         |  3
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala | 47
3 files changed, 47 insertions(+), 9 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index b654827b8a..53ea3cfef6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1194,7 +1194,7 @@ class Analyzer(
val withWindow = addWindow(windowExpressions, withFilter)
// Finally, generate output columns according to the original projectList.
- val finalProjectList = aggregateExprs.map (_.toAttribute)
+ val finalProjectList = aggregateExprs.map(_.toAttribute)
Project(finalProjectList, withWindow)
case p: LogicalPlan if !p.childrenResolved => p
@@ -1210,7 +1210,7 @@ class Analyzer(
val withWindow = addWindow(windowExpressions, withAggregate)
// Finally, generate output columns according to the original projectList.
- val finalProjectList = aggregateExprs.map (_.toAttribute)
+ val finalProjectList = aggregateExprs.map(_.toAttribute)
Project(finalProjectList, withWindow)
// We only extract Window Expressions after all expressions of the Project
@@ -1225,7 +1225,7 @@ class Analyzer(
val withWindow = addWindow(windowExpressions, withProject)
// Finally, generate output columns according to the original projectList.
- val finalProjectList = projectList.map (_.toAttribute)
+ val finalProjectList = projectList.map(_.toAttribute)
Project(finalProjectList, withWindow)
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
index bd1d914872..b739361937 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
@@ -57,7 +57,8 @@ case class SortOrder(child: Expression, direction: SortDirection)
override def dataType: DataType = child.dataType
override def nullable: Boolean = child.nullable
- override def toString: String = s"$child ${if (direction == Ascending) "ASC" else "DESC"}"
+ override def toString: String = s"$child ${direction.sql}"
+ override def sql: String = child.sql + " " + direction.sql
def isAscending: Boolean = direction == Ascending
}
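For reference, a minimal sketch of what the new `SortOrder.sql` produces (attribute name and quoting are assumptions):

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Descending, SortOrder}
import org.apache.spark.sql.types.LongType

// `sql` renders the child's SQL followed by the direction keyword; `toString`
// now reuses the same direction.sql instead of a hand-rolled "ASC"/"DESC".
SortOrder(AttributeReference("ts", LongType)(), Descending).sql  // ≈ "`ts` DESC"
```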
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
index 9e6bd0ee46..b8679474cf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
@@ -18,7 +18,8 @@
package org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.analysis.UnresolvedException
+import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, UnresolvedException}
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess}
import org.apache.spark.sql.catalyst.expressions.aggregate.{DeclarativeAggregate, NoOp}
import org.apache.spark.sql.types._
@@ -30,6 +31,7 @@ sealed trait WindowSpec
/**
* The specification for a window function.
+ *
* @param partitionSpec It defines the way that input rows are partitioned.
* @param orderSpec It defines the ordering of rows in a partition.
* @param frameSpecification It defines the window frame in a partition.
@@ -75,6 +77,22 @@ case class WindowSpecDefinition(
override def nullable: Boolean = true
override def foldable: Boolean = false
override def dataType: DataType = throw new UnsupportedOperationException
+
+ override def sql: String = {
+ val partition = if (partitionSpec.isEmpty) {
+ ""
+ } else {
+ "PARTITION BY " + partitionSpec.map(_.sql).mkString(", ")
+ }
+
+ val order = if (orderSpec.isEmpty) {
+ ""
+ } else {
+ "ORDER BY " + orderSpec.map(_.sql).mkString(", ")
+ }
+
+ s"($partition $order ${frameSpecification.toString})"
+ }
}
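One detail of the `sql` body above worth noting: both clauses are optional, so when one is empty the result keeps a harmless extra blank. A hedged sketch under assumed inputs:

```scala
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.IntegerType

val x = AttributeReference("x", IntegerType)()
val frame = SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow)

WindowSpecDefinition(Seq(x), Seq(SortOrder(x, Ascending)), frame).sql
// ≈ "(PARTITION BY `x` ORDER BY `x` ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)"

WindowSpecDefinition(Nil, Seq(SortOrder(x, Ascending)), frame).sql
// ≈ "( ORDER BY `x` ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)"
// (the leading blank comes from the empty partition clause)
```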
/**
@@ -278,6 +296,7 @@ case class WindowExpression(
override def nullable: Boolean = windowFunction.nullable
override def toString: String = s"$windowFunction $windowSpec"
+ override def sql: String = windowFunction.sql + " OVER " + windowSpec.sql
}
/**
@@ -451,6 +470,7 @@ object SizeBasedWindowFunction {
the window partition.""")
case class RowNumber() extends RowNumberLike {
override val evaluateExpression = rowNumber
+ override def sql: String = "ROW_NUMBER()"
}
/**
@@ -470,6 +490,7 @@ case class CumeDist() extends RowNumberLike with SizeBasedWindowFunction {
// return the same value for equal values in the partition.
override val frame = SpecifiedWindowFrame(RangeFrame, UnboundedPreceding, CurrentRow)
override val evaluateExpression = Divide(Cast(rowNumber, DoubleType), Cast(n, DoubleType))
+ override def sql: String = "CUME_DIST()"
}
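A quick worked example of the `evaluateExpression` above (editorial note, not in the patch): in a 4-row partition, CUME_DIST evaluates `rowNumber / n` as 1/4, 2/4, 3/4, 4/4 = 0.25, 0.5, 0.75, 1.0. Because the RANGE frame ends at CURRENT ROW, tied ORDER BY values all see the row number of their last peer, so equal values share one result, as the comment above requires.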
/**
@@ -499,12 +520,25 @@ case class CumeDist() extends RowNumberLike with SizeBasedWindowFunction {
case class NTile(buckets: Expression) extends RowNumberLike with SizeBasedWindowFunction {
def this() = this(Literal(1))
+ override def children: Seq[Expression] = Seq(buckets)
+
// Validate buckets. Note that this could be relaxed; the bucket value only needs to be
// constant for each partition.
- buckets.eval() match {
- case b: Int if b > 0 => // Ok
- case x => throw new AnalysisException(
- "Buckets expression must be a foldable positive integer expression: $x")
+ override def checkInputDataTypes(): TypeCheckResult = {
+ if (!buckets.foldable) {
+ return TypeCheckFailure(s"Buckets expression must be foldable, but got $buckets")
+ }
+
+ if (buckets.dataType != IntegerType) {
+ return TypeCheckFailure(s"Buckets expression must be integer type, but got $buckets")
+ }
+
+ val i = buckets.eval().asInstanceOf[Int]
+ if (i > 0) {
+ TypeCheckSuccess
+ } else {
+ TypeCheckFailure(s"Buckets expression must be positive, but got: $i")
+ }
}
private val bucket = AttributeReference("bucket", IntegerType, nullable = false)()
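A hedged sketch of how the new `checkInputDataTypes` behaves on a few literal bucket expressions (illustrative values only):

```scala
import org.apache.spark.sql.catalyst.expressions.{Literal, NTile}

NTile(Literal(4)).checkInputDataTypes()    // TypeCheckSuccess
NTile(Literal(0)).checkInputDataTypes()    // TypeCheckFailure: must be positive
NTile(Literal("4")).checkInputDataTypes()  // TypeCheckFailure: must be integer type
```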
@@ -608,6 +642,7 @@ abstract class RankLike extends AggregateWindowFunction {
case class Rank(children: Seq[Expression]) extends RankLike {
def this() = this(Nil)
override def withOrder(order: Seq[Expression]): Rank = Rank(order)
+ override def sql: String = "RANK()"
}
/**
@@ -632,6 +667,7 @@ case class DenseRank(children: Seq[Expression]) extends RankLike {
override val updateExpressions = increaseRank +: children
override val aggBufferAttributes = rank +: orderAttrs
override val initialValues = zero +: orderInit
+ override def sql: String = "DENSE_RANK()"
}
/**
@@ -658,4 +694,5 @@ case class PercentRank(children: Seq[Expression]) extends RankLike with SizeBasedWindowFunction
override val evaluateExpression = If(GreaterThan(n, one),
Divide(Cast(Subtract(rank, one), DoubleType), Cast(Subtract(n, one), DoubleType)),
Literal(0.0d))
+ override def sql: String = "PERCENT_RANK()"
}
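Per the commit message, the end-to-end coverage lives in `LogicalPlanToSQLSuite`. A hedged sketch of the shape such a test might take (the `checkHiveQl` helper name and the `parquet_t1` table are assumptions about the suite's conventions, not contents of this patch):

```scala
test("window function: basic OVER clause") {
  // Parses the query, regenerates SQL from the analyzed plan, and checks
  // that both versions produce the same result.
  checkHiveQl("SELECT MAX(value) OVER (PARTITION BY key % 3) FROM parquet_t1")
}
```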