diff options
author | Vladimir Vladimirov <vladimir.vladimirov@magnetic.com> | 2015-02-06 13:55:02 -0800 |
---|---|---|
committer | Josh Rosen <joshrosen@databricks.com> | 2015-02-06 13:55:22 -0800 |
commit | 1d3234165b9e0c4017fe26d9339a1ab49890c868 (patch) | |
tree | 8156e6b31c4a477ba8e35c948ed63db25fcfd585 /python/pyspark | |
parent | 87e0f0dc66bc33d9bab5a91faaa36e65811bf9e2 (diff) | |
download | spark-1d3234165b9e0c4017fe26d9339a1ab49890c868.tar.gz spark-1d3234165b9e0c4017fe26d9339a1ab49890c868.tar.bz2 spark-1d3234165b9e0c4017fe26d9339a1ab49890c868.zip |
SPARK-5633 pyspark saveAsTextFile support for compression codec
See https://issues.apache.org/jira/browse/SPARK-5633 for details
Author: Vladimir Vladimirov <vladimir.vladimirov@magnetic.com>
Closes #4403 from smartkiwi/master and squashes the following commits:
94c014e [Vladimir Vladimirov] SPARK-5633 pyspark saveAsTextFile support for compression codec
(cherry picked from commit b3872e00d155939e40366debda635fc3fb12cc73)
Signed-off-by: Josh Rosen <joshrosen@databricks.com>
Diffstat (limited to 'python/pyspark')
-rw-r--r-- | python/pyspark/rdd.py | 22 |
1 file changed, 20 insertions, 2 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 6e029bf7f1..bd4f16e058 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -1366,10 +1366,14 @@ class RDD(object): ser = BatchedSerializer(PickleSerializer(), batchSize) self._reserialize(ser)._jrdd.saveAsObjectFile(path) - def saveAsTextFile(self, path): + def saveAsTextFile(self, path, compressionCodecClass=None): """ Save this RDD as a text file, using string representations of elements. + @param path: path to text file + @param compressionCodecClass: (None by default) string i.e. + "org.apache.hadoop.io.compress.GzipCodec" + >>> tempFile = NamedTemporaryFile(delete=True) >>> tempFile.close() >>> sc.parallelize(range(10)).saveAsTextFile(tempFile.name) @@ -1385,6 +1389,16 @@ class RDD(object): >>> sc.parallelize(['', 'foo', '', 'bar', '']).saveAsTextFile(tempFile2.name) >>> ''.join(sorted(input(glob(tempFile2.name + "/part-0000*")))) '\\n\\n\\nbar\\nfoo\\n' + + Using compressionCodecClass + + >>> tempFile3 = NamedTemporaryFile(delete=True) + >>> tempFile3.close() + >>> codec = "org.apache.hadoop.io.compress.GzipCodec" + >>> sc.parallelize(['foo', 'bar']).saveAsTextFile(tempFile3.name, codec) + >>> from fileinput import input, hook_compressed + >>> ''.join(sorted(input(glob(tempFile3.name + "/part*.gz"), openhook=hook_compressed))) + 'bar\\nfoo\\n' """ def func(split, iterator): for x in iterator: @@ -1395,7 +1409,11 @@ class RDD(object): yield x keyed = self.mapPartitionsWithIndex(func) keyed._bypass_serializer = True - keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path) + if compressionCodecClass: + compressionCodec = self.ctx._jvm.java.lang.Class.forName(compressionCodecClass) + keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path, compressionCodec) + else: + keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path) # Pair functions |