aboutsummaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/pyspark/sql/readwriter.py20
-rw-r--r--python/pyspark/sql/tests.py7
2 files changed, 24 insertions, 3 deletions
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 73105f881b..9208a527d2 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -497,6 +497,26 @@ class DataFrameWriter(object):
self._jwrite = self._jwrite.mode(saveMode)
return self
+ @since(2.0)
+ def outputMode(self, outputMode):
+ """Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink.
+
+ Options include:
+
+ * `append`: Only the new rows in the streaming DataFrame/Dataset will be written to
+ the sink
+ * `complete`: All the rows in the streaming DataFrame/Dataset will be written to the sink
+ every time there are some updates
+
+ .. note:: Experimental.
+
+ >>> writer = sdf.write.outputMode('append')
+ """
+ if not outputMode or type(outputMode) != str or len(outputMode.strip()) == 0:
+ raise ValueError('The output mode must be a non-empty string. Got: %s' % outputMode)
+ self._jwrite = self._jwrite.outputMode(outputMode)
+ return self
+
@since(1.4)
def format(self, source):
"""Specifies the underlying output data source.
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 1790432edd..0d9dd5ea2a 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -926,7 +926,7 @@ class SQLTests(ReusedPySparkTestCase):
out = os.path.join(tmpPath, 'out')
chk = os.path.join(tmpPath, 'chk')
cq = df.write.option('checkpointLocation', chk).queryName('this_query') \
- .format('parquet').option('path', out).startStream()
+ .format('parquet').outputMode('append').option('path', out).startStream()
try:
self.assertEqual(cq.name, 'this_query')
self.assertTrue(cq.isActive)
@@ -952,8 +952,9 @@ class SQLTests(ReusedPySparkTestCase):
fake1 = os.path.join(tmpPath, 'fake1')
fake2 = os.path.join(tmpPath, 'fake2')
cq = df.write.option('checkpointLocation', fake1).format('memory').option('path', fake2) \
- .queryName('fake_query').startStream(path=out, format='parquet', queryName='this_query',
- checkpointLocation=chk)
+ .queryName('fake_query').outputMode('append') \
+ .startStream(path=out, format='parquet', queryName='this_query', checkpointLocation=chk)
+
try:
self.assertEqual(cq.name, 'this_query')
self.assertTrue(cq.isActive)