Diffstat (limited to 'python/pyspark/sql')
-rw-r--r--  python/pyspark/sql/functions.py  80
1 file changed, 61 insertions(+), 19 deletions(-)
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 241f821757..641220a264 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -24,13 +24,20 @@ if sys.version < "3":
from itertools import imap as map
from pyspark import SparkContext
-from pyspark.rdd import _prepare_for_python_RDD
+from pyspark.rdd import _prepare_for_python_RDD, ignore_unicode_prefix
from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
from pyspark.sql.types import StringType
from pyspark.sql.dataframe import Column, _to_java_column, _to_seq

-__all__ = ['countDistinct', 'approxCountDistinct', 'udf']
+__all__ = [
+    'approxCountDistinct',
+    'countDistinct',
+    'monotonicallyIncreasingId',
+    'rand',
+    'randn',
+    'sparkPartitionId',
+    'udf']

def _create_function(name, doc=""):
@@ -74,27 +81,21 @@ __all__ += _functions.keys()
__all__.sort()

-def rand(seed=None):
-    """
-    Generate a random column with i.i.d. samples from U[0.0, 1.0].
-    """
-    sc = SparkContext._active_spark_context
-    if seed:
-        jc = sc._jvm.functions.rand(seed)
-    else:
-        jc = sc._jvm.functions.rand()
-    return Column(jc)
+def array(*cols):
+    """Creates a new array column.

+    :param cols: list of column names (string) or list of :class:`Column` expressions that have
+        the same data type.

-def randn(seed=None):
-    """
-    Generate a column with i.i.d. samples from the standard normal distribution.
+    >>> df.select(array('age', 'age').alias("arr")).collect()
+    [Row(arr=[2, 2]), Row(arr=[5, 5])]
+    >>> df.select(array([df.age, df.age]).alias("arr")).collect()
+    [Row(arr=[2, 2]), Row(arr=[5, 5])]
     """
     sc = SparkContext._active_spark_context
-    if seed:
-        jc = sc._jvm.functions.randn(seed)
-    else:
-        jc = sc._jvm.functions.randn()
+    if len(cols) == 1 and isinstance(cols[0], (list, set)):
+        cols = cols[0]
+    jc = sc._jvm.functions.array(_to_seq(sc, cols, _to_java_column))
     return Column(jc)
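
For readers who want to try the new API outside the doctest harness, here is a minimal sketch of array usage, assuming a local Spark 1.3+ setup; the names sc, sqlContext, and df are illustrative and not part of the patch.

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import array

# A local context and the same two-row frame the doctests assume.
sc = SparkContext("local", "array-demo")
sqlContext = SQLContext(sc)
df = sqlContext.createDataFrame([(2, 'Alice'), (5, 'Bob')], ['age', 'name'])

# Both call styles handled above: varargs of column names, or a single
# list of Column expressions (the isinstance branch unwraps the list).
df.select(array('age', 'age').alias("arr")).collect()      # [Row(arr=[2, 2]), Row(arr=[5, 5])]
df.select(array([df.age, df.age]).alias("arr")).collect()  # [Row(arr=[2, 2]), Row(arr=[5, 5])]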
@@ -146,6 +147,28 @@ def monotonicallyIncreasingId():
    return Column(sc._jvm.functions.monotonicallyIncreasingId())


+def rand(seed=None):
+    """Generates a random column with i.i.d. samples from U[0.0, 1.0].
+    """
+    sc = SparkContext._active_spark_context
+    if seed:
+        jc = sc._jvm.functions.rand(seed)
+    else:
+        jc = sc._jvm.functions.rand()
+    return Column(jc)
+
+
+def randn(seed=None):
+    """Generates a column with i.i.d. samples from the standard normal distribution.
+    """
+    sc = SparkContext._active_spark_context
+    if seed:
+        jc = sc._jvm.functions.randn(seed)
+    else:
+        jc = sc._jvm.functions.randn()
+    return Column(jc)
+
+
def sparkPartitionId():
"""A column for partition ID of the Spark task.
@@ -158,6 +181,25 @@ def sparkPartitionId():
    return Column(sc._jvm.functions.sparkPartitionId())


+@ignore_unicode_prefix
+def struct(*cols):
+    """Creates a new struct column.
+
+    :param cols: list of column names (string) or list of :class:`Column` expressions
+        that are named or aliased.
+
+    >>> df.select(struct('age', 'name').alias("struct")).collect()
+    [Row(struct=Row(age=2, name=u'Alice')), Row(struct=Row(age=5, name=u'Bob'))]
+    >>> df.select(struct([df.age, df.name]).alias("struct")).collect()
+    [Row(struct=Row(age=2, name=u'Alice')), Row(struct=Row(age=5, name=u'Bob'))]
+    """
+    sc = SparkContext._active_spark_context
+    if len(cols) == 1 and isinstance(cols[0], (list, set)):
+        cols = cols[0]
+    jc = sc._jvm.functions.struct(_to_seq(sc, cols, _to_java_column))
+    return Column(jc)
+
+
class UserDefinedFunction(object):
"""
User defined function in Python
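
Finally, a sketch exercising struct and sparkPartitionId together, again with the illustrative df from above; reading a field back out of the packed column uses ordinary dotted-name selection.

from pyspark.sql.functions import sparkPartitionId, struct

# Pack two columns into a struct, then select a nested field back out.
packed = df.select(struct('age', 'name').alias("person"))
packed.select("person.age").collect()  # [Row(age=2), Row(age=5)]

# Tag each row with the ID of the partition that produced it; the values
# depend on how the data happens to be partitioned.
df.select("name", sparkPartitionId().alias("pid")).collect()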