path: root/python/pyspark/sql/functions.py
author    Reynold Xin <rxin@databricks.com>  2015-11-24 21:30:53 -0800
committer Reynold Xin <rxin@databricks.com>  2015-11-24 21:30:53 -0800
commit    151d7c2baf18403e6e59e97c80c8bcded6148038 (patch)
tree      d6b60910109f12e5d91737370ac6ec1e2671241c /python/pyspark/sql/functions.py
parent    a5d988763319f63a8e2b58673dd4f9098f17c835 (diff)
[SPARK-10621][SQL] Consistent naming for functions in SQL, Python, Scala
Author: Reynold Xin <rxin@databricks.com>

Closes #9948 from rxin/SPARK-10621.
Diffstat (limited to 'python/pyspark/sql/functions.py')
-rw-r--r--  python/pyspark/sql/functions.py | 111
1 file changed, 94 insertions(+), 17 deletions(-)
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index a1ca723bbd..e3786e0fa5 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -150,18 +150,18 @@ _binary_mathfunctions = {
_window_functions = {
'rowNumber':
- """returns a sequential number starting at 1 within a window partition.
-
- This is equivalent to the ROW_NUMBER function in SQL.""",
+ """.. note:: Deprecated in 1.6, use row_number instead.""",
+ 'row_number':
+ """returns a sequential number starting at 1 within a window partition.""",
'denseRank':
+ """.. note:: Deprecated in 1.6, use dense_rank instead.""",
+ 'dense_rank':
"""returns the rank of rows within a window partition, without any gaps.
The difference between rank and dense_rank is that dense_rank leaves no gaps in the ranking
sequence when there are ties. That is, if you were ranking a competition using dense_rank
and had three people tie for second place, you would say that all three were in second
- place and that the next person came in third.
-
- This is equivalent to the DENSE_RANK function in SQL.""",
+ place and that the next person came in third.""",
'rank':
"""returns the rank of rows within a window partition.
@@ -172,14 +172,14 @@ _window_functions = {
This is equivalent to the RANK function in SQL.""",
'cumeDist':
+ """.. note:: Deprecated in 1.6, use cume_dist instead.""",
+ 'cume_dist':
"""returns the cumulative distribution of values within a window partition,
- i.e. the fraction of rows that are below the current row.
-
- This is equivalent to the CUME_DIST function in SQL.""",
+ i.e. the fraction of rows that are below the current row.""",
'percentRank':
- """returns the relative rank (i.e. percentile) of rows within a window partition.
-
- This is equivalent to the PERCENT_RANK function in SQL.""",
+ """.. note:: Deprecated in 1.6, use percent_rank instead.""",
+ 'percent_rank':
+ """returns the relative rank (i.e. percentile) of rows within a window partition.""",
}
for _name, _doc in _functions.items():
@@ -189,7 +189,7 @@ for _name, _doc in _functions_1_4.items():
for _name, _doc in _binary_mathfunctions.items():
globals()[_name] = since(1.4)(_create_binary_mathfunction(_name, _doc))
for _name, _doc in _window_functions.items():
- globals()[_name] = since(1.4)(_create_window_function(_name, _doc))
+ globals()[_name] = since(1.6)(_create_window_function(_name, _doc))
for _name, _doc in _functions_1_6.items():
globals()[_name] = since(1.6)(_create_function(_name, _doc))
del _name, _doc
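
For context, a minimal usage sketch of the renamed window functions (assuming a HiveContext-backed DataFrame `df` with columns `name` and `age`; the column names are illustrative, and window functions in 1.6 require Hive support):

    from pyspark.sql import Window
    from pyspark.sql.functions import row_number, dense_rank

    # Window functions need an explicit window specification.
    w = Window.partitionBy("name").orderBy("age")
    df.select("name", "age",
              row_number().over(w).alias("rn"),
              dense_rank().over(w).alias("dr")).show()

The deprecated camelCase names (rowNumber, denseRank, cumeDist, percentRank) still resolve; only their docstrings change, pointing at the snake_case replacements.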
@@ -288,6 +288,38 @@ def countDistinct(col, *cols):
@since(1.4)
def monotonicallyIncreasingId():
+ """
+ .. note:: Deprecated in 1.6, use monotonically_increasing_id instead.
+ """
+ return monotonically_increasing_id()
+
+
+@since(1.6)
+def input_file_name():
+ """Creates a string column for the file name of the current Spark task.
+ """
+ sc = SparkContext._active_spark_context
+ return Column(sc._jvm.functions.input_file_name())
+
+
+@since(1.6)
+def isnan(col):
+ """An expression that returns true iff the column is NaN.
+ """
+ sc = SparkContext._active_spark_context
+ return Column(sc._jvm.functions.isnan(_to_java_column(col)))
+
+
+@since(1.6)
+def isnull(col):
+ """An expression that returns true iff the column is null.
+ """
+ sc = SparkContext._active_spark_context
+ return Column(sc._jvm.functions.isnull(_to_java_column(col)))
+
+
+@since(1.6)
+def monotonically_increasing_id():
"""A column that generates monotonically increasing 64-bit integers.
The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive.
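
A short illustrative sketch of the new null/NaN predicates (assuming a SQLContext `sqlContext`; the sample data is made up):

    df = sqlContext.createDataFrame([(1.0, None), (float('nan'), 2.0)], ("a", "b"))
    # isnan is true only for floating-point NaN; isnull only for SQL NULL.
    df.select(isnan("a").alias("a_nan"), isnull("b").alias("b_null")).collect()
    # [Row(a_nan=False, b_null=True), Row(a_nan=True, b_null=False)]

The distinction matters for doubles: None becomes a SQL NULL, while float('nan') is a regular double value that only isnan detects.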
@@ -300,11 +332,21 @@ def monotonicallyIncreasingId():
0, 1, 2, 8589934592 (1L << 33), 8589934593, 8589934594.
>>> df0 = sc.parallelize(range(2), 2).mapPartitions(lambda x: [(1,), (2,), (3,)]).toDF(['col1'])
- >>> df0.select(monotonicallyIncreasingId().alias('id')).collect()
+ >>> df0.select(monotonically_increasing_id().alias('id')).collect()
[Row(id=0), Row(id=1), Row(id=2), Row(id=8589934592), Row(id=8589934593), Row(id=8589934594)]
"""
sc = SparkContext._active_spark_context
- return Column(sc._jvm.functions.monotonicallyIncreasingId())
+ return Column(sc._jvm.functions.monotonically_increasing_id())
+
+
+@since(1.6)
+def nanvl(col1, col2):
+ """Returns col1 if it is not NaN, or col2 if col1 is NaN.
+
+ Both inputs should be floating point columns (DoubleType or FloatType).
+ """
+ sc = SparkContext._active_spark_context
+ return Column(sc._jvm.functions.nanvl(_to_java_column(col1), _to_java_column(col2)))
@since(1.4)
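
Building on the same sample data, a hedged sketch of nanvl, which plays the role of coalesce for NaN values (lit is assumed imported from pyspark.sql.functions):

    from pyspark.sql.functions import lit, nanvl

    # Keep 'a' where it is a real number, fall back to 0.0 where it is NaN.
    df.select(nanvl(df['a'], lit(0.0)).alias('a_clean')).collect()
    # e.g. [Row(a_clean=1.0), Row(a_clean=0.0)]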
@@ -382,15 +424,23 @@ def shiftRightUnsigned(col, numBits):
@since(1.4)
def sparkPartitionId():
+ """
+ .. note:: Deprecated in 1.6, use spark_partition_id instead.
+ """
+ return spark_partition_id()
+
+
+@since(1.6)
+def spark_partition_id():
"""A column for partition ID of the Spark task.
Note that this is non-deterministic because it depends on data partitioning and task scheduling.
- >>> df.repartition(1).select(sparkPartitionId().alias("pid")).collect()
+ >>> df.repartition(1).select(spark_partition_id().alias("pid")).collect()
[Row(pid=0), Row(pid=0)]
"""
sc = SparkContext._active_spark_context
- return Column(sc._jvm.functions.sparkPartitionId())
+ return Column(sc._jvm.functions.spark_partition_id())
@since(1.5)
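
The camelCase wrapper above simply delegates, so during migration both spellings yield the same result; an illustrative check:

    df.select(sparkPartitionId().alias("old"),
              spark_partition_id().alias("new")).collect()
    # Every row has old == new, since sparkPartitionId() just calls spark_partition_id().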
@@ -1410,6 +1460,33 @@ def explode(col):
return Column(jc)
+@since(1.6)
+def get_json_object(col, path):
+ """
+ Extracts json object from a json string based on json path specified, and returns json string
+ of the extracted json object. It will return null if the input json string is invalid.
+
+ :param col: string column in json format
+ :param path: path to the json object to extract
+ """
+ sc = SparkContext._active_spark_context
+ jc = sc._jvm.functions.get_json_object(_to_java_column(col), path)
+ return Column(jc)
+
+
+@since(1.6)
+def json_tuple(col, *fields):
+ """Creates a new row for a JSON column according to the given field names.
+
+ :param col: string column in JSON format
+ :param fields: fields to extract
+ """
+ sc = SparkContext._active_spark_context
+ jc = sc._jvm.functions.json_tuple(_to_java_column(col), _to_seq(sc, fields))
+ return Column(jc)
+
+
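
A hedged end-to-end sketch of the two JSON helpers (assuming a SQLContext `sqlContext`; the rows and field names are illustrative):

    data = [("1", '{"f1": "value1", "f2": "value2"}'),
            ("2", '{"f1": "value12"}')]
    df = sqlContext.createDataFrame(data, ("key", "jstring"))

    # Pull individual fields out by JSON path; missing paths yield null.
    df.select(df.key,
              get_json_object(df.jstring, '$.f1').alias('c0'),
              get_json_object(df.jstring, '$.f2').alias('c1')).collect()
    # e.g. [Row(key='1', c0='value1', c1='value2'), Row(key='2', c0='value12', c1=None)]

    # json_tuple expands several fields in one pass.
    df.select(df.key, json_tuple(df.jstring, 'f1', 'f2')).collect()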
@since(1.5)
def size(col):
"""