aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/sql/tests.py
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2016-05-03 10:58:26 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2016-05-03 10:58:26 -0700
commit4ad492c40358d0104db508db98ce0971114b6817 (patch)
treecf4b3849623e2f2e6c002178dd7963997b41e81a /python/pyspark/sql/tests.py
parent5bd9a2f697dac44a4777e24321a2eb4a3d54e24b (diff)
downloadspark-4ad492c40358d0104db508db98ce0971114b6817.tar.gz
spark-4ad492c40358d0104db508db98ce0971114b6817.tar.bz2
spark-4ad492c40358d0104db508db98ce0971114b6817.zip
[SPARK-14716][SQL] Added support for partitioning in FileStreamSink
# What changes were proposed in this pull request? Support partitioning in the file stream sink. This is implemented using a new, but simpler code path for writing parquet files - both unpartitioned and partitioned. This new code path does not use Output Committers, as we will eventually write the file names to the metadata log for "committing" them. This patch duplicates < 100 LOC from the WriterContainer. But its far simpler that WriterContainer as it does not involve output committing. In addition, it introduces the new APIs in FileFormat and OutputWriterFactory in an attempt to simplify the APIs (not have Job in the `FileFormat` API, not have bucket and other stuff in the `OutputWriterFactory.newInstance()` ). # Tests - New unit tests to test the FileStreamSinkWriter for partitioned and unpartitioned files - New unit test to partially test the FileStreamSink for partitioned files (does not test recovery of partition column data, as that requires change in the StreamFileCatalog, future PR). - Updated FileStressSuite to test number of records read from partitioned output files. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #12409 from tdas/streaming-partitioned-parquet.
Diffstat (limited to 'python/pyspark/sql/tests.py')
-rw-r--r--python/pyspark/sql/tests.py4
1 files changed, 2 insertions, 2 deletions
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 4995b263e1..cd5c4a7b3e 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -940,7 +940,7 @@ class SQLTests(ReusedPySparkTestCase):
cq.processAllAvailable()
output_files = []
for _, _, files in os.walk(out):
- output_files.extend([f for f in files if 'parquet' in f and not f.startswith('.')])
+ output_files.extend([f for f in files if not f.startswith('.')])
self.assertTrue(len(output_files) > 0)
self.assertTrue(len(os.listdir(chk)) > 0)
finally:
@@ -967,7 +967,7 @@ class SQLTests(ReusedPySparkTestCase):
cq.processAllAvailable()
output_files = []
for _, _, files in os.walk(out):
- output_files.extend([f for f in files if 'parquet' in f and not f.startswith('.')])
+ output_files.extend([f for f in files if not f.startswith('.')])
self.assertTrue(len(output_files) > 0)
self.assertTrue(len(os.listdir(chk)) > 0)
self.assertFalse(os.path.isdir(fake1)) # should not have been created