From 90b11439b3d4540f48985e87dcc99749f0369287 Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Tue, 31 May 2016 15:57:01 -0700
Subject: [SPARK-15517][SQL][STREAMING] Add support for complete output mode in
 Structured Streaming

## What changes were proposed in this pull request?

Currently, structured streaming supports only the append output mode. This PR adds the following:

- Added support for Complete output mode in the internal state store, analyzer, and planner.
- Added public APIs in Scala and Python for users to specify the output mode.
- Added checks for unsupported combinations of output mode and DataFrame operations:
  - plans with no aggregation support only Append mode;
  - plans with aggregation support only Update and Complete modes.
- The default output mode is Append mode. (**Question: should we change this to automatically set Complete mode when there is aggregation?**)
- Added support for Complete output mode in the Memory Sink. The Memory Sink internally supports Append, Complete, and Update, but the public API exposes only Append and Complete output modes.

## How was this patch tested?

Unit tests in various test suites:

- StreamingAggregationSuite: tests for Complete mode
- MemorySinkSuite: tests for checking behavior in Append and Complete modes
- UnsupportedOperationSuite: tests for checking unsupported combinations of DataFrame operations and output modes
- DataFrameReaderWriterSuite: tests for checking that outputMode cannot be called on static DataFrames
- Python doctest and existing unit tests modified to call write.outputMode

Author: Tathagata Das

Closes #13286 from tdas/complete-mode.
---
 python/pyspark/sql/readwriter.py | 20 ++++++++++++++++++++
 python/pyspark/sql/tests.py      |  7 ++++---
 2 files changed, 24 insertions(+), 3 deletions(-)

(limited to 'python')

diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 73105f881b..9208a527d2 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -497,6 +497,26 @@ class DataFrameWriter(object):
         self._jwrite = self._jwrite.mode(saveMode)
         return self
 
+    @since(2.0)
+    def outputMode(self, outputMode):
+        """Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink.
+
+        Options include:
+
+        * `append`: Only the new rows in the streaming DataFrame/Dataset will be written to
+          the sink
+        * `complete`: All the rows in the streaming DataFrame/Dataset will be written to the
+          sink every time there are some updates
+
+        .. note:: Experimental.
+
+        >>> writer = sdf.write.outputMode('append')
+        """
+        if not outputMode or type(outputMode) != str or len(outputMode.strip()) == 0:
+            raise ValueError('The output mode must be a non-empty string. Got: %s' % outputMode)
+        self._jwrite = self._jwrite.outputMode(outputMode)
+        return self
+
     @since(1.4)
     def format(self, source):
         """Specifies the underlying output data source.
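For context, a minimal usage sketch of the new Python API (not part of the patch). It assumes the Spark 2.0-era streaming entry points exercised by the tests below (`DataFrameReader.stream()` and `DataFrameWriter.startStream()`); `sqlContext`, the input path, and the query name are hypothetical. Because the plan contains an aggregation, the new checks require Complete (or Update) mode rather than Append:

```python
# Hedged sketch: sqlContext, '/tmp/streaming-input', and 'counts_table'
# are placeholders, not part of this patch.
lines = sqlContext.read.format('text').stream('/tmp/streaming-input')

# An aggregated streaming plan: Append mode would be rejected by the
# new unsupported-operations check, so Complete mode is used here.
counts = lines.groupBy('value').count()

cq = counts.write \
    .outputMode('complete') \
    .format('memory') \
    .queryName('counts_table') \
    .startStream()
```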
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 1790432edd..0d9dd5ea2a 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -926,7 +926,7 @@ class SQLTests(ReusedPySparkTestCase):
         out = os.path.join(tmpPath, 'out')
         chk = os.path.join(tmpPath, 'chk')
         cq = df.write.option('checkpointLocation', chk).queryName('this_query') \
-            .format('parquet').option('path', out).startStream()
+            .format('parquet').outputMode('append').option('path', out).startStream()
         try:
             self.assertEqual(cq.name, 'this_query')
             self.assertTrue(cq.isActive)
@@ -952,8 +952,9 @@ class SQLTests(ReusedPySparkTestCase):
         fake1 = os.path.join(tmpPath, 'fake1')
         fake2 = os.path.join(tmpPath, 'fake2')
         cq = df.write.option('checkpointLocation', fake1).format('memory').option('path', fake2) \
-            .queryName('fake_query').startStream(path=out, format='parquet', queryName='this_query',
-                                                 checkpointLocation=chk)
+            .queryName('fake_query').outputMode('append') \
+            .startStream(path=out, format='parquet', queryName='this_query', checkpointLocation=chk)
+
         try:
             self.assertEqual(cq.name, 'this_query')
             self.assertTrue(cq.isActive)
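The client-side validation added in `outputMode` can be exercised without starting a stream; a small sketch (`df` is any hypothetical DataFrame, since the check runs before the call reaches the JVM writer):

```python
# The new check rejects None, non-string, and empty/blank mode strings
# up front, raising ValueError before touching self._jwrite.
try:
    df.write.outputMode('')
except ValueError as e:
    print(e)  # The output mode must be a non-empty string. Got:
```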