author    Reynold Xin <rxin@databricks.com>    2015-04-26 11:46:58 -0700
committer Reynold Xin <rxin@databricks.com>    2015-04-26 11:46:58 -0700
commit    ca55dc95b777d96b27d4e4c0457dd25145dcd6e9 (patch)
tree      391b1d42a227ec3744b4ff67852c01c4ec2e1b71 /python/pyspark
parent    9a5bbe05fc1b1141e139d32661821fef47d7a13c (diff)
[SPARK-7152][SQL] Add a Column expression for partition ID.
Author: Reynold Xin <rxin@databricks.com>

Closes #5705 from rxin/df-pid and squashes the following commits:

401018f [Reynold Xin] [SPARK-7152][SQL] Add a Column expression for partition ID.
Diffstat (limited to 'python/pyspark')
-rw-r--r--  python/pyspark/sql/functions.py  |  30
1 file changed, 21 insertions(+), 9 deletions(-)
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index bb47923f24..f48b7b5d10 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -75,6 +75,20 @@ __all__ += _functions.keys()
 __all__.sort()
+def approxCountDistinct(col, rsd=None):
+    """Returns a new :class:`Column` for approximate distinct count of ``col``.
+
+    >>> df.agg(approxCountDistinct(df.age).alias('c')).collect()
+    [Row(c=2)]
+    """
+    sc = SparkContext._active_spark_context
+    if rsd is None:
+        jc = sc._jvm.functions.approxCountDistinct(_to_java_column(col))
+    else:
+        jc = sc._jvm.functions.approxCountDistinct(_to_java_column(col), rsd)
+    return Column(jc)
+
+
 def countDistinct(col, *cols):
     """Returns a new :class:`Column` for distinct count of ``col`` or ``cols``.
@@ -89,18 +103,16 @@ def countDistinct(col, *cols):
     return Column(jc)
-def approxCountDistinct(col, rsd=None):
-    """Returns a new :class:`Column` for approximate distinct count of ``col``.
+def sparkPartitionId():
+    """Returns a column for partition ID of the Spark task.
-    >>> df.agg(approxCountDistinct(df.age).alias('c')).collect()
-    [Row(c=2)]
+    Note that this is indeterministic because it depends on data partitioning and task scheduling.
+
+    >>> df.repartition(1).select(sparkPartitionId().alias("pid")).collect()
+    [Row(pid=0), Row(pid=0)]
     """
     sc = SparkContext._active_spark_context
-    if rsd is None:
-        jc = sc._jvm.functions.approxCountDistinct(_to_java_column(col))
-    else:
-        jc = sc._jvm.functions.approxCountDistinct(_to_java_column(col), rsd)
-    return Column(jc)
+    return Column(sc._jvm.functions.sparkPartitionId())
 class UserDefinedFunction(object):
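
As a usage sketch (not part of the patch): assuming a PySpark session of this era with a SQLContext named sqlContext and a small DataFrame mirroring the doctest fixture above (both are illustrative assumptions), the two functions touched here could be exercised as follows.

from pyspark.sql import Row
from pyspark.sql.functions import approxCountDistinct, sparkPartitionId

# Hypothetical fixture mirroring the doctest DataFrame `df` used in the docstrings above.
df = sqlContext.createDataFrame([Row(age=2, name='Alice'), Row(age=5, name='Bob')])

# Approximate distinct count; the optional second argument is the relative error (rsd).
df.agg(approxCountDistinct(df.age).alias('c')).collect()        # [Row(c=2)]
df.agg(approxCountDistinct(df.age, 0.05).alias('c')).collect()  # [Row(c=2)]

# Partition ID of the task that produced each row; the values depend on data partitioning
# and task scheduling, so repartition(1) is used here to make the output predictable.
df.repartition(1).select(sparkPartitionId().alias('pid')).collect()  # [Row(pid=0), Row(pid=0)]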