diff options
author | Dilip Biswal <dbiswal@us.ibm.com> | 2016-06-24 17:27:33 -0700 |
---|---|---|
committer | Herman van Hovell <hvanhovell@databricks.com> | 2016-06-24 17:27:33 -0700 |
commit | 9053054c7f5ec2b9e3d8efbe6bfbfa68a6d1f0d0 (patch) | |
tree | c532c867db22e836cf477a73ef7b2394a7b705f1 | |
parent | e5d0928e2473d1838ff5420c6a8964557c33135e (diff) | |
download | spark-9053054c7f5ec2b9e3d8efbe6bfbfa68a6d1f0d0.tar.gz spark-9053054c7f5ec2b9e3d8efbe6bfbfa68a6d1f0d0.tar.bz2 spark-9053054c7f5ec2b9e3d8efbe6bfbfa68a6d1f0d0.zip |
[SPARK-16195][SQL] Allow users to specify empty over clause in window expressions through dataset API
## What changes were proposed in this pull request?
Allow to specify empty over clause in window expressions through dataset API
In SQL, its allowed to specify an empty OVER clause in the window expression.
```SQL
select area, sum(product) over () as c from windowData
where product > 3 group by area, product
having avg(month) > 0 order by avg(month), product
```
In this case the analytic function sum is presented based on all the rows of the result set
Currently its not allowed through dataset API and is handled in this PR.
## How was this patch tested?
Added a new test in DataframeWindowSuite
Author: Dilip Biswal <dbiswal@us.ibm.com>
Closes #13897 from dilipbiswal/spark-empty-over.
3 files changed, 30 insertions, 1 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala index 713f7941be..9f35107e5b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.util.usePrettyExpression import org.apache.spark.sql.execution.aggregate.TypedAggregateExpression +import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.functions.lit import org.apache.spark.sql.types._ @@ -1094,6 +1095,22 @@ class Column(protected[sql] val expr: Expression) extends Logging { */ def over(window: expressions.WindowSpec): Column = window.withAggregate(this) + /** + * Define a empty analytic clause. In this case the analytic function is applied + * and presented for all rows in the result set. + * + * {{{ + * df.select( + * sum("price").over(), + * avg("price").over() + * ) + * }}} + * + * @group expr_ops + * @since 2.0.0 + */ + def over(): Column = over(Window.spec) + } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala index 350c283646..c29ec6f426 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala @@ -74,7 +74,7 @@ object Window { spec.orderBy(cols : _*) } - private def spec: WindowSpec = { + private[sql] def spec: WindowSpec = { new WindowSpec(Seq.empty, Seq.empty, UnspecifiedFrame) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowSuite.scala index 9a1aa46947..c6f8c3ad3f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowSuite.scala @@ -245,6 +245,18 @@ class DataFrameWindowSuite extends QueryTest with SharedSQLContext { Seq(Row("a", 6, 9), Row("b", 9, 6))) } + test("SPARK-16195 empty over spec") { + val df = Seq(("a", 1), ("a", 1), ("a", 2), ("b", 2)). + toDF("key", "value") + df.createOrReplaceTempView("window_table") + checkAnswer( + df.select($"key", $"value", sum($"value").over(), avg($"value").over()), + Seq(Row("a", 1, 6, 1.5), Row("a", 1, 6, 1.5), Row("a", 2, 6, 1.5), Row("b", 2, 6, 1.5))) + checkAnswer( + sql("select key, value, sum(value) over(), avg(value) over() from window_table"), + Seq(Row("a", 1, 6, 1.5), Row("a", 1, 6, 1.5), Row("a", 2, 6, 1.5), Row("b", 2, 6, 1.5))) + } + test("window function with udaf") { val udaf = new UserDefinedAggregateFunction { def inputSchema: StructType = new StructType() |