aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/Column.scala17
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala2
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowSuite.scala12
3 files changed, 30 insertions, 1 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
index 713f7941be..9f35107e5b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.util.usePrettyExpression
import org.apache.spark.sql.execution.aggregate.TypedAggregateExpression
+import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types._
@@ -1094,6 +1095,22 @@ class Column(protected[sql] val expr: Expression) extends Logging {
*/
def over(window: expressions.WindowSpec): Column = window.withAggregate(this)
+ /**
+ * Define a empty analytic clause. In this case the analytic function is applied
+ * and presented for all rows in the result set.
+ *
+ * {{{
+ * df.select(
+ * sum("price").over(),
+ * avg("price").over()
+ * )
+ * }}}
+ *
+ * @group expr_ops
+ * @since 2.0.0
+ */
+ def over(): Column = over(Window.spec)
+
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala
index 350c283646..c29ec6f426 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala
@@ -74,7 +74,7 @@ object Window {
spec.orderBy(cols : _*)
}
- private def spec: WindowSpec = {
+ private[sql] def spec: WindowSpec = {
new WindowSpec(Seq.empty, Seq.empty, UnspecifiedFrame)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowSuite.scala
index 9a1aa46947..c6f8c3ad3f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowSuite.scala
@@ -245,6 +245,18 @@ class DataFrameWindowSuite extends QueryTest with SharedSQLContext {
Seq(Row("a", 6, 9), Row("b", 9, 6)))
}
+ test("SPARK-16195 empty over spec") {
+ val df = Seq(("a", 1), ("a", 1), ("a", 2), ("b", 2)).
+ toDF("key", "value")
+ df.createOrReplaceTempView("window_table")
+ checkAnswer(
+ df.select($"key", $"value", sum($"value").over(), avg($"value").over()),
+ Seq(Row("a", 1, 6, 1.5), Row("a", 1, 6, 1.5), Row("a", 2, 6, 1.5), Row("b", 2, 6, 1.5)))
+ checkAnswer(
+ sql("select key, value, sum(value) over(), avg(value) over() from window_table"),
+ Seq(Row("a", 1, 6, 1.5), Row("a", 1, 6, 1.5), Row("a", 2, 6, 1.5), Row("b", 2, 6, 1.5)))
+ }
+
test("window function with udaf") {
val udaf = new UserDefinedAggregateFunction {
def inputSchema: StructType = new StructType()