-rw-r--r-- | python/pyspark/sql/__init__.py | 25
-rw-r--r-- | python/pyspark/sql/column.py | 54
-rw-r--r-- | python/pyspark/sql/context.py | 2
-rw-r--r-- | python/pyspark/sql/dataframe.py | 2
-rw-r--r-- | python/pyspark/sql/functions.py | 147
-rw-r--r-- | python/pyspark/sql/group.py | 2
-rw-r--r-- | python/pyspark/sql/tests.py | 31
-rw-r--r-- | python/pyspark/sql/window.py | 158
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/functions.scala | 197
-rw-r--r-- | sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala | 20
10 files changed, 464 insertions, 174 deletions
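Before the per-file diffs, here is a minimal sketch of the window-function API this patch adds to PySpark, pieced together from the doctests and notes below. It assumes a Hive-enabled `sqlContext` (per the note added to `Column.over`, window functions are only supported with HiveContext in 1.4) and a hypothetical DataFrame `df` with `name` and `age` columns:

```python
from pyspark.sql import Window
from pyspark.sql import functions as F

# A frame per name-partition, ordered by age, spanning the previous row
# through the next row (ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING).
window = Window.partitionBy("name").orderBy("age").rowsBetween(-1, 1)

# Column.over(WindowSpec) turns a function column into a windowed column.
df.select(
    df.name,
    F.rank().over(window),
    F.min("age").over(window),
).show()
```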
diff --git a/python/pyspark/sql/__init__.py b/python/pyspark/sql/__init__.py index 66b0bff290..8fee92ae3a 100644 --- a/python/pyspark/sql/__init__.py +++ b/python/pyspark/sql/__init__.py @@ -18,26 +18,28 @@ """ Important classes of Spark SQL and DataFrames: - - L{SQLContext} + - :class:`pyspark.sql.SQLContext` Main entry point for :class:`DataFrame` and SQL functionality. - - L{DataFrame} + - :class:`pyspark.sql.DataFrame` A distributed collection of data grouped into named columns. - - L{Column} + - :class:`pyspark.sql.Column` A column expression in a :class:`DataFrame`. - - L{Row} + - :class:`pyspark.sql.Row` A row of data in a :class:`DataFrame`. - - L{HiveContext} + - :class:`pyspark.sql.HiveContext` Main entry point for accessing data stored in Apache Hive. - - L{GroupedData} + - :class:`pyspark.sql.GroupedData` Aggregation methods, returned by :func:`DataFrame.groupBy`. - - L{DataFrameNaFunctions} + - :class:`pyspark.sql.DataFrameNaFunctions` Methods for handling missing data (null values). - - L{DataFrameStatFunctions} + - :class:`pyspark.sql.DataFrameStatFunctions` Methods for statistics functionality. - - L{functions} + - :class:`pyspark.sql.functions` List of built-in functions available for :class:`DataFrame`. - - L{types} + - :class:`pyspark.sql.types` List of data types available. + - :class:`pyspark.sql.Window` + For working with window functions. """ from __future__ import absolute_import @@ -66,8 +68,9 @@ from pyspark.sql.column import Column from pyspark.sql.dataframe import DataFrame, SchemaRDD, DataFrameNaFunctions, DataFrameStatFunctions from pyspark.sql.group import GroupedData from pyspark.sql.readwriter import DataFrameReader, DataFrameWriter +from pyspark.sql.window import Window, WindowSpec __all__ = [ 'SQLContext', 'HiveContext', 'DataFrame', 'GroupedData', 'Column', 'Row', - 'DataFrameNaFunctions', 'DataFrameStatFunctions' + 'DataFrameNaFunctions', 'DataFrameStatFunctions', 'Window', 'WindowSpec', ] diff --git a/python/pyspark/sql/column.py b/python/pyspark/sql/column.py index baf1ecbd0a..8dc5039f58 100644 --- a/python/pyspark/sql/column.py +++ b/python/pyspark/sql/column.py @@ -116,6 +116,8 @@ class Column(object): df.colName + 1 1 / df.colName + .. note:: Experimental + .. versionadded:: 1.3 """ @@ -164,8 +166,9 @@ class Column(object): @since(1.3) def getItem(self, key): - """An expression that gets an item at position `ordinal` out of a list, - or gets an item by key out of a dict. + """ + An expression that gets an item at position ``ordinal`` out of a list, + or gets an item by key out of a dict. >>> df = sc.parallelize([([1, 2], {"key": "value"})]).toDF(["l", "d"]) >>> df.select(df.l.getItem(0), df.d.getItem("key")).show() @@ -185,7 +188,8 @@ class Column(object): @since(1.3) def getField(self, name): - """An expression that gets a field by name in a StructField. + """ + An expression that gets a field by name in a StructField. >>> from pyspark.sql import Row >>> df = sc.parallelize([Row(r=Row(a=1, b="b"))]).toDF() @@ -219,7 +223,7 @@ class Column(object): @since(1.3) def substr(self, startPos, length): """ - Return a :class:`Column` which is a substring of the column + Return a :class:`Column` which is a substring of the column. 
:param startPos: start position (int or Column) :param length: length of the substring (int or Column) @@ -242,7 +246,8 @@ class Column(object): @ignore_unicode_prefix @since(1.3) def inSet(self, *cols): - """ A boolean expression that is evaluated to true if the value of this + """ + A boolean expression that is evaluated to true if the value of this expression is contained by the evaluated values of the arguments. >>> df[df.name.inSet("Bob", "Mike")].collect() @@ -268,7 +273,8 @@ class Column(object): @since(1.3) def alias(self, *alias): - """Returns this column aliased with a new name or names (in the case of expressions that + """ + Returns this column aliased with a new name or names (in the case of expressions that return more than one column, such as explode). >>> df.select(df.age.alias("age2")).collect() @@ -284,7 +290,7 @@ class Column(object): @ignore_unicode_prefix @since(1.3) def cast(self, dataType): - """ Convert the column into type `dataType` + """ Convert the column into type ``dataType``. >>> df.select(df.age.cast("string").alias('ages')).collect() [Row(ages=u'2'), Row(ages=u'5')] @@ -304,25 +310,24 @@ class Column(object): astype = cast - @ignore_unicode_prefix @since(1.3) def between(self, lowerBound, upperBound): - """ A boolean expression that is evaluated to true if the value of this + """ + A boolean expression that is evaluated to true if the value of this expression is between the given columns. """ return (self >= lowerBound) & (self <= upperBound) - @ignore_unicode_prefix @since(1.4) def when(self, condition, value): - """Evaluates a list of conditions and returns one of multiple possible result expressions. + """ + Evaluates a list of conditions and returns one of multiple possible result expressions. If :func:`Column.otherwise` is not invoked, None is returned for unmatched conditions. See :func:`pyspark.sql.functions.when` for example usage. :param condition: a boolean :class:`Column` expression. :param value: a literal value, or a :class:`Column` expression. - """ sc = SparkContext._active_spark_context if not isinstance(condition, Column): @@ -331,10 +336,10 @@ class Column(object): jc = sc._jvm.functions.when(condition._jc, v) return Column(jc) - @ignore_unicode_prefix @since(1.4) def otherwise(self, value): - """Evaluates a list of conditions and returns one of multiple possible result expressions. + """ + Evaluates a list of conditions and returns one of multiple possible result expressions. If :func:`Column.otherwise` is not invoked, None is returned for unmatched conditions. See :func:`pyspark.sql.functions.when` for example usage. @@ -345,6 +350,27 @@ class Column(object): jc = self._jc.otherwise(value) return Column(jc) + @since(1.4) + def over(self, window): + """ + Define a windowing column. + + :param window: a :class:`WindowSpec` + :return: a Column + + >>> from pyspark.sql import Window + >>> window = Window.partitionBy("name").orderBy("age").rowsBetween(-1, 1) + >>> from pyspark.sql.functions import rank, min + >>> # df.select(rank().over(window), min('age').over(window)) + + .. 
note:: Window functions are only supported with HiveContext in 1.4 + """ + from pyspark.sql.window import WindowSpec + if not isinstance(window, WindowSpec): + raise TypeError("window should be WindowSpec") + jc = self._jc.over(window._jspec) + return Column(jc) + def __repr__(self): return 'Column<%s>' % self._jc.toString().encode('utf8') diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py index 51f12c5bb4..22f6257dfe 100644 --- a/python/pyspark/sql/context.py +++ b/python/pyspark/sql/context.py @@ -585,8 +585,6 @@ class SQLContext(object): Returns a :class:`DataFrameReader` that can be used to read data in as a :class:`DataFrame`. - .. note:: Experimental - >>> sqlContext.read <pyspark.sql.readwriter.DataFrameReader object at ...> """ diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 132db90e69..55cad8238e 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -62,6 +62,8 @@ class DataFrame(object): people.filter(people.age > 30).join(department, people.deptId == department.id)) \ .groupBy(department.name, "gender").agg({"salary": "avg", "age": "max"}) + .. note:: Experimental + .. versionadded:: 1.3 """ diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 9b0d7f3e66..bbf465aca8 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -32,16 +32,21 @@ from pyspark.sql.column import Column, _to_java_column, _to_seq __all__ = [ + 'array', 'approxCountDistinct', 'coalesce', 'countDistinct', + 'explode', 'monotonicallyIncreasingId', 'rand', 'randn', 'sparkPartitionId', + 'struct', 'udf', 'when'] +__all__ += ['lag', 'lead', 'ntile'] + def _create_function(name, doc=""): """ Create a function for aggregator by name""" @@ -67,6 +72,17 @@ def _create_binary_mathfunction(name, doc=""): return _ +def _create_window_function(name, doc=''): + """ Create a window function by name """ + def _(): + sc = SparkContext._active_spark_context + jc = getattr(sc._jvm.functions, name)() + return Column(jc) + _.__name__ = name + _.__doc__ = 'Window function: ' + doc + return _ + + _functions = { 'lit': 'Creates a :class:`Column` of literal value.', 'col': 'Returns a :class:`Column` based on the given column name.', @@ -130,15 +146,53 @@ _binary_mathfunctions = { 'pow': 'Returns the value of the first argument raised to the power of the second argument.' } +_window_functions = { + 'rowNumber': + """returns a sequential number starting at 1 within a window partition. + + This is equivalent to the ROW_NUMBER function in SQL.""", + 'denseRank': + """returns the rank of rows within a window partition, without any gaps. + + The difference between rank and denseRank is that denseRank leaves no gaps in ranking + sequence when there are ties. That is, if you were ranking a competition using denseRank + and had three people tie for second place, you would say that all three were in second + place and that the next person came in third. + + This is equivalent to the DENSE_RANK function in SQL.""", + 'rank': + """returns the rank of rows within a window partition. + + The difference between rank and denseRank is that denseRank leaves no gaps in ranking + sequence when there are ties. That is, if you were ranking a competition using denseRank + and had three people tie for second place, you would say that all three were in second + place and that the next person came in third.
+ + This is equivalent to the RANK function in SQL.""", + 'cumeDist': + """returns the cumulative distribution of values within a window partition, + i.e. the fraction of rows that are below the current row. + + This is equivalent to the CUME_DIST function in SQL.""", + 'percentRank': + """returns the relative rank (i.e. percentile) of rows within a window partition. + + This is equivalent to the PERCENT_RANK function in SQL.""", +} for _name, _doc in _functions.items(): globals()[_name] = since(1.3)(_create_function(_name, _doc)) for _name, _doc in _functions_1_4.items(): globals()[_name] = since(1.4)(_create_function(_name, _doc)) for _name, _doc in _binary_mathfunctions.items(): globals()[_name] = since(1.4)(_create_binary_mathfunction(_name, _doc)) +for _name, _doc in _window_functions.items(): + globals()[_name] = since(1.4)(_create_window_function(_name, _doc)) del _name, _doc __all__ += _functions.keys() +__all__ += _functions_1_4.keys() __all__ += _binary_mathfunctions.keys() +__all__ += _window_functions.keys() __all__.sort() @@ -177,27 +231,6 @@ def approxCountDistinct(col, rsd=None): @since(1.4) -def explode(col): - """Returns a new row for each element in the given array or map. - - >>> from pyspark.sql import Row - >>> eDF = sqlContext.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": "b"})]) - >>> eDF.select(explode(eDF.intlist).alias("anInt")).collect() - [Row(anInt=1), Row(anInt=2), Row(anInt=3)] - - >>> eDF.select(explode(eDF.mapfield).alias("key", "value")).show() - +---+-----+ - |key|value| - +---+-----+ - | a| b| - +---+-----+ - """ - sc = SparkContext._active_spark_context - jc = sc._jvm.functions.explode(_to_java_column(col)) - return Column(jc) - - -@since(1.4) def coalesce(*cols): """Returns the first column that is not null. @@ -250,6 +283,27 @@ def countDistinct(col, *cols): @since(1.4) +def explode(col): + """Returns a new row for each element in the given array or map. + + >>> from pyspark.sql import Row + >>> eDF = sqlContext.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": "b"})]) + >>> eDF.select(explode(eDF.intlist).alias("anInt")).collect() + [Row(anInt=1), Row(anInt=2), Row(anInt=3)] + + >>> eDF.select(explode(eDF.mapfield).alias("key", "value")).show() + +---+-----+ + |key|value| + +---+-----+ + | a| b| + +---+-----+ + """ + sc = SparkContext._active_spark_context + jc = sc._jvm.functions.explode(_to_java_column(col)) + return Column(jc) + + +@since(1.4) def monotonicallyIncreasingId(): """A column that generates monotonically increasing 64-bit integers. @@ -258,7 +312,7 @@ def monotonicallyIncreasingId(): within each partition in the lower 33 bits. The assumption is that the data frame has less than 1 billion partitions, and each partition has less than 8 billion records. - As an example, consider a [[DataFrame]] with two partitions, each with 3 records. + As an example, consider a :class:`DataFrame` with two partitions, each with 3 records. This expression would return the following IDs: 0, 1, 2, 8589934592 (1L << 33), 8589934593, 8589934594. @@ -349,6 +403,55 @@ def when(condition, value): return Column(jc) +@since(1.4) +def lag(col, count=1, default=None): + """ + Window function: returns the value that is `count` rows before the current row, and + `default` if there are fewer than `count` rows before the current row. For example, + a `count` of one will return the previous row at any given point in the window partition. + + This is equivalent to the LAG function in SQL.
+ + :param col: name of column or expression + :param count: number of rows to shift + :param default: default value + """ + sc = SparkContext._active_spark_context + return Column(sc._jvm.functions.lag(_to_java_column(col), count, default)) + + +@since(1.4) +def lead(col, count=1, default=None): + """ + Window function: returns the value that is `count` rows after the current row, and + `default` if there are fewer than `count` rows after the current row. For example, + a `count` of one will return the next row at any given point in the window partition. + + This is equivalent to the LEAD function in SQL. + + :param col: name of column or expression + :param count: number of rows to shift + :param default: default value + """ + sc = SparkContext._active_spark_context + return Column(sc._jvm.functions.lead(_to_java_column(col), count, default)) + + +@since(1.4) +def ntile(n): + """ + Window function: returns the ntile group id (from 1 to `n` inclusive) in an ordered window + partition. For example, if `n` is 4, the first quarter of the rows will get value 1, the + second quarter will get 2, the third quarter will get 3, and the last quarter will get 4. + + This is equivalent to the NTILE function in SQL. + + :param n: an integer + """ + sc = SparkContext._active_spark_context + return Column(sc._jvm.functions.ntile(int(n))) + + class UserDefinedFunction(object): """ User defined function in Python diff --git a/python/pyspark/sql/group.py b/python/pyspark/sql/group.py index 4da472a577..5a37a673ee 100644 --- a/python/pyspark/sql/group.py +++ b/python/pyspark/sql/group.py @@ -49,6 +49,8 @@ class GroupedData(object): A set of methods for aggregations on a :class:`DataFrame`, created by :func:`DataFrame.groupBy`. + .. note:: Experimental + .. versionadded:: 1.3 """ diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 7e34996241..5c53c3a8ed 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -44,6 +44,7 @@ from pyspark.sql.types import * from pyspark.sql.types import UserDefinedType, _infer_type from pyspark.tests import ReusedPySparkTestCase from pyspark.sql.functions import UserDefinedFunction +from pyspark.sql.window import Window class ExamplePointUDT(UserDefinedType): @@ -743,11 +744,9 @@ class HiveContextSQLTests(ReusedPySparkTestCase): try: cls.sc._jvm.org.apache.hadoop.hive.conf.HiveConf() except py4j.protocol.Py4JError: - cls.sqlCtx = None - return + raise unittest.SkipTest("Hive is not available") except TypeError: - cls.sqlCtx = None - return + raise unittest.SkipTest("Hive is not available") os.unlink(cls.tempdir.name) _scala_HiveContext =\ cls.sc._jvm.org.apache.spark.sql.hive.test.TestHiveContext(cls.sc._jsc.sc()) @@ -761,9 +760,6 @@ class HiveContextSQLTests(ReusedPySparkTestCase): shutil.rmtree(cls.tempdir.name, ignore_errors=True) def test_save_and_load_table(self): - if self.sqlCtx is None: - return # no hive available, skipped - df = self.df tmpPath = tempfile.mkdtemp() shutil.rmtree(tmpPath) @@ -805,6 +801,27 @@ class HiveContextSQLTests(ReusedPySparkTestCase): shutil.rmtree(tmpPath) + def test_window_functions(self): + df = self.sqlCtx.createDataFrame([(1, "1"), (2, "2"), (1, "2"), (1, "2")], ["key", "value"]) + w = Window.partitionBy("value").orderBy("key") + from pyspark.sql import functions as F + sel = df.select(df.value, df.key, + F.max("key").over(w.rowsBetween(0, 1)), + F.min("key").over(w.rowsBetween(0, 1)), + F.count("key").over(w.rowsBetween(float('-inf'), float('inf'))), + F.rowNumber().over(w), + F.rank().over(w), +
F.denseRank().over(w), + F.ntile(2).over(w)) + rs = sorted(sel.collect()) + expected = [ + ("1", 1, 1, 1, 1, 1, 1, 1, 1), + ("2", 1, 1, 1, 3, 1, 1, 1, 1), + ("2", 1, 2, 1, 3, 2, 1, 1, 1), + ("2", 2, 2, 2, 3, 3, 3, 2, 2) + ] + for r, ex in zip(rs, expected): + self.assertEqual(tuple(r), ex[:len(r)]) if __name__ == "__main__": unittest.main() diff --git a/python/pyspark/sql/window.py b/python/pyspark/sql/window.py new file mode 100644 index 0000000000..0a0e006bdf --- /dev/null +++ b/python/pyspark/sql/window.py @@ -0,0 +1,158 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import sys + +from pyspark import SparkContext +from pyspark.sql import since +from pyspark.sql.column import _to_seq, _to_java_column + +__all__ = ["Window", "WindowSpec"] + + +def _to_java_cols(cols): + sc = SparkContext._active_spark_context + if len(cols) == 1 and isinstance(cols[0], list): + cols = cols[0] + return _to_seq(sc, cols, _to_java_column) + + +class Window(object): + + """ + Utility functions for defining windows in DataFrames. + + For example: + + >>> # PARTITION BY country ORDER BY date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + >>> window = Window.partitionBy("country").orderBy("date").rowsBetween(-sys.maxsize, 0) + + >>> # PARTITION BY country ORDER BY date RANGE BETWEEN 3 PRECEDING AND 3 FOLLOWING + >>> window = Window.orderBy("date").partitionBy("country").rangeBetween(-3, 3) + + .. note:: Experimental + + .. versionadded:: 1.4 + """ + @staticmethod + @since(1.4) + def partitionBy(*cols): + """ + Creates a :class:`WindowSpec` with the partitioning defined. + """ + sc = SparkContext._active_spark_context + jspec = sc._jvm.org.apache.spark.sql.expressions.Window.partitionBy(_to_java_cols(cols)) + return WindowSpec(jspec) + + @staticmethod + @since(1.4) + def orderBy(*cols): + """ + Creates a :class:`WindowSpec` with the ordering defined. + """ + sc = SparkContext._active_spark_context + jspec = sc._jvm.org.apache.spark.sql.expressions.Window.orderBy(_to_java_cols(cols)) + return WindowSpec(jspec) + + +class WindowSpec(object): + """ + A window specification that defines the partitioning, ordering, + and frame boundaries. + + Use the static methods in :class:`Window` to create a :class:`WindowSpec`. + + .. note:: Experimental + + .. versionadded:: 1.4 + """ + + _JAVA_MAX_LONG = (1 << 63) - 1 + _JAVA_MIN_LONG = - (1 << 63) + + def __init__(self, jspec): + self._jspec = jspec + + @since(1.4) + def partitionBy(self, *cols): + """ + Defines the partitioning columns in a :class:`WindowSpec`. + + :param cols: names of columns or expressions + """ + return WindowSpec(self._jspec.partitionBy(_to_java_cols(cols))) + + @since(1.4) + def orderBy(self, *cols): + """ + Defines the ordering columns in a :class:`WindowSpec`.
+ + :param cols: names of columns or expressions + """ + return WindowSpec(self._jspec.orderBy(_to_java_cols(cols))) + + @since(1.4) + def rowsBetween(self, start, end): + """ + Defines the frame boundaries, from `start` (inclusive) to `end` (inclusive). + + Both `start` and `end` are relative positions from the current row. + For example, "0" means "current row", while "-1" means the row before + the current row, and "5" means the fifth row after the current row. + + :param start: boundary start, inclusive. + The frame is unbounded if this is ``-sys.maxsize`` (or lower). + :param end: boundary end, inclusive. + The frame is unbounded if this is ``sys.maxsize`` (or higher). + """ + if start <= -sys.maxsize: + start = self._JAVA_MIN_LONG + if end >= sys.maxsize: + end = self._JAVA_MAX_LONG + return WindowSpec(self._jspec.rowsBetween(start, end)) + + @since(1.4) + def rangeBetween(self, start, end): + """ + Defines the frame boundaries, from `start` (inclusive) to `end` (inclusive). + + Both `start` and `end` are relative to the current row. For example, + "0" means "current row", while "-1" means one off before the current row, + and "5" means five off after the current row. + + :param start: boundary start, inclusive. + The frame is unbounded if this is ``-sys.maxsize`` (or lower). + :param end: boundary end, inclusive. + The frame is unbounded if this is ``sys.maxsize`` (or higher). + """ + if start <= -sys.maxsize: + start = self._JAVA_MIN_LONG + if end >= sys.maxsize: + end = self._JAVA_MAX_LONG + return WindowSpec(self._jspec.rangeBetween(start, end)) + + +def _test(): + import doctest + SparkContext('local[4]', 'PythonTest') + (failure_count, test_count) = doctest.testmod() + if failure_count: + exit(-1) + + +if __name__ == "__main__": + _test() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index 8775be724e..9a23cfb89c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -326,168 +326,135 @@ object functions { ////////////////////////////////////////////////////////////////////////////////////////////// /** - * Window function: returns the lag value of current row of the expression, - * null when the current row extends before the beginning of the window. + * Window function: returns the value that is `offset` rows before the current row, and + * `null` if there are fewer than `offset` rows before the current row. For example, + * an `offset` of one will return the previous row at any given point in the window partition. * - * @group window_funcs - * @since 1.4.0 - */ - def lag(columnName: String): Column = { - lag(columnName, 1) - } - - /** - * Window function: returns the lag value of current row of the column, - * null when the current row extends before the beginning of the window. + * This is equivalent to the LAG function in SQL. * * @group window_funcs * @since 1.4.0 */ - def lag(e: Column): Column = { - lag(e, 1) + def lag(e: Column, offset: Int): Column = { + lag(e, offset, null) } /** - * Window function: returns the lag values of current row of the expression, - * null when the current row extends before the beginning of the window. + * Window function: returns the value that is `offset` rows before the current row, and + * `null` if there are fewer than `offset` rows before the current row.
For example, + * an `offset` of one will return the previous row at any given point in the window partition. * - * @group window_funcs - * @since 1.4.0 - */ - def lag(e: Column, count: Int): Column = { - lag(e, count, null) - } - - /** - * Window function: returns the lag values of current row of the column, - * null when the current row extends before the beginning of the window. + * This is equivalent to the LAG function in SQL. * * @group window_funcs * @since 1.4.0 */ - def lag(columnName: String, count: Int): Column = { - lag(columnName, count, null) + def lag(columnName: String, offset: Int): Column = { + lag(columnName, offset, null) } /** - * Window function: returns the lag values of current row of the column, - * given default value when the current row extends before the beginning - * of the window. + * Window function: returns the value that is `offset` rows before the current row, and + * `defaultValue` if there are fewer than `offset` rows before the current row. For example, + * an `offset` of one will return the previous row at any given point in the window partition. * - * @group window_funcs - * @since 1.4.0 - */ - def lag(columnName: String, count: Int, defaultValue: Any): Column = { - lag(Column(columnName), count, defaultValue) - } - - /** - * Window function: returns the lag values of current row of the expression, - * given default value when the current row extends before the beginning - * of the window. + * This is equivalent to the LAG function in SQL. * * @group window_funcs * @since 1.4.0 */ - def lag(e: Column, count: Int, defaultValue: Any): Column = { - UnresolvedWindowFunction("lag", e.expr :: Literal(count) :: Literal(defaultValue) :: Nil) + def lag(columnName: String, offset: Int, defaultValue: Any): Column = { + lag(Column(columnName), offset, defaultValue) } /** - * Window function: returns the lead value of current row of the column, - * null when the current row extends before the end of the window. + * Window function: returns the value that is `offset` rows before the current row, and + * `defaultValue` if there are fewer than `offset` rows before the current row. For example, + * an `offset` of one will return the previous row at any given point in the window partition. * - * @group window_funcs - * @since 1.4.0 - */ - def lead(columnName: String): Column = { - lead(columnName, 1) - } - - /** - * Window function: returns the lead value of current row of the expression, - * null when the current row extends before the end of the window. + * This is equivalent to the LAG function in SQL. * * @group window_funcs * @since 1.4.0 */ - def lead(e: Column): Column = { - lead(e, 1) + def lag(e: Column, offset: Int, defaultValue: Any): Column = { + UnresolvedWindowFunction("lag", e.expr :: Literal(offset) :: Literal(defaultValue) :: Nil) } /** - * Window function: returns the lead values of current row of the column, - * null when the current row extends before the end of the window. + * Window function: returns the value that is `offset` rows after the current row, and + * `null` if there are fewer than `offset` rows after the current row. For example, + * an `offset` of one will return the next row at any given point in the window partition. * - * @group window_funcs - * @since 1.4.0 - */ - def lead(columnName: String, count: Int): Column = { - lead(columnName, count, null) - } - - /** - * Window function: returns the lead values of current row of the expression, - * null when the current row extends before the end of the window.
+ + * This is equivalent to the LEAD function in SQL. * * @group window_funcs * @since 1.4.0 */ - def lead(e: Column, count: Int): Column = { - lead(e, count, null) + def lead(columnName: String, offset: Int): Column = { + lead(columnName, offset, null) } /** - * Window function: returns the lead values of current row of the column, - * given default value when the current row extends before the end of the window. + * Window function: returns the value that is `offset` rows after the current row, and + * `null` if there are fewer than `offset` rows after the current row. For example, + * an `offset` of one will return the next row at any given point in the window partition. + * + * This is equivalent to the LEAD function in SQL. * * @group window_funcs * @since 1.4.0 */ - def lead(columnName: String, count: Int, defaultValue: Any): Column = { - lead(Column(columnName), count, defaultValue) + def lead(e: Column, offset: Int): Column = { + lead(e, offset, null) } /** - * Window function: returns the lead values of current row of the expression, - * given default value when the current row extends before the end of the window. + * Window function: returns the value that is `offset` rows after the current row, and + * `defaultValue` if there are fewer than `offset` rows after the current row. For example, + * an `offset` of one will return the next row at any given point in the window partition. + * + * This is equivalent to the LEAD function in SQL. * * @group window_funcs * @since 1.4.0 */ - def lead(e: Column, count: Int, defaultValue: Any): Column = { - UnresolvedWindowFunction("lead", e.expr :: Literal(count) :: Literal(defaultValue) :: Nil) + def lead(columnName: String, offset: Int, defaultValue: Any): Column = { + lead(Column(columnName), offset, defaultValue) } /** - * NTILE for specified expression. - * NTILE allows easy calculation of tertiles, quartiles, deciles and other - * common summary statistics. This function divides an ordered partition into a specified - * number of groups called buckets and assigns a bucket number to each row in the partition. + * Window function: returns the value that is `offset` rows after the current row, and + * `defaultValue` if there are fewer than `offset` rows after the current row. For example, + * an `offset` of one will return the next row at any given point in the window partition. + * + * This is equivalent to the LEAD function in SQL. * * @group window_funcs * @since 1.4.0 */ - def ntile(e: Column): Column = { - UnresolvedWindowFunction("ntile", e.expr :: Nil) + def lead(e: Column, offset: Int, defaultValue: Any): Column = { + UnresolvedWindowFunction("lead", e.expr :: Literal(offset) :: Literal(defaultValue) :: Nil) } /** - * NTILE for specified column. - * NTILE allows easy calculation of tertiles, quartiles, deciles and other - * common summary statistics. This function divides an ordered partition into a specified - * number of groups called buckets and assigns a bucket number to each row in the partition. + * Window function: returns the ntile group id (from 1 to `n` inclusive) in an ordered window + * partition. For example, if `n` is 4, the first quarter of the rows will get value 1, the second + * quarter will get 2, the third quarter will get 3, and the last quarter will get 4. + * + * This is equivalent to the NTILE function in SQL.
* * @group window_funcs * @since 1.4.0 */ - def ntile(columnName: String): Column = { - ntile(Column(columnName)) + def ntile(n: Int): Column = { + UnresolvedWindowFunction("ntile", lit(n).expr :: Nil) } /** - * Assigns a unique number (sequentially, starting from 1, as defined by ORDER BY) to each - * row within the partition. + * Window function: returns a sequential number starting at 1 within a window partition. + * + * This is equivalent to the ROW_NUMBER function in SQL. * * @group window_funcs * @since 1.4.0 @@ -497,11 +464,15 @@ object functions { } /** - * The difference between RANK and DENSE_RANK is that DENSE_RANK leaves no gaps in ranking - * sequence when there are ties. That is, if you were ranking a competition using DENSE_RANK + * Window function: returns the rank of rows within a window partition, without any gaps. + * + * The difference between rank and denseRank is that denseRank leaves no gaps in ranking + * sequence when there are ties. That is, if you were ranking a competition using denseRank * and had three people tie for second place, you would say that all three were in second * place and that the next person came in third. * + * This is equivalent to the DENSE_RANK function in SQL. + * * @group window_funcs * @since 1.4.0 */ @@ -510,11 +481,15 @@ object functions { } /** - * The difference between RANK and DENSE_RANK is that DENSE_RANK leaves no gaps in ranking - * sequence when there are ties. That is, if you were ranking a competition using DENSE_RANK + * Window function: returns the rank of rows within a window partition. + * + * The difference between rank and denseRank is that denseRank leaves no gaps in ranking + * sequence when there are ties. That is, if you were ranking a competition using denseRank * and had three people tie for second place, you would say that all three were in second * place and that the next person came in third. * + * This is equivalent to the RANK function in SQL. + * * @group window_funcs * @since 1.4.0 */ @@ -523,10 +498,16 @@ object functions { } /** - * CUME_DIST (defined as the inverse of percentile in some statistical books) computes - * the position of a specified value relative to a set of values. - * To compute the CUME_DIST of a value x in a set S of size N, you use the formula: - * CUME_DIST(x) = number of values in S coming before and including x in the specified order / N + * Window function: returns the cumulative distribution of values within a window partition, + * i.e. the fraction of rows that are below the current row. + * + * {{{ + * N = total number of rows in the partition + * cumeDist(x) = number of values before (and including) x / N + * }}} + * + * + * This is equivalent to the CUME_DIST function in SQL. * * @group window_funcs * @since 1.4.0 @@ -536,10 +517,14 @@ object functions { } /** - * PERCENT_RANK is similar to CUME_DIST, but it uses rank values rather than row counts - * in its numerator. - * The formula: - * (rank of row in its partition - 1) / (number of rows in the partition - 1) + * Window function: returns the relative rank (i.e. percentile) of rows within a window partition. + * + * This is computed by: + * {{{ + * (rank of row in its partition - 1) / (number of rows in the partition - 1) + * }}} + * + * This is equivalent to the PERCENT_RANK function in SQL. 
* * @group window_funcs * @since 1.4.0 diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala index 6cea6776c8..efb3f2545d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala @@ -31,8 +31,8 @@ class HiveDataFrameWindowSuite extends QueryTest { checkAnswer( df.select( - lead("key").over(w), - lead("value").over(w)), + lead("key", 1).over(w), + lead("value", 1).over(w)), Row(1, "1") :: Row(2, "2") :: Row(null, null) :: Row(null, null) :: Nil) } @@ -42,8 +42,8 @@ class HiveDataFrameWindowSuite extends QueryTest { checkAnswer( df.select( - lead("key").over(w), - lead("value").over(w)), + lead("key", 1).over(w), + lead("value", 1).over(w)), Row(1, "1") :: Row(2, "2") :: Row(null, null) :: Row(null, null) :: Nil) } @@ -53,7 +53,7 @@ class HiveDataFrameWindowSuite extends QueryTest { checkAnswer( df.select( - lead("value").over(Window.partitionBy($"key").orderBy($"value"))), + lead("value", 1).over(Window.partitionBy($"key").orderBy($"value"))), sql( """SELECT | lead(value) OVER (PARTITION BY key ORDER BY value) @@ -66,9 +66,7 @@ class HiveDataFrameWindowSuite extends QueryTest { checkAnswer( df.select( - lag("value").over( - Window.partitionBy($"key") - .orderBy($"value"))), + lag("value", 1).over(Window.partitionBy($"key").orderBy($"value"))), sql( """SELECT | lag(value) OVER (PARTITION BY key ORDER BY value) @@ -112,8 +110,7 @@ class HiveDataFrameWindowSuite extends QueryTest { mean("key").over(Window.partitionBy("value").orderBy("key")), count("key").over(Window.partitionBy("value").orderBy("key")), sum("key").over(Window.partitionBy("value").orderBy("key")), - ntile("key").over(Window.partitionBy("value").orderBy("key")), - ntile($"key").over(Window.partitionBy("value").orderBy("key")), + ntile(2).over(Window.partitionBy("value").orderBy("key")), rowNumber().over(Window.partitionBy("value").orderBy("key")), denseRank().over(Window.partitionBy("value").orderBy("key")), rank().over(Window.partitionBy("value").orderBy("key")), @@ -127,8 +124,7 @@ class HiveDataFrameWindowSuite extends QueryTest { |avg(key) over (partition by value order by key), |count(key) over (partition by value order by key), |sum(key) over (partition by value order by key), - |ntile(key) over (partition by value order by key), - |ntile(key) over (partition by value order by key), + |ntile(2) over (partition by value order by key), |row_number() over (partition by value order by key), |dense_rank() over (partition by value order by key), |rank() over (partition by value order by key), |
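The updated `HiveDataFrameWindowSuite` above pins down the new offset-based `lead`/`lag` signatures and the `ntile(n: Int)` overload. On the Python side, here is a condensed sketch of the same calls exercised by `test_window_functions`, assuming a Hive-enabled SQL context named `sqlCtx` like the one the test suite builds:

```python
from pyspark.sql import Window
from pyspark.sql import functions as F

# Same four-row (key, value) DataFrame used by test_window_functions.
df = sqlCtx.createDataFrame(
    [(1, "1"), (2, "2"), (1, "2"), (1, "2")], ["key", "value"])
w = Window.partitionBy("value").orderBy("key")

df.select(
    df.key,
    F.lag("key", 1).over(w),   # previous key in the partition; default (None) at the start
    F.lead("key", 1).over(w),  # next key in the partition; default (None) at the end
    F.ntile(2).over(w),        # bucket id 1 or 2 over the ordered partition
).show()
```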