diff options
author | Reynold Xin <rxin@databricks.com> | 2015-05-01 12:49:02 -0700 |
---|---|---|
committer | Xiangrui Meng <meng@databricks.com> | 2015-05-01 12:49:02 -0700 |
commit | 37537760d19eab878a5e1a48641cc49e6cb4b989 (patch) | |
tree | a37c553f7c27835399dd46a26fafd8dcc1613437 /python/pyspark/sql | |
parent | 16860327286bc08b4e2283d51b4c8fe024ba5006 (diff) | |
download | spark-37537760d19eab878a5e1a48641cc49e6cb4b989.tar.gz spark-37537760d19eab878a5e1a48641cc49e6cb4b989.tar.bz2 spark-37537760d19eab878a5e1a48641cc49e6cb4b989.zip |
[SPARK-7274] [SQL] Create Column expression for array/struct creation.
Author: Reynold Xin <rxin@databricks.com>
Closes #5802 from rxin/SPARK-7274 and squashes the following commits:
19aecaa [Reynold Xin] Fixed unicode tests.
bfc1538 [Reynold Xin] Export all Python functions.
2517b8c [Reynold Xin] Code review.
23da335 [Reynold Xin] Fixed Python bug.
132002e [Reynold Xin] Fixed tests.
56fce26 [Reynold Xin] Added Python support.
b0d591a [Reynold Xin] Fixed debug error.
86926a6 [Reynold Xin] Added test suite.
7dbb9ab [Reynold Xin] Ok one more.
470e2f5 [Reynold Xin] One more MLlib ...
e2d14f0 [Reynold Xin] [SPARK-7274][SQL] Create Column expression for array/struct creation.
Diffstat (limited to 'python/pyspark/sql')
-rw-r--r-- | python/pyspark/sql/functions.py | 80 |
1 files changed, 61 insertions, 19 deletions
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 241f821757..641220a264 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -24,13 +24,20 @@ if sys.version < "3": from itertools import imap as map from pyspark import SparkContext -from pyspark.rdd import _prepare_for_python_RDD +from pyspark.rdd import _prepare_for_python_RDD, ignore_unicode_prefix from pyspark.serializers import PickleSerializer, AutoBatchedSerializer from pyspark.sql.types import StringType from pyspark.sql.dataframe import Column, _to_java_column, _to_seq -__all__ = ['countDistinct', 'approxCountDistinct', 'udf'] +__all__ = [ + 'approxCountDistinct', + 'countDistinct', + 'monotonicallyIncreasingId', + 'rand', + 'randn', + 'sparkPartitionId', + 'udf'] def _create_function(name, doc=""): @@ -74,27 +81,21 @@ __all__ += _functions.keys() __all__.sort() -def rand(seed=None): - """ - Generate a random column with i.i.d. samples from U[0.0, 1.0]. - """ - sc = SparkContext._active_spark_context - if seed: - jc = sc._jvm.functions.rand(seed) - else: - jc = sc._jvm.functions.rand() - return Column(jc) +def array(*cols): + """Creates a new array column. + :param cols: list of column names (string) or list of :class:`Column` expressions that have + the same data type. -def randn(seed=None): - """ - Generate a column with i.i.d. samples from the standard normal distribution. + >>> df.select(array('age', 'age').alias("arr")).collect() + [Row(arr=[2, 2]), Row(arr=[5, 5])] + >>> df.select(array([df.age, df.age]).alias("arr")).collect() + [Row(arr=[2, 2]), Row(arr=[5, 5])] """ sc = SparkContext._active_spark_context - if seed: - jc = sc._jvm.functions.randn(seed) - else: - jc = sc._jvm.functions.randn() + if len(cols) == 1 and isinstance(cols[0], (list, set)): + cols = cols[0] + jc = sc._jvm.functions.array(_to_seq(sc, cols, _to_java_column)) return Column(jc) @@ -146,6 +147,28 @@ def monotonicallyIncreasingId(): return Column(sc._jvm.functions.monotonicallyIncreasingId()) +def rand(seed=None): + """Generates a random column with i.i.d. samples from U[0.0, 1.0]. + """ + sc = SparkContext._active_spark_context + if seed: + jc = sc._jvm.functions.rand(seed) + else: + jc = sc._jvm.functions.rand() + return Column(jc) + + +def randn(seed=None): + """Generates a column with i.i.d. samples from the standard normal distribution. + """ + sc = SparkContext._active_spark_context + if seed: + jc = sc._jvm.functions.randn(seed) + else: + jc = sc._jvm.functions.randn() + return Column(jc) + + def sparkPartitionId(): """A column for partition ID of the Spark task. @@ -158,6 +181,25 @@ def sparkPartitionId(): return Column(sc._jvm.functions.sparkPartitionId()) +@ignore_unicode_prefix +def struct(*cols): + """Creates a new struct column. + + :param cols: list of column names (string) or list of :class:`Column` expressions + that are named or aliased. + + >>> df.select(struct('age', 'name').alias("struct")).collect() + [Row(struct=Row(age=2, name=u'Alice')), Row(struct=Row(age=5, name=u'Bob'))] + >>> df.select(struct([df.age, df.name]).alias("struct")).collect() + [Row(struct=Row(age=2, name=u'Alice')), Row(struct=Row(age=5, name=u'Bob'))] + """ + sc = SparkContext._active_spark_context + if len(cols) == 1 and isinstance(cols[0], (list, set)): + cols = cols[0] + jc = sc._jvm.functions.struct(_to_seq(sc, cols, _to_java_column)) + return Column(jc) + + class UserDefinedFunction(object): """ User defined function in Python |