aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark
diff options
context:
space:
mode:
authorShixiong Zhu <shixiong@databricks.com>2017-02-23 11:25:39 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2017-02-23 11:25:39 -0800
commit9bf4e2baad0e2851da554d85223ffaa029cfa490 (patch)
treea08773e6a82e7d5fa78ca2f71d707e74be36a9cc /python/pyspark
parent7bf09433f5c5e08154ba106be21fe24f17cd282b (diff)
downloadspark-9bf4e2baad0e2851da554d85223ffaa029cfa490.tar.gz
spark-9bf4e2baad0e2851da554d85223ffaa029cfa490.tar.bz2
spark-9bf4e2baad0e2851da554d85223ffaa029cfa490.zip
[SPARK-19497][SS] Implement streaming deduplication
## What changes were proposed in this pull request? This PR adds a special streaming deduplication operator to support `dropDuplicates` with `aggregation` and watermark. It reuses the `dropDuplicates` API but creates new logical plan `Deduplication` and new physical plan `DeduplicationExec`. The following cases are supported: - one or multiple `dropDuplicates()` without aggregation (with or without watermark) - `dropDuplicates` before aggregation Not supported cases: - `dropDuplicates` after aggregation Breaking changes: - `dropDuplicates` without aggregation doesn't work with `complete` or `update` mode. ## How was this patch tested? The new unit tests. Author: Shixiong Zhu <shixiong@databricks.com> Closes #16970 from zsxwing/dedup.
Diffstat (limited to 'python/pyspark')
-rw-r--r--python/pyspark/sql/dataframe.py6
1 files changed, 6 insertions, 0 deletions
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 70efeaf016..bb6df22682 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -1158,6 +1158,12 @@ class DataFrame(object):
"""Return a new :class:`DataFrame` with duplicate rows removed,
optionally only considering certain columns.
+ For a static batch :class:`DataFrame`, it just drops duplicate rows. For a streaming
+ :class:`DataFrame`, it will keep all data across triggers as intermediate state to drop
+ duplicates rows. You can use :func:`withWatermark` to limit how late the duplicate data can
+ be and system will accordingly limit the state. In addition, too late data older than
+ watermark will be dropped to avoid any possibility of duplicates.
+
:func:`drop_duplicates` is an alias for :func:`dropDuplicates`.
>>> from pyspark.sql import Row