aboutsummaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorhyukjinkwon <gurwls223@gmail.com>2016-03-03 10:30:55 -0800
committerReynold Xin <rxin@databricks.com>2016-03-03 10:30:55 -0800
commitcf95d728c64f76e8b1065d7cacf1c3ad7769e935 (patch)
tree8b473073775b2668e431a9cf53840b6fee85082f /python
parent511d4929c87ebf364b96bd46890628f736eaa026 (diff)
downloadspark-cf95d728c64f76e8b1065d7cacf1c3ad7769e935.tar.gz
spark-cf95d728c64f76e8b1065d7cacf1c3ad7769e935.tar.bz2
spark-cf95d728c64f76e8b1065d7cacf1c3ad7769e935.zip
[SPARK-13543][SQL] Support for specifying compression codec for Parquet/ORC via option()
## What changes were proposed in this pull request? This PR adds the support to specify compression codecs for both ORC and Parquet. ## How was this patch tested? unittests within IDE and code style tests with `dev/run_tests`. Author: hyukjinkwon <gurwls223@gmail.com> Closes #11464 from HyukjinKwon/SPARK-13543.
Diffstat (limited to 'python')
-rw-r--r--python/pyspark/sql/readwriter.py55
1 files changed, 34 insertions, 21 deletions
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 7f5368d8bd..438662bb15 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -454,7 +454,7 @@ class DataFrameWriter(object):
self._jwrite.saveAsTable(name)
@since(1.4)
- def json(self, path, mode=None):
+ def json(self, path, mode=None, compression=None):
"""Saves the content of the :class:`DataFrame` in JSON format at the specified path.
:param path: the path in any Hadoop supported file system
@@ -464,18 +464,19 @@ class DataFrameWriter(object):
* ``overwrite``: Overwrite existing data.
* ``ignore``: Silently ignore this operation if data already exists.
* ``error`` (default case): Throw an exception if data already exists.
-
- You can set the following JSON-specific option(s) for writing JSON files:
- * ``compression`` (default ``None``): compression codec to use when saving to file.
- This can be one of the known case-insensitive shorten names
- (``bzip2``, ``gzip``, ``lz4``, and ``snappy``).
+ :param compression: compression codec to use when saving to file. This can be one of the
+ known case-insensitive shorten names (none, bzip2, gzip, lz4,
+ snappy and deflate).
>>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data'))
"""
- self.mode(mode)._jwrite.json(path)
+ self.mode(mode)
+ if compression is not None:
+ self.option("compression", compression)
+ self._jwrite.json(path)
@since(1.4)
- def parquet(self, path, mode=None, partitionBy=None):
+ def parquet(self, path, mode=None, partitionBy=None, compression=None):
"""Saves the content of the :class:`DataFrame` in Parquet format at the specified path.
:param path: the path in any Hadoop supported file system
@@ -486,32 +487,37 @@ class DataFrameWriter(object):
* ``ignore``: Silently ignore this operation if data already exists.
* ``error`` (default case): Throw an exception if data already exists.
:param partitionBy: names of partitioning columns
+ :param compression: compression codec to use when saving to file. This can be one of the
+ known case-insensitive shorten names (none, snappy, gzip, and lzo).
+ This will overwrite ``spark.sql.parquet.compression.codec``.
>>> df.write.parquet(os.path.join(tempfile.mkdtemp(), 'data'))
"""
self.mode(mode)
if partitionBy is not None:
self.partitionBy(partitionBy)
+ if compression is not None:
+ self.option("compression", compression)
self._jwrite.parquet(path)
@since(1.6)
- def text(self, path):
+ def text(self, path, compression=None):
"""Saves the content of the DataFrame in a text file at the specified path.
:param path: the path in any Hadoop supported file system
+ :param compression: compression codec to use when saving to file. This can be one of the
+ known case-insensitive shorten names (none, bzip2, gzip, lz4,
+ snappy and deflate).
The DataFrame must have only one column that is of string type.
Each row becomes a new line in the output file.
-
- You can set the following option(s) for writing text files:
- * ``compression`` (default ``None``): compression codec to use when saving to file.
- This can be one of the known case-insensitive shorten names
- (``bzip2``, ``gzip``, ``lz4``, and ``snappy``).
"""
+ if compression is not None:
+ self.option("compression", compression)
self._jwrite.text(path)
@since(2.0)
- def csv(self, path, mode=None):
+ def csv(self, path, mode=None, compression=None):
"""Saves the content of the [[DataFrame]] in CSV format at the specified path.
:param path: the path in any Hadoop supported file system
@@ -522,17 +528,19 @@ class DataFrameWriter(object):
* ``ignore``: Silently ignore this operation if data already exists.
* ``error`` (default case): Throw an exception if data already exists.
- You can set the following CSV-specific option(s) for writing CSV files:
- * ``compression`` (default ``None``): compression codec to use when saving to file.
- This can be one of the known case-insensitive shorten names
- (``bzip2``, ``gzip``, ``lz4``, and ``snappy``).
+ :param compression: compression codec to use when saving to file. This can be one of the
+ known case-insensitive shorten names (none, bzip2, gzip, lz4,
+ snappy and deflate).
>>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data'))
"""
- self.mode(mode)._jwrite.csv(path)
+ self.mode(mode)
+ if compression is not None:
+ self.option("compression", compression)
+ self._jwrite.csv(path)
@since(1.5)
- def orc(self, path, mode=None, partitionBy=None):
+ def orc(self, path, mode=None, partitionBy=None, compression=None):
"""Saves the content of the :class:`DataFrame` in ORC format at the specified path.
::Note: Currently ORC support is only available together with
@@ -546,6 +554,9 @@ class DataFrameWriter(object):
* ``ignore``: Silently ignore this operation if data already exists.
* ``error`` (default case): Throw an exception if data already exists.
:param partitionBy: names of partitioning columns
+ :param compression: compression codec to use when saving to file. This can be one of the
+ known case-insensitive shorten names (none, snappy, zlib, and lzo).
+ This will overwrite ``orc.compress``.
>>> orc_df = hiveContext.read.orc('python/test_support/sql/orc_partitioned')
>>> orc_df.write.orc(os.path.join(tempfile.mkdtemp(), 'data'))
@@ -553,6 +564,8 @@ class DataFrameWriter(object):
self.mode(mode)
if partitionBy is not None:
self.partitionBy(partitionBy)
+ if compression is not None:
+ self.option("compression", compression)
self._jwrite.orc(path)
@since(1.4)