aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--python/pyspark/sql/functions.py111
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/functions.scala124
2 files changed, 196 insertions, 39 deletions
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index a1ca723bbd..e3786e0fa5 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -150,18 +150,18 @@ _binary_mathfunctions = {
_window_functions = {
'rowNumber':
- """returns a sequential number starting at 1 within a window partition.
-
- This is equivalent to the ROW_NUMBER function in SQL.""",
+ """.. note:: Deprecated in 1.6, use row_number instead.""",
+ 'row_number':
+ """returns a sequential number starting at 1 within a window partition.""",
'denseRank':
+ """.. note:: Deprecated in 1.6, use dense_rank instead.""",
+ 'dense_rank':
"""returns the rank of rows within a window partition, without any gaps.
The difference between rank and denseRank is that denseRank leaves no gaps in ranking
sequence when there are ties. That is, if you were ranking a competition using denseRank
and had three people tie for second place, you would say that all three were in second
- place and that the next person came in third.
-
- This is equivalent to the DENSE_RANK function in SQL.""",
+ place and that the next person came in third.""",
'rank':
"""returns the rank of rows within a window partition.
@@ -172,14 +172,14 @@ _window_functions = {
This is equivalent to the RANK function in SQL.""",
'cumeDist':
+ """.. note:: Deprecated in 1.6, use cume_dist instead.""",
+ 'cume_dist':
"""returns the cumulative distribution of values within a window partition,
- i.e. the fraction of rows that are below the current row.
-
- This is equivalent to the CUME_DIST function in SQL.""",
+ i.e. the fraction of rows that are below the current row.""",
'percentRank':
- """returns the relative rank (i.e. percentile) of rows within a window partition.
-
- This is equivalent to the PERCENT_RANK function in SQL.""",
+ """.. note:: Deprecated in 1.6, use percent_rank instead.""",
+ 'percent_rank':
+ """returns the relative rank (i.e. percentile) of rows within a window partition.""",
}
for _name, _doc in _functions.items():
@@ -189,7 +189,7 @@ for _name, _doc in _functions_1_4.items():
for _name, _doc in _binary_mathfunctions.items():
globals()[_name] = since(1.4)(_create_binary_mathfunction(_name, _doc))
for _name, _doc in _window_functions.items():
- globals()[_name] = since(1.4)(_create_window_function(_name, _doc))
+ globals()[_name] = since(1.6)(_create_window_function(_name, _doc))
for _name, _doc in _functions_1_6.items():
globals()[_name] = since(1.6)(_create_function(_name, _doc))
del _name, _doc
@@ -288,6 +288,38 @@ def countDistinct(col, *cols):
@since(1.4)
def monotonicallyIncreasingId():
+ """
+ .. note:: Deprecated in 1.6, use monotonically_increasing_id instead.
+ """
+ return monotonically_increasing_id()
+
+
+@since(1.6)
+def input_file_name():
+ """Creates a string column for the file name of the current Spark task.
+ """
+ sc = SparkContext._active_spark_context
+ return Column(sc._jvm.functions.input_file_name())
+
+
+@since(1.6)
+def isnan(col):
+ """An expression that returns true iff the column is NaN.
+ """
+ sc = SparkContext._active_spark_context
+ return Column(sc._jvm.functions.isnan(_to_java_column(col)))
+
+
+@since(1.6)
+def isnull(col):
+ """An expression that returns true iff the column is null.
+ """
+ sc = SparkContext._active_spark_context
+ return Column(sc._jvm.functions.isnull(_to_java_column(col)))
+
+
+@since(1.6)
+def monotonically_increasing_id():
"""A column that generates monotonically increasing 64-bit integers.
The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive.
@@ -300,11 +332,21 @@ def monotonicallyIncreasingId():
0, 1, 2, 8589934592 (1L << 33), 8589934593, 8589934594.
>>> df0 = sc.parallelize(range(2), 2).mapPartitions(lambda x: [(1,), (2,), (3,)]).toDF(['col1'])
- >>> df0.select(monotonicallyIncreasingId().alias('id')).collect()
+ >>> df0.select(monotonically_increasing_id().alias('id')).collect()
[Row(id=0), Row(id=1), Row(id=2), Row(id=8589934592), Row(id=8589934593), Row(id=8589934594)]
"""
sc = SparkContext._active_spark_context
- return Column(sc._jvm.functions.monotonicallyIncreasingId())
+ return Column(sc._jvm.functions.monotonically_increasing_id())
+
+
+@since(1.6)
+def nanvl(col1, col2):
+ """Returns col1 if it is not NaN, or col2 if col1 is NaN.
+
+ Both inputs should be floating point columns (DoubleType or FloatType).
+ """
+ sc = SparkContext._active_spark_context
+ return Column(sc._jvm.functions.nanvl(_to_java_column(col1), _to_java_column(col2)))
@since(1.4)
@@ -382,15 +424,23 @@ def shiftRightUnsigned(col, numBits):
@since(1.4)
def sparkPartitionId():
+ """
+ .. note:: Deprecated in 1.6, use spark_partition_id instead.
+ """
+ return spark_partition_id()
+
+
+@since(1.6)
+def spark_partition_id():
"""A column for partition ID of the Spark task.
Note that this is indeterministic because it depends on data partitioning and task scheduling.
- >>> df.repartition(1).select(sparkPartitionId().alias("pid")).collect()
+ >>> df.repartition(1).select(spark_partition_id().alias("pid")).collect()
[Row(pid=0), Row(pid=0)]
"""
sc = SparkContext._active_spark_context
- return Column(sc._jvm.functions.sparkPartitionId())
+ return Column(sc._jvm.functions.spark_partition_id())
@since(1.5)
@@ -1410,6 +1460,33 @@ def explode(col):
return Column(jc)
+@since(1.6)
+def get_json_object(col, path):
+ """
+ Extracts json object from a json string based on json path specified, and returns json string
+ of the extracted json object. It will return null if the input json string is invalid.
+
+ :param col: string column in json format
+ :param path: path to the json object to extract
+ """
+ sc = SparkContext._active_spark_context
+ jc = sc._jvm.functions.get_json_object(_to_java_column(col), path)
+ return Column(jc)
+
+
+@since(1.6)
+def json_tuple(col, fields):
+ """Creates a new row for a json column according to the given field names.
+
+ :param col: string column in json format
+ :param fields: list of fields to extract
+
+ """
+ sc = SparkContext._active_spark_context
+ jc = sc._jvm.functions.json_tuple(_to_java_column(col), fields)
+ return Column(jc)
+
+
@since(1.5)
def size(col):
"""
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 77dd5bc725..276c5dfc8b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -473,6 +473,13 @@ object functions extends LegacyFunctions {
//////////////////////////////////////////////////////////////////////////////////////////////
/**
+ * @group window_funcs
+ * @deprecated As of 1.6.0, replaced by `cume_dist`. This will be removed in Spark 2.0.
+ */
+ @deprecated("Use cume_dist. This will be removed in Spark 2.0.", "1.6.0")
+ def cumeDist(): Column = cume_dist()
+
+ /**
* Window function: returns the cumulative distribution of values within a window partition,
* i.e. the fraction of rows that are below the current row.
*
@@ -481,13 +488,17 @@ object functions extends LegacyFunctions {
* cumeDist(x) = number of values before (and including) x / N
* }}}
*
- *
- * This is equivalent to the CUME_DIST function in SQL.
- *
* @group window_funcs
- * @since 1.4.0
+ * @since 1.6.0
*/
- def cumeDist(): Column = withExpr { UnresolvedWindowFunction("cume_dist", Nil) }
+ def cume_dist(): Column = withExpr { UnresolvedWindowFunction("cume_dist", Nil) }
+
+ /**
+ * @group window_funcs
+ * @deprecated As of 1.6.0, replaced by `dense_rank`. This will be removed in Spark 2.0.
+ */
+ @deprecated("Use dense_rank. This will be removed in Spark 2.0.", "1.6.0")
+ def denseRank(): Column = dense_rank()
/**
* Window function: returns the rank of rows within a window partition, without any gaps.
@@ -497,12 +508,10 @@ object functions extends LegacyFunctions {
* and had three people tie for second place, you would say that all three were in second
* place and that the next person came in third.
*
- * This is equivalent to the DENSE_RANK function in SQL.
- *
* @group window_funcs
- * @since 1.4.0
+ * @since 1.6.0
*/
- def denseRank(): Column = withExpr { UnresolvedWindowFunction("dense_rank", Nil) }
+ def dense_rank(): Column = withExpr { UnresolvedWindowFunction("dense_rank", Nil) }
/**
* Window function: returns the value that is `offset` rows before the current row, and
@@ -621,6 +630,13 @@ object functions extends LegacyFunctions {
def ntile(n: Int): Column = withExpr { UnresolvedWindowFunction("ntile", lit(n).expr :: Nil) }
/**
+ * @group window_funcs
+ * @deprecated As of 1.6.0, replaced by `percent_rank`. This will be removed in Spark 2.0.
+ */
+ @deprecated("Use percent_rank. This will be removed in Spark 2.0.", "1.6.0")
+ def percentRank(): Column = percent_rank()
+
+ /**
* Window function: returns the relative rank (i.e. percentile) of rows within a window partition.
*
* This is computed by:
@@ -631,9 +647,9 @@ object functions extends LegacyFunctions {
* This is equivalent to the PERCENT_RANK function in SQL.
*
* @group window_funcs
- * @since 1.4.0
+ * @since 1.6.0
*/
- def percentRank(): Column = withExpr { UnresolvedWindowFunction("percent_rank", Nil) }
+ def percent_rank(): Column = withExpr { UnresolvedWindowFunction("percent_rank", Nil) }
/**
* Window function: returns the rank of rows within a window partition.
@@ -651,14 +667,19 @@ object functions extends LegacyFunctions {
def rank(): Column = withExpr { UnresolvedWindowFunction("rank", Nil) }
/**
+ * @group window_funcs
+ * @deprecated As of 1.6.0, replaced by `row_number`. This will be removed in Spark 2.0.
+ */
+ @deprecated("Use row_number. This will be removed in Spark 2.0.", "1.6.0")
+ def rowNumber(): Column = row_number()
+
+ /**
* Window function: returns a sequential number starting at 1 within a window partition.
*
- * This is equivalent to the ROW_NUMBER function in SQL.
- *
* @group window_funcs
- * @since 1.4.0
+ * @since 1.6.0
*/
- def rowNumber(): Column = withExpr { UnresolvedWindowFunction("row_number", Nil) }
+ def row_number(): Column = withExpr { UnresolvedWindowFunction("row_number", Nil) }
//////////////////////////////////////////////////////////////////////////////////////////////
// Non-aggregate functions
@@ -721,19 +742,42 @@ object functions extends LegacyFunctions {
def coalesce(e: Column*): Column = withExpr { Coalesce(e.map(_.expr)) }
/**
+ * @group normal_funcs
+ * @deprecated As of 1.6.0, replaced by `input_file_name`. This will be removed in Spark 2.0.
+ */
+ @deprecated("Use input_file_name. This will be removed in Spark 2.0.", "1.6.0")
+ def inputFileName(): Column = input_file_name()
+
+ /**
* Creates a string column for the file name of the current Spark task.
*
* @group normal_funcs
+ * @since 1.6.0
*/
- def inputFileName(): Column = withExpr { InputFileName() }
+ def input_file_name(): Column = withExpr { InputFileName() }
+
+ /**
+ * @group normal_funcs
+ * @deprecated As of 1.6.0, replaced by `isnan`. This will be removed in Spark 2.0.
+ */
+ @deprecated("Use isnan. This will be removed in Spark 2.0.", "1.6.0")
+ def isNaN(e: Column): Column = isnan(e)
/**
* Return true iff the column is NaN.
*
* @group normal_funcs
- * @since 1.5.0
+ * @since 1.6.0
+ */
+ def isnan(e: Column): Column = withExpr { IsNaN(e.expr) }
+
+ /**
+ * Return true iff the column is null.
+ *
+ * @group normal_funcs
+ * @since 1.6.0
*/
- def isNaN(e: Column): Column = withExpr { IsNaN(e.expr) }
+ def isnull(e: Column): Column = withExpr { IsNull(e.expr) }
/**
* A column expression that generates monotonically increasing 64-bit integers.
@@ -750,7 +794,24 @@ object functions extends LegacyFunctions {
* @group normal_funcs
* @since 1.4.0
*/
- def monotonicallyIncreasingId(): Column = withExpr { MonotonicallyIncreasingID() }
+ def monotonicallyIncreasingId(): Column = monotonically_increasing_id()
+
+ /**
+ * A column expression that generates monotonically increasing 64-bit integers.
+ *
+ * The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive.
+ * The current implementation puts the partition ID in the upper 31 bits, and the record number
+ * within each partition in the lower 33 bits. The assumption is that the data frame has
+ * less than 1 billion partitions, and each partition has less than 8 billion records.
+ *
+ * As an example, consider a [[DataFrame]] with two partitions, each with 3 records.
+ * This expression would return the following IDs:
+ * 0, 1, 2, 8589934592 (1L << 33), 8589934593, 8589934594.
+ *
+ * @group normal_funcs
+ * @since 1.6.0
+ */
+ def monotonically_increasing_id(): Column = withExpr { MonotonicallyIncreasingID() }
/**
* Returns col1 if it is not NaN, or col2 if col1 is NaN.
@@ -826,14 +887,22 @@ object functions extends LegacyFunctions {
def randn(): Column = randn(Utils.random.nextLong)
/**
+ * @group normal_funcs
+ * @since 1.4.0
+ * @deprecated As of 1.6.0, replaced by `spark_partition_id`. This will be removed in Spark 2.0.
+ */
+ @deprecated("Use cume_dist. This will be removed in Spark 2.0.", "1.6.0")
+ def sparkPartitionId(): Column = spark_partition_id()
+
+ /**
* Partition ID of the Spark task.
*
* Note that this is indeterministic because it depends on data partitioning and task scheduling.
*
* @group normal_funcs
- * @since 1.4.0
+ * @since 1.6.0
*/
- def sparkPartitionId(): Column = withExpr { SparkPartitionID() }
+ def spark_partition_id(): Column = withExpr { SparkPartitionID() }
/**
* Computes the square root of the specified float value.
@@ -2306,6 +2375,17 @@ object functions extends LegacyFunctions {
def explode(e: Column): Column = withExpr { Explode(e.expr) }
/**
+ * Extracts json object from a json string based on json path specified, and returns json string
+ * of the extracted json object. It will return null if the input json string is invalid.
+ *
+ * @group collection_funcs
+ * @since 1.6.0
+ */
+ def get_json_object(e: Column, path: String): Column = withExpr {
+ GetJsonObject(e.expr, lit(path).expr)
+ }
+
+ /**
* Creates a new row for a json column according to the given field names.
*
* @group collection_funcs
@@ -2313,7 +2393,7 @@ object functions extends LegacyFunctions {
*/
@scala.annotation.varargs
def json_tuple(json: Column, fields: String*): Column = withExpr {
- require(fields.length > 0, "at least 1 field name should be given.")
+ require(fields.nonEmpty, "at least 1 field name should be given.")
JsonTuple(json.expr +: fields.map(Literal.apply))
}