diff options
author | Reynold Xin <rxin@databricks.com> | 2015-04-26 11:46:58 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-04-26 11:46:58 -0700 |
commit | ca55dc95b777d96b27d4e4c0457dd25145dcd6e9 (patch) | |
tree | 391b1d42a227ec3744b4ff67852c01c4ec2e1b71 /python/pyspark/sql | |
parent | 9a5bbe05fc1b1141e139d32661821fef47d7a13c (diff) | |
download | spark-ca55dc95b777d96b27d4e4c0457dd25145dcd6e9.tar.gz spark-ca55dc95b777d96b27d4e4c0457dd25145dcd6e9.tar.bz2 spark-ca55dc95b777d96b27d4e4c0457dd25145dcd6e9.zip |
[SPARK-7152][SQL] Add a Column expression for partition ID.
Author: Reynold Xin <rxin@databricks.com>
Closes #5705 from rxin/df-pid and squashes the following commits:
401018f [Reynold Xin] [SPARK-7152][SQL] Add a Column expression for partition ID.
Diffstat (limited to 'python/pyspark/sql')
-rw-r--r-- | python/pyspark/sql/functions.py | 30 |
1 file changed, 21 insertions, 9 deletions
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index bb47923f24..f48b7b5d10 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -75,6 +75,20 @@ __all__ += _functions.keys() __all__.sort() +def approxCountDistinct(col, rsd=None): + """Returns a new :class:`Column` for approximate distinct count of ``col``. + + >>> df.agg(approxCountDistinct(df.age).alias('c')).collect() + [Row(c=2)] + """ + sc = SparkContext._active_spark_context + if rsd is None: + jc = sc._jvm.functions.approxCountDistinct(_to_java_column(col)) + else: + jc = sc._jvm.functions.approxCountDistinct(_to_java_column(col), rsd) + return Column(jc) + + def countDistinct(col, *cols): """Returns a new :class:`Column` for distinct count of ``col`` or ``cols``. @@ -89,18 +103,16 @@ def countDistinct(col, *cols): return Column(jc) -def approxCountDistinct(col, rsd=None): - """Returns a new :class:`Column` for approximate distinct count of ``col``. +def sparkPartitionId(): + """Returns a column for partition ID of the Spark task. - >>> df.agg(approxCountDistinct(df.age).alias('c')).collect() - [Row(c=2)] + Note that this is indeterministic because it depends on data partitioning and task scheduling. + + >>> df.repartition(1).select(sparkPartitionId().alias("pid")).collect() + [Row(pid=0), Row(pid=0)] """ sc = SparkContext._active_spark_context - if rsd is None: - jc = sc._jvm.functions.approxCountDistinct(_to_java_column(col)) - else: - jc = sc._jvm.functions.approxCountDistinct(_to_java_column(col), rsd) - return Column(jc) + return Column(sc._jvm.functions.sparkPartitionId()) class UserDefinedFunction(object): |