From ca55dc95b777d96b27d4e4c0457dd25145dcd6e9 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Sun, 26 Apr 2015 11:46:58 -0700 Subject: [SPARK-7152][SQL] Add a Column expression for partition ID. Author: Reynold Xin Closes #5705 from rxin/df-pid and squashes the following commits: 401018f [Reynold Xin] [SPARK-7152][SQL] Add a Column expression for partition ID. --- python/pyspark/sql/functions.py | 30 +++++++++++++++++++++--------- 1 file changed, 21 insertions(+), 9 deletions(-) (limited to 'python') diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index bb47923f24..f48b7b5d10 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -75,6 +75,20 @@ __all__ += _functions.keys() __all__.sort() +def approxCountDistinct(col, rsd=None): + """Returns a new :class:`Column` for approximate distinct count of ``col``. + + >>> df.agg(approxCountDistinct(df.age).alias('c')).collect() + [Row(c=2)] + """ + sc = SparkContext._active_spark_context + if rsd is None: + jc = sc._jvm.functions.approxCountDistinct(_to_java_column(col)) + else: + jc = sc._jvm.functions.approxCountDistinct(_to_java_column(col), rsd) + return Column(jc) + + def countDistinct(col, *cols): """Returns a new :class:`Column` for distinct count of ``col`` or ``cols``. @@ -89,18 +103,16 @@ def countDistinct(col, *cols): return Column(jc) -def approxCountDistinct(col, rsd=None): - """Returns a new :class:`Column` for approximate distinct count of ``col``. +def sparkPartitionId(): + """Returns a column for partition ID of the Spark task. - >>> df.agg(approxCountDistinct(df.age).alias('c')).collect() - [Row(c=2)] + Note that this is indeterministic because it depends on data partitioning and task scheduling. + + >>> df.repartition(1).select(sparkPartitionId().alias("pid")).collect() + [Row(pid=0), Row(pid=0)] """ sc = SparkContext._active_spark_context - if rsd is None: - jc = sc._jvm.functions.approxCountDistinct(_to_java_column(col)) - else: - jc = sc._jvm.functions.approxCountDistinct(_to_java_column(col), rsd) - return Column(jc) + return Column(sc._jvm.functions.sparkPartitionId()) class UserDefinedFunction(object): -- cgit v1.2.3