From 37537760d19eab878a5e1a48641cc49e6cb4b989 Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Fri, 1 May 2015 12:49:02 -0700
Subject: [SPARK-7274] [SQL] Create Column expression for array/struct creation.

Author: Reynold Xin

Closes #5802 from rxin/SPARK-7274 and squashes the following commits:

19aecaa [Reynold Xin] Fixed unicode tests.
bfc1538 [Reynold Xin] Export all Python functions.
2517b8c [Reynold Xin] Code review.
23da335 [Reynold Xin] Fixed Python bug.
132002e [Reynold Xin] Fixed tests.
56fce26 [Reynold Xin] Added Python support.
b0d591a [Reynold Xin] Fixed debug error.
86926a6 [Reynold Xin] Added test suite.
7dbb9ab [Reynold Xin] Ok one more.
470e2f5 [Reynold Xin] One more MLlib ...
e2d14f0 [Reynold Xin] [SPARK-7274][SQL] Create Column expression for array/struct creation.
---
 python/pyspark/sql/functions.py | 80 +++++++++++++++++++++++++++++++----------
 1 file changed, 61 insertions(+), 19 deletions(-)

diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 241f821757..641220a264 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -24,13 +24,20 @@ if sys.version < "3":
     from itertools import imap as map
 
 from pyspark import SparkContext
-from pyspark.rdd import _prepare_for_python_RDD
+from pyspark.rdd import _prepare_for_python_RDD, ignore_unicode_prefix
 from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
 from pyspark.sql.types import StringType
 from pyspark.sql.dataframe import Column, _to_java_column, _to_seq
 
 
-__all__ = ['countDistinct', 'approxCountDistinct', 'udf']
+__all__ = [
+    'approxCountDistinct',
+    'countDistinct',
+    'monotonicallyIncreasingId',
+    'rand',
+    'randn',
+    'sparkPartitionId',
+    'udf']
 
 
 def _create_function(name, doc=""):
@@ -74,27 +81,21 @@ __all__ += _functions.keys()
 __all__.sort()
 
 
-def rand(seed=None):
-    """
-    Generate a random column with i.i.d. samples from U[0.0, 1.0].
-    """
-    sc = SparkContext._active_spark_context
-    if seed:
-        jc = sc._jvm.functions.rand(seed)
-    else:
-        jc = sc._jvm.functions.rand()
-    return Column(jc)
+def array(*cols):
+    """Creates a new array column.
 
+    :param cols: list of column names (string) or list of :class:`Column` expressions that have
+        the same data type.
 
-def randn(seed=None):
-    """
-    Generate a column with i.i.d. samples from the standard normal distribution.
+    >>> df.select(array('age', 'age').alias("arr")).collect()
+    [Row(arr=[2, 2]), Row(arr=[5, 5])]
+    >>> df.select(array([df.age, df.age]).alias("arr")).collect()
+    [Row(arr=[2, 2]), Row(arr=[5, 5])]
     """
     sc = SparkContext._active_spark_context
-    if seed:
-        jc = sc._jvm.functions.randn(seed)
-    else:
-        jc = sc._jvm.functions.randn()
+    if len(cols) == 1 and isinstance(cols[0], (list, set)):
+        cols = cols[0]
+    jc = sc._jvm.functions.array(_to_seq(sc, cols, _to_java_column))
     return Column(jc)
 
 
@@ -146,6 +147,28 @@ def monotonicallyIncreasingId():
     return Column(sc._jvm.functions.monotonicallyIncreasingId())
 
 
+def rand(seed=None):
+    """Generates a random column with i.i.d. samples from U[0.0, 1.0].
+    """
+    sc = SparkContext._active_spark_context
+    if seed:
+        jc = sc._jvm.functions.rand(seed)
+    else:
+        jc = sc._jvm.functions.rand()
+    return Column(jc)
+
+
+def randn(seed=None):
+    """Generates a column with i.i.d. samples from the standard normal distribution.
+ """ + sc = SparkContext._active_spark_context + if seed: + jc = sc._jvm.functions.randn(seed) + else: + jc = sc._jvm.functions.randn() + return Column(jc) + + def sparkPartitionId(): """A column for partition ID of the Spark task. @@ -158,6 +181,25 @@ def sparkPartitionId(): return Column(sc._jvm.functions.sparkPartitionId()) +@ignore_unicode_prefix +def struct(*cols): + """Creates a new struct column. + + :param cols: list of column names (string) or list of :class:`Column` expressions + that are named or aliased. + + >>> df.select(struct('age', 'name').alias("struct")).collect() + [Row(struct=Row(age=2, name=u'Alice')), Row(struct=Row(age=5, name=u'Bob'))] + >>> df.select(struct([df.age, df.name]).alias("struct")).collect() + [Row(struct=Row(age=2, name=u'Alice')), Row(struct=Row(age=5, name=u'Bob'))] + """ + sc = SparkContext._active_spark_context + if len(cols) == 1 and isinstance(cols[0], (list, set)): + cols = cols[0] + jc = sc._jvm.functions.struct(_to_seq(sc, cols, _to_java_column)) + return Column(jc) + + class UserDefinedFunction(object): """ User defined function in Python -- cgit v1.2.3