author    Shixiong Zhu <shixiong@databricks.com>  2017-01-10 17:58:11 -0800
committer Shixiong Zhu <shixiong@databricks.com>  2017-01-10 17:58:11 -0800
commit    bc6c56e940fe93591a1e5ba45751f1b243b57e28 (patch)
tree      a1cdea8a10912863fbdd6c8b7fe50951e57bf78d /python
parent    856bae6af64982ae0221948c58ff564887e54a70 (diff)
[SPARK-19140][SS] Allow update mode for non-aggregation streaming queries
## What changes were proposed in this pull request?

This PR allows update mode for non-aggregation streaming queries. It will be the same as the append mode if a query has no aggregations.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16520 from zsxwing/update-without-agg.
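For context, here is a minimal PySpark sketch of what this change permits; the socket source, host, and port are illustrative placeholders, not part of the patch:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("update-mode-demo").getOrCreate()

# A streaming DataFrame with no aggregations (socket source is a placeholder).
lines = (spark.readStream
         .format("socket")
         .option("host", "localhost")
         .option("port", 9999)
         .load())

# Before this patch, "update" mode was rejected for queries without
# aggregations; with it, this query starts and behaves like "append" mode.
sq = (lines.writeStream
      .outputMode("update")
      .format("console")
      .start())
```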
Diffstat (limited to 'python')
-rw-r--r--  python/pyspark/sql/streaming.py | 27
1 file changed, 19 insertions(+), 8 deletions(-)
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 5014299ad2..a10b185cd4 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -665,6 +665,9 @@ class DataStreamWriter(object):
the sink
* `complete`: All the rows in the streaming DataFrame/Dataset will be written to the sink
every time there are some updates
+ * `update`: Only the rows that were updated in the streaming DataFrame/Dataset will be
+ written to the sink every time there are some updates. If the query doesn't contain
+ aggregations, it will be equivalent to `append` mode.
.. note:: Experimental.
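To illustrate the distinction the new docstring draws, a hedged sketch (source and sink are placeholders as above): with an aggregation, `update` emits only the aggregate rows that changed in each trigger; without one, it degenerates to `append`:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
lines = (spark.readStream
         .format("socket")
         .option("host", "localhost")
         .option("port", 9999)
         .load())

# With an aggregation, "update" writes only the count rows whose value
# changed during the last trigger.
counts = lines.groupBy("value").count()
agg_query = (counts.writeStream
             .outputMode("update")
             .format("console")
             .start())

# Without an aggregation, "update" is now equivalent to "append": only
# newly arrived input rows are written to the sink.
plain_query = (lines.writeStream
               .outputMode("update")
               .format("console")
               .start())
```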
@@ -768,7 +771,8 @@ class DataStreamWriter(object):
@ignore_unicode_prefix
@since(2.0)
- def start(self, path=None, format=None, partitionBy=None, queryName=None, **options):
+ def start(self, path=None, format=None, outputMode=None, partitionBy=None, queryName=None,
+ **options):
"""Streams the contents of the :class:`DataFrame` to a data source.
The data source is specified by the ``format`` and a set of ``options``.
@@ -779,15 +783,20 @@ class DataStreamWriter(object):
:param path: the path in a Hadoop supported file system
:param format: the format used to save
-
- * ``append``: Append contents of this :class:`DataFrame` to existing data.
- * ``overwrite``: Overwrite existing data.
- * ``ignore``: Silently ignore this operation if data already exists.
- * ``error`` (default case): Throw an exception if data already exists.
+ :param outputMode: specifies how data of a streaming DataFrame/Dataset is written to a
+ streaming sink.
+
+ * `append`: Only the new rows in the streaming DataFrame/Dataset will be written to the
+ sink
+ * `complete`: All the rows in the streaming DataFrame/Dataset will be written to the sink
+ every time there are some updates
+ * `update`: Only the rows that were updated in the streaming DataFrame/Dataset will be
+ written to the sink every time there are some updates. If the query doesn't contain
+ aggregations, it will be equivalent to `append` mode.
:param partitionBy: names of partitioning columns
:param queryName: unique name for the query
:param options: All other string options. You may want to provide a `checkpointLocation`
- for most streams, however it is not required for a `memory` stream.
+ for most streams, however it is not required for a `memory` stream.
>>> sq = sdf.writeStream.format('memory').queryName('this_query').start()
>>> sq.isActive
@@ -798,7 +807,7 @@ class DataStreamWriter(object):
>>> sq.isActive
False
>>> sq = sdf.writeStream.trigger(processingTime='5 seconds').start(
- ... queryName='that_query', format='memory')
+ ... queryName='that_query', outputMode="append", format='memory')
>>> sq.name
u'that_query'
>>> sq.isActive
@@ -806,6 +815,8 @@ class DataStreamWriter(object):
>>> sq.stop()
"""
self.options(**options)
+ if outputMode is not None:
+ self.outputMode(outputMode)
if partitionBy is not None:
self.partitionBy(partitionBy)
if format is not None:
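To close, a short sketch of the new keyword itself, mirroring the patched doctest; the query names are placeholders. Passing `outputMode` to `start()` is equivalent to calling `.outputMode(...)` on the writer first:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sdf = (spark.readStream
       .format("socket")
       .option("host", "localhost")
       .option("port", 9999)
       .load())

# Equivalent ways to set the output mode after this patch; the memory sink
# requires a query name.
sq1 = (sdf.writeStream.outputMode("update")
       .format("memory").queryName("q1").start())
sq2 = sdf.writeStream.start(format="memory", outputMode="update",
                            queryName="q2")

sq1.stop()
sq2.stop()
```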