aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorHerman van Hovell <hvanhovell@questtec.nl>2016-01-31 13:56:13 -0800
committerReynold Xin <rxin@databricks.com>2016-01-31 13:56:13 -0800
commit5a8b978fabb60aa178274f86432c63680c8b351a (patch)
treedd3de9b6cd79870813ccc8ca898da182f2bd881b /sql
parent0e6d92d042b0a2920d8df5959d5913ba0166a678 (diff)
downloadspark-5a8b978fabb60aa178274f86432c63680c8b351a.tar.gz
spark-5a8b978fabb60aa178274f86432c63680c8b351a.tar.bz2
spark-5a8b978fabb60aa178274f86432c63680c8b351a.zip
[SPARK-13049] Add First/last with ignore nulls to functions.scala
This PR adds the ability to specify the ```ignoreNulls``` option to the functions dsl, e.g.: ```df.select($"id", last($"value", ignoreNulls = true).over(Window.partitionBy($"id").orderBy($"other")))``` This PR is somewhere between a bug fix (see the JIRA) and a new feature. I am not sure if we should backport to 1.6. cc yhuai Author: Herman van Hovell <hvanhovell@questtec.nl> Closes #10957 from hvanhovell/SPARK-13049.
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/functions.scala118
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowSuite.scala32
2 files changed, 123 insertions, 27 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 3a27466176..b970eee4e3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -349,19 +349,51 @@ object functions extends LegacyFunctions {
}
/**
- * Aggregate function: returns the first value in a group.
- *
- * @group agg_funcs
- * @since 1.3.0
- */
- def first(e: Column): Column = withAggregateFunction { new First(e.expr) }
-
- /**
- * Aggregate function: returns the first value of a column in a group.
- *
- * @group agg_funcs
- * @since 1.3.0
- */
+ * Aggregate function: returns the first value in a group.
+ *
+ * The function by default returns the first values it sees. It will return the first non-null
+ * value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
+ *
+ * @group agg_funcs
+ * @since 2.0.0
+ */
+ def first(e: Column, ignoreNulls: Boolean): Column = withAggregateFunction {
+ new First(e.expr, Literal(ignoreNulls))
+ }
+
+ /**
+ * Aggregate function: returns the first value of a column in a group.
+ *
+ * The function by default returns the first values it sees. It will return the first non-null
+ * value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
+ *
+ * @group agg_funcs
+ * @since 2.0.0
+ */
+ def first(columnName: String, ignoreNulls: Boolean): Column = {
+ first(Column(columnName), ignoreNulls)
+ }
+
+ /**
+ * Aggregate function: returns the first value in a group.
+ *
+ * The function by default returns the first values it sees. It will return the first non-null
+ * value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
+ *
+ * @group agg_funcs
+ * @since 1.3.0
+ */
+ def first(e: Column): Column = first(e, ignoreNulls = false)
+
+ /**
+ * Aggregate function: returns the first value of a column in a group.
+ *
+ * The function by default returns the first values it sees. It will return the first non-null
+ * value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
+ *
+ * @group agg_funcs
+ * @since 1.3.0
+ */
def first(columnName: String): Column = first(Column(columnName))
/**
@@ -381,20 +413,52 @@ object functions extends LegacyFunctions {
def kurtosis(columnName: String): Column = kurtosis(Column(columnName))
/**
- * Aggregate function: returns the last value in a group.
- *
- * @group agg_funcs
- * @since 1.3.0
- */
- def last(e: Column): Column = withAggregateFunction { new Last(e.expr) }
-
- /**
- * Aggregate function: returns the last value of the column in a group.
- *
- * @group agg_funcs
- * @since 1.3.0
- */
- def last(columnName: String): Column = last(Column(columnName))
+ * Aggregate function: returns the last value in a group.
+ *
+ * The function by default returns the last values it sees. It will return the last non-null
+ * value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
+ *
+ * @group agg_funcs
+ * @since 2.0.0
+ */
+ def last(e: Column, ignoreNulls: Boolean): Column = withAggregateFunction {
+ new Last(e.expr, Literal(ignoreNulls))
+ }
+
+ /**
+ * Aggregate function: returns the last value of the column in a group.
+ *
+ * The function by default returns the last values it sees. It will return the last non-null
+ * value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
+ *
+ * @group agg_funcs
+ * @since 2.0.0
+ */
+ def last(columnName: String, ignoreNulls: Boolean): Column = {
+ last(Column(columnName), ignoreNulls)
+ }
+
+ /**
+ * Aggregate function: returns the last value in a group.
+ *
+ * The function by default returns the last values it sees. It will return the last non-null
+ * value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
+ *
+ * @group agg_funcs
+ * @since 1.3.0
+ */
+ def last(e: Column): Column = last(e, ignoreNulls = false)
+
+ /**
+ * Aggregate function: returns the last value of the column in a group.
+ *
+ * The function by default returns the last values it sees. It will return the last non-null
+ * value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
+ *
+ * @group agg_funcs
+ * @since 1.3.0
+ */
+ def last(columnName: String): Column = last(Column(columnName), ignoreNulls = false)
/**
* Aggregate function: returns the maximum value of the expression in a group.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowSuite.scala
index 09a56f6f3a..d38842c3c0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowSuite.scala
@@ -312,4 +312,36 @@ class DataFrameWindowSuite extends QueryTest with SharedSQLContext {
Row("b", 3, null, null),
Row("b", 2, null, null)))
}
+
+ test("last/first with ignoreNulls") {
+ val nullStr: String = null
+ val df = Seq(
+ ("a", 0, nullStr),
+ ("a", 1, "x"),
+ ("a", 2, "y"),
+ ("a", 3, "z"),
+ ("a", 4, nullStr),
+ ("b", 1, nullStr),
+ ("b", 2, nullStr)).
+ toDF("key", "order", "value")
+ val window = Window.partitionBy($"key").orderBy($"order")
+ checkAnswer(
+ df.select(
+ $"key",
+ $"order",
+ first($"value").over(window),
+ first($"value", ignoreNulls = false).over(window),
+ first($"value", ignoreNulls = true).over(window),
+ last($"value").over(window),
+ last($"value", ignoreNulls = false).over(window),
+ last($"value", ignoreNulls = true).over(window)),
+ Seq(
+ Row("a", 0, null, null, null, null, null, null),
+ Row("a", 1, null, null, "x", "x", "x", "x"),
+ Row("a", 2, null, null, "x", "y", "y", "y"),
+ Row("a", 3, null, null, "x", "z", "z", "z"),
+ Row("a", 4, null, null, "x", null, null, "z"),
+ Row("b", 1, null, null, null, null, null, null),
+ Row("b", 2, null, null, null, null, null, null)))
+ }
}