From cf95d728c64f76e8b1065d7cacf1c3ad7769e935 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Thu, 3 Mar 2016 10:30:55 -0800
Subject: [SPARK-13543][SQL] Support for specifying compression codec for
 Parquet/ORC via option()

## What changes were proposed in this pull request?

This PR adds support for specifying compression codecs for both ORC and Parquet.

## How was this patch tested?

Unit tests within the IDE and code style tests with `dev/run-tests`.

Author: hyukjinkwon

Closes #11464 from HyukjinKwon/SPARK-13543.
---
 python/pyspark/sql/readwriter.py | 55 +++++++++++++++++++++++++---------------
 1 file changed, 34 insertions(+), 21 deletions(-)

(limited to 'python/pyspark/sql')

diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 7f5368d8bd..438662bb15 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -454,7 +454,7 @@ class DataFrameWriter(object):
         self._jwrite.saveAsTable(name)
 
     @since(1.4)
-    def json(self, path, mode=None):
+    def json(self, path, mode=None, compression=None):
         """Saves the content of the :class:`DataFrame` in JSON format at the specified path.
 
         :param path: the path in any Hadoop supported file system
@@ -464,18 +464,19 @@ class DataFrameWriter(object):
             * ``overwrite``: Overwrite existing data.
             * ``ignore``: Silently ignore this operation if data already exists.
             * ``error`` (default case): Throw an exception if data already exists.
-
-        You can set the following JSON-specific option(s) for writing JSON files:
-            * ``compression`` (default ``None``): compression codec to use when saving to file.
-                This can be one of the known case-insensitive shorten names
-                (``bzip2``, ``gzip``, ``lz4``, and ``snappy``).
+        :param compression: compression codec to use when saving to file. This can be one of the
+                            known case-insensitive shorten names (none, bzip2, gzip, lz4,
+                            snappy and deflate).
 
         >>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data'))
         """
-        self.mode(mode)._jwrite.json(path)
+        self.mode(mode)
+        if compression is not None:
+            self.option("compression", compression)
+        self._jwrite.json(path)
 
     @since(1.4)
-    def parquet(self, path, mode=None, partitionBy=None):
+    def parquet(self, path, mode=None, partitionBy=None, compression=None):
         """Saves the content of the :class:`DataFrame` in Parquet format at the specified path.
 
         :param path: the path in any Hadoop supported file system
@@ -486,32 +487,37 @@ class DataFrameWriter(object):
             * ``ignore``: Silently ignore this operation if data already exists.
             * ``error`` (default case): Throw an exception if data already exists.
         :param partitionBy: names of partitioning columns
+        :param compression: compression codec to use when saving to file. This can be one of the
+                            known case-insensitive shorten names (none, snappy, gzip, and lzo).
+                            This will overwrite ``spark.sql.parquet.compression.codec``.
 
         >>> df.write.parquet(os.path.join(tempfile.mkdtemp(), 'data'))
         """
         self.mode(mode)
         if partitionBy is not None:
             self.partitionBy(partitionBy)
+        if compression is not None:
+            self.option("compression", compression)
         self._jwrite.parquet(path)
 
     @since(1.6)
-    def text(self, path):
+    def text(self, path, compression=None):
         """Saves the content of the DataFrame in a text file at the specified path.
 
         :param path: the path in any Hadoop supported file system
+        :param compression: compression codec to use when saving to file. This can be one of the
+                            known case-insensitive shorten names (none, bzip2, gzip, lz4,
+                            snappy and deflate).
 
         The DataFrame must have only one column that is of string type.
         Each row becomes a new line in the output file.
-
-        You can set the following option(s) for writing text files:
-            * ``compression`` (default ``None``): compression codec to use when saving to file.
-                This can be one of the known case-insensitive shorten names
-                (``bzip2``, ``gzip``, ``lz4``, and ``snappy``).
         """
+        if compression is not None:
+            self.option("compression", compression)
         self._jwrite.text(path)
 
     @since(2.0)
-    def csv(self, path, mode=None):
+    def csv(self, path, mode=None, compression=None):
         """Saves the content of the [[DataFrame]] in CSV format at the specified path.
 
         :param path: the path in any Hadoop supported file system
@@ -522,17 +528,19 @@ class DataFrameWriter(object):
             * ``ignore``: Silently ignore this operation if data already exists.
             * ``error`` (default case): Throw an exception if data already exists.
 
-        You can set the following CSV-specific option(s) for writing CSV files:
-            * ``compression`` (default ``None``): compression codec to use when saving to file.
-                This can be one of the known case-insensitive shorten names
-                (``bzip2``, ``gzip``, ``lz4``, and ``snappy``).
+        :param compression: compression codec to use when saving to file. This can be one of the
+                            known case-insensitive shorten names (none, bzip2, gzip, lz4,
+                            snappy and deflate).
 
         >>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data'))
         """
-        self.mode(mode)._jwrite.csv(path)
+        self.mode(mode)
+        if compression is not None:
+            self.option("compression", compression)
+        self._jwrite.csv(path)
 
     @since(1.5)
-    def orc(self, path, mode=None, partitionBy=None):
+    def orc(self, path, mode=None, partitionBy=None, compression=None):
         """Saves the content of the :class:`DataFrame` in ORC format at the specified path.
 
         ::Note: Currently ORC support is only available together with
@@ -546,6 +554,9 @@ class DataFrameWriter(object):
             * ``ignore``: Silently ignore this operation if data already exists.
             * ``error`` (default case): Throw an exception if data already exists.
         :param partitionBy: names of partitioning columns
+        :param compression: compression codec to use when saving to file. This can be one of the
+                            known case-insensitive shorten names (none, snappy, zlib, and lzo).
+                            This will overwrite ``orc.compress``.
 
         >>> orc_df = hiveContext.read.orc('python/test_support/sql/orc_partitioned')
         >>> orc_df.write.orc(os.path.join(tempfile.mkdtemp(), 'data'))
@@ -553,6 +564,8 @@ class DataFrameWriter(object):
         self.mode(mode)
         if partitionBy is not None:
             self.partitionBy(partitionBy)
+        if compression is not None:
+            self.option("compression", compression)
         self._jwrite.orc(path)
 
     @since(1.4)
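For reference, a minimal usage sketch of the new keyword arguments (not part of the patch). It assumes an existing DataFrame `df` and, for the ORC call, a Hive-enabled build; the output paths are placeholders:

```python
import os
import tempfile

out = tempfile.mkdtemp()

# JSON, text and CSV accept: none, bzip2, gzip, lz4, snappy and deflate.
df.write.json(os.path.join(out, 'json_gzip'), compression='gzip')
df.write.csv(os.path.join(out, 'csv_bzip2'), mode='overwrite', compression='bzip2')

# Parquet accepts: none, snappy, gzip, and lzo.
# A non-None value overwrites spark.sql.parquet.compression.codec for this write.
df.write.parquet(os.path.join(out, 'parquet_snappy'), compression='snappy')

# ORC accepts: none, snappy, zlib, and lzo. A non-None value overwrites orc.compress.
df.write.orc(os.path.join(out, 'orc_zlib'), compression='zlib')

# The keyword is shorthand for setting the option explicitly, which still works:
df.write.option('compression', 'gzip').json(os.path.join(out, 'json_via_option'))
```

As the diff shows, each writer simply forwards a non-`None` value to `option("compression", ...)` before invoking the underlying JVM writer, so the default of `None` leaves whatever the session configuration or option chain already specifies untouched.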