Author:    Reynold Xin <rxin@databricks.com>    2015-04-28 00:39:08 -0700
Committer: Reynold Xin <rxin@databricks.com>    2015-04-28 00:39:08 -0700
Commit:    d94cd1a733d5715792e6c4eac87f0d5c81aebbe2
Tree:      79757d52491eaf74b527a4b280d4762ab21dae20
Parent:    bf35edd9d4b8b11df9f47b6ff43831bc95f06322
[SPARK-7135][SQL] DataFrame expression for monotonically increasing IDs.
Author: Reynold Xin <rxin@databricks.com>

Closes #5709 from rxin/inc-id and squashes the following commits:

7853611 [Reynold Xin] private sql.
a9fda0d [Reynold Xin] Missed a few numbers.
343d896 [Reynold Xin] Self review feedback.
a7136cb [Reynold Xin] [SPARK-7135][SQL] DataFrame expression for monotonically increasing IDs.
Diffstat (limited to 'python')
-rw-r--r--  python/pyspark/sql/functions.py  22
1 file changed, 21 insertions(+), 1 deletion(-)
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index f48b7b5d10..7b86655d9c 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -103,8 +103,28 @@ def countDistinct(col, *cols):
return Column(jc)
+def monotonicallyIncreasingId():
+ """A column that generates monotonically increasing 64-bit integers.
+
+ The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive.
+ The current implementation puts the partition ID in the upper 31 bits, and the record number
+ within each partition in the lower 33 bits. The assumption is that the DataFrame has
+ fewer than 1 billion partitions, and each partition has fewer than 8 billion records.
+
+ As an example, consider a DataFrame with two partitions, each with 3 records.
+ This expression would return the following IDs:
+ 0, 1, 2, 8589934592 (1L << 33), 8589934593, 8589934594.
+
+ >>> df0 = sc.parallelize(range(2), 2).mapPartitions(lambda x: [(1,), (2,), (3,)]).toDF(['col1'])
+ >>> df0.select(monotonicallyIncreasingId().alias('id')).collect()
+ [Row(id=0), Row(id=1), Row(id=2), Row(id=8589934592), Row(id=8589934593), Row(id=8589934594)]
+ """
+ sc = SparkContext._active_spark_context
+ return Column(sc._jvm.functions.monotonicallyIncreasingId())
+
+
def sparkPartitionId():
- """Returns a column for partition ID of the Spark task.
+ """A column for partition ID of the Spark task.
Note that this is non-deterministic because it depends on data partitioning and task scheduling.
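
To make the bit layout in the new docstring concrete, here is a minimal Python sketch (not part of this commit; the decode_id helper name is hypothetical) that splits a generated ID back into its partition ID and per-partition record number:

# Assumes the layout documented above: partition ID in the upper 31 bits,
# per-partition record number in the lower 33 bits of a 64-bit integer.
RECORD_BITS = 33

def decode_id(gen_id):
    """Split a generated ID into (partition_id, record_number)."""
    partition_id = gen_id >> RECORD_BITS                 # upper 31 bits
    record_number = gen_id & ((1 << RECORD_BITS) - 1)    # lower 33 bits
    return partition_id, record_number

# From the docstring's example: 8589934592 (1 << 33) is the first record
# of the second partition, while 2 is the third record of the first.
assert decode_id(8589934592) == (1, 0)
assert decode_id(2) == (0, 2)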