diff options
author | Shixiong Zhu <shixiong@databricks.com> | 2017-01-10 17:58:11 -0800 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2017-01-10 17:58:11 -0800 |
commit | bc6c56e940fe93591a1e5ba45751f1b243b57e28 (patch) | |
tree | a1cdea8a10912863fbdd6c8b7fe50951e57bf78d /python/pyspark/sql | |
parent | 856bae6af64982ae0221948c58ff564887e54a70 (diff) | |
download | spark-bc6c56e940fe93591a1e5ba45751f1b243b57e28.tar.gz spark-bc6c56e940fe93591a1e5ba45751f1b243b57e28.tar.bz2 spark-bc6c56e940fe93591a1e5ba45751f1b243b57e28.zip |
[SPARK-19140][SS] Allow update mode for non-aggregation streaming queries
## What changes were proposed in this pull request?
This PR allow update mode for non-aggregation streaming queries. It will be same as the append mode if a query has no aggregations.
## How was this patch tested?
Jenkins
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #16520 from zsxwing/update-without-agg.
Diffstat (limited to 'python/pyspark/sql')
-rw-r--r-- | python/pyspark/sql/streaming.py | 27 |
1 files changed, 19 insertions, 8 deletions
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index 5014299ad2..a10b185cd4 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -665,6 +665,9 @@ class DataStreamWriter(object): the sink * `complete`:All the rows in the streaming DataFrame/Dataset will be written to the sink every time these is some updates + * `update`:only the rows that were updated in the streaming DataFrame/Dataset will be + written to the sink every time there are some updates. If the query doesn't contain + aggregations, it will be equivalent to `append` mode. .. note:: Experimental. @@ -768,7 +771,8 @@ class DataStreamWriter(object): @ignore_unicode_prefix @since(2.0) - def start(self, path=None, format=None, partitionBy=None, queryName=None, **options): + def start(self, path=None, format=None, outputMode=None, partitionBy=None, queryName=None, + **options): """Streams the contents of the :class:`DataFrame` to a data source. The data source is specified by the ``format`` and a set of ``options``. @@ -779,15 +783,20 @@ class DataStreamWriter(object): :param path: the path in a Hadoop supported file system :param format: the format used to save - - * ``append``: Append contents of this :class:`DataFrame` to existing data. - * ``overwrite``: Overwrite existing data. - * ``ignore``: Silently ignore this operation if data already exists. - * ``error`` (default case): Throw an exception if data already exists. + :param outputMode: specifies how data of a streaming DataFrame/Dataset is written to a + streaming sink. + + * `append`:Only the new rows in the streaming DataFrame/Dataset will be written to the + sink + * `complete`:All the rows in the streaming DataFrame/Dataset will be written to the sink + every time these is some updates + * `update`:only the rows that were updated in the streaming DataFrame/Dataset will be + written to the sink every time there are some updates. If the query doesn't contain + aggregations, it will be equivalent to `append` mode. :param partitionBy: names of partitioning columns :param queryName: unique name for the query :param options: All other string options. You may want to provide a `checkpointLocation` - for most streams, however it is not required for a `memory` stream. + for most streams, however it is not required for a `memory` stream. >>> sq = sdf.writeStream.format('memory').queryName('this_query').start() >>> sq.isActive @@ -798,7 +807,7 @@ class DataStreamWriter(object): >>> sq.isActive False >>> sq = sdf.writeStream.trigger(processingTime='5 seconds').start( - ... queryName='that_query', format='memory') + ... queryName='that_query', outputMode="append", format='memory') >>> sq.name u'that_query' >>> sq.isActive @@ -806,6 +815,8 @@ class DataStreamWriter(object): >>> sq.stop() """ self.options(**options) + if outputMode is not None: + self.outputMode(outputMode) if partitionBy is not None: self.partitionBy(partitionBy) if format is not None: |