author    Davies Liu <davies@databricks.com>    2015-05-23 08:30:05 -0700
committer Yin Huai <yhuai@databricks.com>       2015-05-23 08:30:05 -0700
commit    efe3bfdf496aa6206ace2697e31dd4c0c3c824fb (patch)
tree      a6c0adbff3ff029c0e87ceff4180f6b3c99ea5ff /python/pyspark/sql/functions.py
parent    ad0badba1450295982738934da2cc121cde18213 (diff)
[SPARK-7322, SPARK-7836, SPARK-7822][SQL] DataFrame window function related updates
1. ntile should take an integer as parameter.
2. Added Python API (based on #6364)
3. Update documentation of various DataFrame Python functions.

Author: Davies Liu <davies@databricks.com>
Author: Reynold Xin <rxin@databricks.com>

Closes #6374 from rxin/window-final and squashes the following commits:

69004c7 [Reynold Xin] Style fix.
288cea9 [Reynold Xin] Update documentation.
7cb8985 [Reynold Xin] Merge pull request #6364 from davies/window
66092b4 [Davies Liu] update docs
ed73cb4 [Reynold Xin] [SPARK-7322][SQL] Improve DataFrame window function documentation.
ef55132 [Davies Liu] Merge branch 'master' of github.com:apache/spark into window4
8936ade [Davies Liu] fix maxint in python 3
2649358 [Davies Liu] update docs
778e2c0 [Davies Liu] SPARK-7836 and SPARK-7822: Python API of window functions
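For context, a minimal sketch of how the new Python window-function API fits together. The `Window` builder comes from the companion `pyspark.sql.window` module added in the same line of work; the DataFrame below is invented for illustration and assumes a live `sqlContext`, as in the pyspark shell:

    # Illustrative usage of the window functions added in this commit (made-up data).
    from pyspark.sql.window import Window
    from pyspark.sql import functions as F

    df = sqlContext.createDataFrame(
        [("a", 1), ("a", 2), ("b", 3)], ["key", "value"])
    w = Window.partitionBy("key").orderBy("value")

    df.select("key", "value",
              F.rowNumber().over(w).alias("row_number"),
              F.lag("value").over(w).alias("prev_value"),
              F.ntile(2).over(w).alias("bucket")).show()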
Diffstat (limited to 'python/pyspark/sql/functions.py')
-rw-r--r--  python/pyspark/sql/functions.py | 147
1 file changed, 125 insertions(+), 22 deletions(-)
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 9b0d7f3e66..bbf465aca8 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -32,16 +32,21 @@ from pyspark.sql.column import Column, _to_java_column, _to_seq
__all__ = [
+ 'array',
'approxCountDistinct',
'coalesce',
'countDistinct',
+ 'explode',
'monotonicallyIncreasingId',
'rand',
'randn',
'sparkPartitionId',
+ 'struct',
'udf',
'when']
+__all__ += ['lag', 'lead', 'ntile']
+
def _create_function(name, doc=""):
""" Create a function for aggregator by name"""
@@ -67,6 +72,17 @@ def _create_binary_mathfunction(name, doc=""):
return _
+def _create_window_function(name, doc=''):
+ """ Create a window function by name """
+ def _():
+ sc = SparkContext._active_spark_context
+ jc = getattr(sc._jvm.functions, name)()
+ return Column(jc)
+ _.__name__ = name
+ _.__doc__ = 'Window function: ' + doc
+ return _
+
+
_functions = {
'lit': 'Creates a :class:`Column` of literal value.',
'col': 'Returns a :class:`Column` based on the given column name.',
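The factory above mirrors `_create_function`: it manufactures a zero-argument wrapper per name, calls through to the JVM `functions` object, and rebinds `__name__`/`__doc__` so that `help()` and Sphinx see an ordinary function. A self-contained toy sketch of the same pattern, with invented names, is:

    # Toy sketch of the generate-functions-from-a-table pattern used above.
    def _create_greeting(name, doc=""):
        def _():
            return "hello from " + name
        _.__name__ = name                  # so introspection shows the real name
        _.__doc__ = "Generated: " + doc    # so help() shows the table's docstring
        return _

    _greetings = {'wave': 'waves at you.', 'nod': 'nods politely.'}
    for _name, _doc in _greetings.items():
        globals()[_name] = _create_greeting(_name, _doc)
    del _name, _doc

    print(wave())        # hello from wave
    print(nod.__doc__)   # Generated: nods politely.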
@@ -130,15 +146,53 @@ _binary_mathfunctions = {
'pow': 'Returns the value of the first argument raised to the power of the second argument.'
}
+_window_functions = {
+ 'rowNumber':
+ """returns a sequential number starting at 1 within a window partition.
+
+ This is equivalent to the ROW_NUMBER function in SQL.""",
+ 'denseRank':
+ """returns the rank of rows within a window partition, without any gaps.
+
+ The difference between rank and denseRank is that denseRank leaves no gaps in ranking
+ sequence when there are ties. That is, if you were ranking a competition using denseRank
+ and had three people tie for second place, you would say that all three were in second
+ place and that the next person came in third.
+
+ This is equivalent to the DENSE_RANK function in SQL.""",
+ 'rank':
+ """returns the rank of rows within a window partition.
+
+ The difference between rank and denseRank is that denseRank leaves no gaps in ranking
+ sequence when there are ties. That is, if you were ranking a competition using denseRank
+ and had three people tie for second place, you would say that all three were in second
+ place and that the next person came in third.
+
+ This is equivalent to the RANK function in SQL.""",
+ 'cumeDist':
+ """returns the cumulative distribution of values within a window partition,
+ i.e. the fraction of rows that are below the current row.
+
+ This is equivalent to the CUME_DIST function in SQL.""",
+ 'percentRank':
+ """returns the relative rank (i.e. percentile) of rows within a window partition.
+
+ This is equivalent to the PERCENT_RANK function in SQL.""",
+}
+
for _name, _doc in _functions.items():
globals()[_name] = since(1.3)(_create_function(_name, _doc))
for _name, _doc in _functions_1_4.items():
globals()[_name] = since(1.4)(_create_function(_name, _doc))
for _name, _doc in _binary_mathfunctions.items():
globals()[_name] = since(1.4)(_create_binary_mathfunction(_name, _doc))
+for _name, _doc in _window_functions.items():
+ globals()[_name] = since(1.4)(_create_window_function(_name, _doc))
del _name, _doc
__all__ += _functions.keys()
+__all__ += _functions_1_4.keys()
__all__ += _binary_mathfunctions.keys()
+__all__ += _window_functions.keys()
__all__.sort()
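The rank/denseRank contrast documented in the table above is easiest to see on tied values; an illustrative snippet (made-up data, live `sqlContext` assumed):

    # Illustrative: rank() vs denseRank() on ties. With scores 10, 20, 20, 30
    # ordered ascending, rank yields 1, 2, 2, 4 (gap after the tie) while
    # denseRank yields 1, 2, 2, 3 (no gap).
    from pyspark.sql.window import Window
    from pyspark.sql import functions as F

    df = sqlContext.createDataFrame(
        [("x", 10), ("x", 20), ("x", 20), ("x", 30)], ["key", "score"])
    w = Window.partitionBy("key").orderBy("score")
    df.select("score",
              F.rank().over(w).alias("rank"),
              F.denseRank().over(w).alias("denseRank")).show()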
@@ -177,27 +231,6 @@ def approxCountDistinct(col, rsd=None):
@since(1.4)
-def explode(col):
- """Returns a new row for each element in the given array or map.
-
- >>> from pyspark.sql import Row
- >>> eDF = sqlContext.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": "b"})])
- >>> eDF.select(explode(eDF.intlist).alias("anInt")).collect()
- [Row(anInt=1), Row(anInt=2), Row(anInt=3)]
-
- >>> eDF.select(explode(eDF.mapfield).alias("key", "value")).show()
- +---+-----+
- |key|value|
- +---+-----+
- | a| b|
- +---+-----+
- """
- sc = SparkContext._active_spark_context
- jc = sc._jvm.functions.explode(_to_java_column(col))
- return Column(jc)
-
-
-@since(1.4)
def coalesce(*cols):
"""Returns the first column that is not null.
@@ -250,6 +283,27 @@ def countDistinct(col, *cols):
@since(1.4)
+def explode(col):
+ """Returns a new row for each element in the given array or map.
+
+ >>> from pyspark.sql import Row
+ >>> eDF = sqlContext.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": "b"})])
+ >>> eDF.select(explode(eDF.intlist).alias("anInt")).collect()
+ [Row(anInt=1), Row(anInt=2), Row(anInt=3)]
+
+ >>> eDF.select(explode(eDF.mapfield).alias("key", "value")).show()
+ +---+-----+
+ |key|value|
+ +---+-----+
+ | a| b|
+ +---+-----+
+ """
+ sc = SparkContext._active_spark_context
+ jc = sc._jvm.functions.explode(_to_java_column(col))
+ return Column(jc)
+
+
+@since(1.4)
def monotonicallyIncreasingId():
"""A column that generates monotonically increasing 64-bit integers.
@@ -258,7 +312,7 @@ def monotonicallyIncreasingId():
within each partition in the lower 33 bits. The assumption is that the data frame has
less than 1 billion partitions, and each partition has less than 8 billion records.
- As an example, consider a [[DataFrame]] with two partitions, each with 3 records.
+ As an example, consider a :class:`DataFrame` with two partitions, each with 3 records.
This expression would return the following IDs:
0, 1, 2, 8589934592 (1L << 33), 8589934593, 8589934594.
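The layout described above (partition id in the upper 31 bits, record number within the partition in the lower 33) can be sanity-checked with plain arithmetic:

    # Plain-Python check of the ID layout from the docstring:
    # id = (partition_id << 33) + record_number_within_partition
    def expected_id(partition_id, record_number):
        return (partition_id << 33) + record_number

    assert expected_id(0, 2) == 2
    assert expected_id(1, 0) == 8589934592   # 1 << 33, as in the docstring
    assert expected_id(1, 2) == 8589934594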
@@ -349,6 +403,55 @@ def when(condition, value):
return Column(jc)
+@since(1.4)
+def lag(col, count=1, default=None):
+ """
+ Window function: returns the value that is `count` rows before the current row, and
+ `default` if there are fewer than `count` rows before the current row. For example,
+ a `count` of one will return the previous row at any given point in the window partition.
+
+ This is equivalent to the LAG function in SQL.
+
+ :param col: name of column or expression
+ :param count: number of rows to look back (defaults to 1)
+ :param default: value returned when there are fewer than `count` preceding rows
+ """
+ sc = SparkContext._active_spark_context
+ return Column(sc._jvm.functions.lag(_to_java_column(col), count, default))
+
+
+@since(1.4)
+def lead(col, count=1, default=None):
+ """
+ Window function: returns the value that is `count` rows after the current row, and
+ `default` if there are fewer than `count` rows after the current row. For example,
+ a `count` of one will return the next row at any given point in the window partition.
+
+ This is equivalent to the LEAD function in SQL.
+
+ :param col: name of column or expression
+ :param count: number of rows to look ahead (defaults to 1)
+ :param default: value returned when there are fewer than `count` following rows
+ """
+ sc = SparkContext._active_spark_context
+ return Column(sc._jvm.functions.lead(_to_java_column(col), count, default))
+
+
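A sketch of lag/lead over an ordered window; the default value shows up at the partition edges (data invented, live `sqlContext` assumed):

    # Illustrative: lag/lead fetch the row `count` steps behind/ahead within
    # the window, falling back to the default at the partition edges.
    from pyspark.sql.window import Window
    from pyspark.sql import functions as F

    df = sqlContext.createDataFrame([("a", 1), ("a", 2), ("a", 3)], ["key", "v"])
    w = Window.partitionBy("key").orderBy("v")
    df.select("v",
              F.lag("v", 1, 0).over(w).alias("prev"),    # 0, 1, 2
              F.lead("v", 1, 0).over(w).alias("next"),   # 2, 3, 0
              ).show()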
+@since(1.4)
+def ntile(n):
+ """
+ Window function: returns the ntile group id (from 1 to `n` inclusive) in an ordered
+ window partition. For example, if `n` is 4, the first quarter of the rows will get
+ value 1, the second quarter will get 2, the third quarter will get 3, and the last
+ quarter will get 4.
+
+ This is equivalent to the NTILE function in SQL.
+
+ :param n: an integer; the number of buckets to divide the rows into
+ """
+ sc = SparkContext._active_spark_context
+ return Column(sc._jvm.functions.ntile(int(n)))
+
+
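And a sketch of ntile's bucketing over an ordered window (data invented, live `sqlContext` assumed):

    # Illustrative: ntile(2) over 4 ordered rows puts rows 1-2 in bucket 1
    # and rows 3-4 in bucket 2.
    from pyspark.sql.window import Window
    from pyspark.sql import functions as F

    df = sqlContext.createDataFrame([("a", i) for i in range(1, 5)], ["key", "v"])
    w = Window.partitionBy("key").orderBy("v")
    df.select("v", F.ntile(2).over(w).alias("bucket")).show()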
class UserDefinedFunction(object):
"""
User defined function in Python