diff options
author | Shixiong Zhu <shixiong@databricks.com> | 2017-02-23 11:25:39 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2017-02-23 11:25:39 -0800 |
commit | 9bf4e2baad0e2851da554d85223ffaa029cfa490 (patch) | |
tree | a08773e6a82e7d5fa78ca2f71d707e74be36a9cc /python/pyspark | |
parent | 7bf09433f5c5e08154ba106be21fe24f17cd282b (diff) | |
download | spark-9bf4e2baad0e2851da554d85223ffaa029cfa490.tar.gz spark-9bf4e2baad0e2851da554d85223ffaa029cfa490.tar.bz2 spark-9bf4e2baad0e2851da554d85223ffaa029cfa490.zip |
[SPARK-19497][SS] Implement streaming deduplication
## What changes were proposed in this pull request?
This PR adds a special streaming deduplication operator to support `dropDuplicates` with `aggregation` and watermark. It reuses the `dropDuplicates` API but creates new logical plan `Deduplication` and new physical plan `DeduplicationExec`.
The following cases are supported:
- one or multiple `dropDuplicates()` without aggregation (with or without watermark)
- `dropDuplicates` before aggregation
Not supported cases:
- `dropDuplicates` after aggregation
Breaking changes:
- `dropDuplicates` without aggregation doesn't work with `complete` or `update` mode.
## How was this patch tested?
The new unit tests.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #16970 from zsxwing/dedup.
Diffstat (limited to 'python/pyspark')
-rw-r--r-- | python/pyspark/sql/dataframe.py | 6 |
1 files changed, 6 insertions, 0 deletions
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 70efeaf016..bb6df22682 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -1158,6 +1158,12 @@ class DataFrame(object): """Return a new :class:`DataFrame` with duplicate rows removed, optionally only considering certain columns. + For a static batch :class:`DataFrame`, it just drops duplicate rows. For a streaming + :class:`DataFrame`, it will keep all data across triggers as intermediate state to drop + duplicates rows. You can use :func:`withWatermark` to limit how late the duplicate data can + be and system will accordingly limit the state. In addition, too late data older than + watermark will be dropped to avoid any possibility of duplicates. + :func:`drop_duplicates` is an alias for :func:`dropDuplicates`. >>> from pyspark.sql import Row |