author     Reynold Xin <rxin@databricks.com>      2015-05-01 12:49:02 -0700
committer  Xiangrui Meng <meng@databricks.com>    2015-05-01 12:49:02 -0700
commit     37537760d19eab878a5e1a48641cc49e6cb4b989 (patch)
tree       a37c553f7c27835399dd46a26fafd8dcc1613437 /python
parent     16860327286bc08b4e2283d51b4c8fe024ba5006 (diff)
[SPARK-7274] [SQL] Create Column expression for array/struct creation.
Author: Reynold Xin <rxin@databricks.com>

Closes #5802 from rxin/SPARK-7274 and squashes the following commits:

19aecaa [Reynold Xin] Fixed unicode tests.
bfc1538 [Reynold Xin] Export all Python functions.
2517b8c [Reynold Xin] Code review.
23da335 [Reynold Xin] Fixed Python bug.
132002e [Reynold Xin] Fixed tests.
56fce26 [Reynold Xin] Added Python support.
b0d591a [Reynold Xin] Fixed debug error.
86926a6 [Reynold Xin] Added test suite.
7dbb9ab [Reynold Xin] Ok one more.
470e2f5 [Reynold Xin] One more MLlib ...
e2d14f0 [Reynold Xin] [SPARK-7274][SQL] Create Column expression for array/struct creation.
Diffstat (limited to 'python')
-rw-r--r--  python/pyspark/sql/functions.py  |  80
1 file changed, 61 insertions(+), 19 deletions(-)
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 241f821757..641220a264 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -24,13 +24,20 @@ if sys.version < "3":
     from itertools import imap as map
 
 from pyspark import SparkContext
-from pyspark.rdd import _prepare_for_python_RDD
+from pyspark.rdd import _prepare_for_python_RDD, ignore_unicode_prefix
 from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
 from pyspark.sql.types import StringType
 from pyspark.sql.dataframe import Column, _to_java_column, _to_seq
 
 
-__all__ = ['countDistinct', 'approxCountDistinct', 'udf']
+__all__ = [
+    'approxCountDistinct',
+    'countDistinct',
+    'monotonicallyIncreasingId',
+    'rand',
+    'randn',
+    'sparkPartitionId',
+    'udf']
 
 
 def _create_function(name, doc=""):
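With the expanded __all__ above, the seven listed helpers become part of the module's public star-import surface. A minimal sketch of what the patched module lets you write (assumes this commit's pyspark is on the path; array and struct, added below, are importable by name rather than via *):

    # Import the names the new __all__ exports from pyspark.sql.functions.
    from pyspark.sql.functions import (
        approxCountDistinct, countDistinct, monotonicallyIncreasingId,
        rand, randn, sparkPartitionId, udf)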
@@ -74,27 +81,21 @@ __all__ += _functions.keys()
 __all__.sort()
 
 
-def rand(seed=None):
-    """
-    Generate a random column with i.i.d. samples from U[0.0, 1.0].
-    """
-    sc = SparkContext._active_spark_context
-    if seed:
-        jc = sc._jvm.functions.rand(seed)
-    else:
-        jc = sc._jvm.functions.rand()
-    return Column(jc)
+def array(*cols):
+    """Creates a new array column.
 
+    :param cols: list of column names (string) or list of :class:`Column` expressions that have
+        the same data type.
 
-def randn(seed=None):
-    """
-    Generate a column with i.i.d. samples from the standard normal distribution.
+    >>> df.select(array('age', 'age').alias("arr")).collect()
+    [Row(arr=[2, 2]), Row(arr=[5, 5])]
+    >>> df.select(array([df.age, df.age]).alias("arr")).collect()
+    [Row(arr=[2, 2]), Row(arr=[5, 5])]
     """
     sc = SparkContext._active_spark_context
-    if seed:
-        jc = sc._jvm.functions.randn(seed)
-    else:
-        jc = sc._jvm.functions.randn()
+    if len(cols) == 1 and isinstance(cols[0], (list, set)):
+        cols = cols[0]
+    jc = sc._jvm.functions.array(_to_seq(sc, cols, _to_java_column))
     return Column(jc)
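The new array() helper accepts either varargs or a single list/set, which the isinstance guard above unpacks before handing the columns to the JVM. A minimal usage sketch, assuming a Spark 1.4-era SQLContext and the same two-row (age, name) fixture the doctests use; the app name and variable names here are illustrative:

    from pyspark import SparkContext
    from pyspark.sql import SQLContext, Row
    from pyspark.sql.functions import array

    sc = SparkContext("local", "array-sketch")
    sqlContext = SQLContext(sc)
    df = sqlContext.createDataFrame(
        [Row(age=2, name='Alice'), Row(age=5, name='Bob')])

    # Both call styles reach the same JVM function: varargs of column
    # names or Columns, or one Python list that the guard unpacks first.
    df.select(array('age', 'age').alias('arr')).collect()      # [Row(arr=[2, 2]), Row(arr=[5, 5])]
    df.select(array([df.age, df.age]).alias('arr')).collect()  # same result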
@@ -146,6 +147,28 @@ def monotonicallyIncreasingId():
     return Column(sc._jvm.functions.monotonicallyIncreasingId())
 
 
+def rand(seed=None):
+    """Generates a random column with i.i.d. samples from U[0.0, 1.0].
+    """
+    sc = SparkContext._active_spark_context
+    if seed:
+        jc = sc._jvm.functions.rand(seed)
+    else:
+        jc = sc._jvm.functions.rand()
+    return Column(jc)
+
+
+def randn(seed=None):
+    """Generates a column with i.i.d. samples from the standard normal distribution.
+    """
+    sc = SparkContext._active_spark_context
+    if seed:
+        jc = sc._jvm.functions.randn(seed)
+    else:
+        jc = sc._jvm.functions.randn()
+    return Column(jc)
+
+
 def sparkPartitionId():
     """A column for partition ID of the Spark task.
 
@@ -158,6 +181,25 @@ def sparkPartitionId():
     return Column(sc._jvm.functions.sparkPartitionId())
 
 
+@ignore_unicode_prefix
+def struct(*cols):
+    """Creates a new struct column.
+
+    :param cols: list of column names (string) or list of :class:`Column` expressions
+        that are named or aliased.
+
+    >>> df.select(struct('age', 'name').alias("struct")).collect()
+    [Row(struct=Row(age=2, name=u'Alice')), Row(struct=Row(age=5, name=u'Bob'))]
+    >>> df.select(struct([df.age, df.name]).alias("struct")).collect()
+    [Row(struct=Row(age=2, name=u'Alice')), Row(struct=Row(age=5, name=u'Bob'))]
+    """
+    sc = SparkContext._active_spark_context
+    if len(cols) == 1 and isinstance(cols[0], (list, set)):
+        cols = cols[0]
+    jc = sc._jvm.functions.struct(_to_seq(sc, cols, _to_java_column))
+    return Column(jc)
+
+
 class UserDefinedFunction(object):
     """
     User defined function in Python
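Finally, a sketch of the new struct() helper continuing the same fixture; the 'person' alias is illustrative, and fields of the packed column can be read back with dot notation in select():

    from pyspark.sql.functions import struct

    # Pack two columns into a single struct column, then pull one field
    # back out by name.
    packed = df.select(struct('age', 'name').alias('person'))
    packed.select('person.age').collect()   # [Row(age=2), Row(age=5)]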