path: root/python/pyspark/sql/tests.py
author    Tathagata Das <tathagata.das1565@gmail.com>  2016-05-31 15:57:01 -0700
committer Michael Armbrust <michael@databricks.com>    2016-05-31 15:57:01 -0700
commit    90b11439b3d4540f48985e87dcc99749f0369287 (patch)
tree      deab5a578c9fa2044764c2e8c0b34d1a6bfdbef9 /python/pyspark/sql/tests.py
parent    dfe2cbeb437a4fa69bec3eca4ac9242f3eb51c81 (diff)
[SPARK-15517][SQL][STREAMING] Add support for complete output mode in Structured Streaming
## What changes were proposed in this pull request?

Currently, Structured Streaming supports only the Append output mode. This PR adds the following:

- Added support for Complete output mode in the internal state store, analyzer, and planner.
- Added public APIs in Scala and Python for users to specify the output mode (a hedged usage sketch follows this message).
- Added checks for unsupported combinations of output mode and DataFrame operations:
  - Plans with no aggregation should support only Append mode.
  - Plans with aggregation should support only Update and Complete modes.
  - The default output mode is Append mode (**Question: should we change this to automatically set to Complete mode when there is aggregation?**).
- Added support for Complete output mode in the Memory Sink. The Memory Sink internally supports Append, Complete, and Update, but the public API exposes only the Append and Complete output modes.

## How was this patch tested?

Unit tests in various test suites:

- StreamingAggregationSuite: tests for Complete mode.
- MemorySinkSuite: tests for checking behavior in Append and Complete modes.
- UnsupportedOperationSuite: tests for checking unsupported combinations of DataFrame operations and output modes.
- DataFrameReaderWriterSuite: tests for checking that outputMode cannot be called on static DataFrames.
- Python doctest and existing unit tests modified to call write.outputMode.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #13286 from tdas/complete-mode.
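To make the new Python API concrete, here is a minimal sketch of setting the output mode with the interim pre-2.0 streaming interface this patch extends (`df.write...startStream()`, later renamed to `df.writeStream...start()` in Spark 2.0 final). The input path, checkpoint location, output path, and the existing SparkContext `sc` are illustrative assumptions, not part of this patch:

```python
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)  # assumes an existing SparkContext `sc`

# A streaming DataFrame over a directory of text files (illustrative path).
df = sqlContext.read.format('text').stream('/tmp/streaming-input')

# No aggregation in the plan, so only Append mode (the default) is allowed.
cq = df.write \
    .outputMode('append') \
    .format('parquet') \
    .option('checkpointLocation', '/tmp/chk') \
    .option('path', '/tmp/out') \
    .startStream()

cq.stop()
```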
Diffstat (limited to 'python/pyspark/sql/tests.py')
-rw-r--r--  python/pyspark/sql/tests.py  7
1 file changed, 4 insertions(+), 3 deletions(-)
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 1790432edd..0d9dd5ea2a 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -926,7 +926,7 @@ class SQLTests(ReusedPySparkTestCase):
         out = os.path.join(tmpPath, 'out')
         chk = os.path.join(tmpPath, 'chk')
         cq = df.write.option('checkpointLocation', chk).queryName('this_query') \
-            .format('parquet').option('path', out).startStream()
+            .format('parquet').outputMode('append').option('path', out).startStream()
         try:
             self.assertEqual(cq.name, 'this_query')
             self.assertTrue(cq.isActive)
@@ -952,8 +952,9 @@
         fake1 = os.path.join(tmpPath, 'fake1')
         fake2 = os.path.join(tmpPath, 'fake2')
         cq = df.write.option('checkpointLocation', fake1).format('memory').option('path', fake2) \
-            .queryName('fake_query').startStream(path=out, format='parquet', queryName='this_query',
-                                                 checkpointLocation=chk)
+            .queryName('fake_query').outputMode('append') \
+            .startStream(path=out, format='parquet', queryName='this_query', checkpointLocation=chk)
+
         try:
             self.assertEqual(cq.name, 'this_query')
             self.assertTrue(cq.isActive)
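For contrast with the Append-mode tests above, here is a hedged sketch of Complete mode, which this patch permits only for aggregated plans (the analyzer rejects Append there). The grouping column, query name, and checkpoint path are illustrative; `df` and `sqlContext` are as in the earlier sketch:

```python
# An aggregated streaming plan: per this patch, Complete (or internally
# Update) is allowed here, while Append would be rejected by the analyzer.
counts = df.groupBy('value').count()

cq = counts.write \
    .outputMode('complete') \
    .format('memory') \
    .queryName('counts_table') \
    .option('checkpointLocation', '/tmp/chk2') \
    .startStream()

# The memory sink re-materializes the full result table on each trigger,
# so it can be queried like a regular table while the stream runs.
sqlContext.sql('SELECT * FROM counts_table').show()

cq.stop()
```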