From 4ad492c40358d0104db508db98ce0971114b6817 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 3 May 2016 10:58:26 -0700 Subject: [SPARK-14716][SQL] Added support for partitioning in FileStreamSink # What changes were proposed in this pull request? Support partitioning in the file stream sink. This is implemented using a new, but simpler code path for writing parquet files - both unpartitioned and partitioned. This new code path does not use Output Committers, as we will eventually write the file names to the metadata log for "committing" them. This patch duplicates < 100 LOC from the WriterContainer. But its far simpler that WriterContainer as it does not involve output committing. In addition, it introduces the new APIs in FileFormat and OutputWriterFactory in an attempt to simplify the APIs (not have Job in the `FileFormat` API, not have bucket and other stuff in the `OutputWriterFactory.newInstance()` ). # Tests - New unit tests to test the FileStreamSinkWriter for partitioned and unpartitioned files - New unit test to partially test the FileStreamSink for partitioned files (does not test recovery of partition column data, as that requires change in the StreamFileCatalog, future PR). - Updated FileStressSuite to test number of records read from partitioned output files. Author: Tathagata Das Closes #12409 from tdas/streaming-partitioned-parquet. --- python/pyspark/sql/tests.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'python') diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 4995b263e1..cd5c4a7b3e 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -940,7 +940,7 @@ class SQLTests(ReusedPySparkTestCase): cq.processAllAvailable() output_files = [] for _, _, files in os.walk(out): - output_files.extend([f for f in files if 'parquet' in f and not f.startswith('.')]) + output_files.extend([f for f in files if not f.startswith('.')]) self.assertTrue(len(output_files) > 0) self.assertTrue(len(os.listdir(chk)) > 0) finally: @@ -967,7 +967,7 @@ class SQLTests(ReusedPySparkTestCase): cq.processAllAvailable() output_files = [] for _, _, files in os.walk(out): - output_files.extend([f for f in files if 'parquet' in f and not f.startswith('.')]) + output_files.extend([f for f in files if not f.startswith('.')]) self.assertTrue(len(output_files) > 0) self.assertTrue(len(os.listdir(chk)) > 0) self.assertFalse(os.path.isdir(fake1)) # should not have been created -- cgit v1.2.3