diff options
author | hyukjinkwon <gurwls223@gmail.com> | 2017-03-15 10:17:18 -0700 |
---|---|---|
committer | Davies Liu <davies.liu@gmail.com> | 2017-03-15 10:17:18 -0700 |
commit | 7387126f83dc0489eb1df734bfeba705709b7861 (patch) | |
tree | bd1944ee609393bc752669400d878e7f13eafaaf | |
parent | 9ff85be3bd6bf3a782c0e52fa9c2598d79f310bb (diff) | |
download | spark-7387126f83dc0489eb1df734bfeba705709b7861.tar.gz spark-7387126f83dc0489eb1df734bfeba705709b7861.tar.bz2 spark-7387126f83dc0489eb1df734bfeba705709b7861.zip |
[SPARK-19872] [PYTHON] Use the correct deserializer for RDD construction for coalesce/repartition
## What changes were proposed in this pull request?
This PR proposes to use the correct deserializer, `BatchedSerializer`, for RDD construction in coalesce/repartition when the shuffle is enabled. Currently, it passes the original `UTF8Deserializer` instead of the `BatchedSerializer` from the reserialized copy.
with the file, `text.txt` below:
```
a
b
d
e
f
g
h
i
j
k
l
```
- Before
```python
>>> sc.textFile('text.txt').repartition(1).collect()
```
```
UTF8Deserializer(True)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File ".../spark/python/pyspark/rdd.py", line 811, in collect
return list(_load_from_socket(port, self._jrdd_deserializer))
File ".../spark/python/pyspark/serializers.py", line 549, in load_stream
yield self.loads(stream)
File ".../spark/python/pyspark/serializers.py", line 544, in loads
return s.decode("utf-8") if self.use_unicode else s
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/encodings/utf_8.py", line 16, in decode
return codecs.utf_8_decode(input, errors, True)
UnicodeDecodeError: 'utf8' codec can't decode byte 0x80 in position 0: invalid start byte
```
- After
```python
>>> sc.textFile('text.txt').repartition(1).collect()
```
```
[u'a', u'b', u'', u'd', u'e', u'f', u'g', u'h', u'i', u'j', u'k', u'l', u'']
```
## How was this patch tested?
Unit test in `python/pyspark/tests.py`.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes #17282 from HyukjinKwon/SPARK-19872.
-rw-r--r-- | python/pyspark/rdd.py | 4 | ||||
-rw-r--r-- | python/pyspark/tests.py | 6 |
2 files changed, 9 insertions, 1 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index a5e6e2b054..291c1caaae 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -2072,10 +2072,12 @@ class RDD(object): batchSize = min(10, self.ctx._batchSize or 1024) ser = BatchedSerializer(PickleSerializer(), batchSize) selfCopy = self._reserialize(ser) + jrdd_deserializer = selfCopy._jrdd_deserializer jrdd = selfCopy._jrdd.coalesce(numPartitions, shuffle) else: + jrdd_deserializer = self._jrdd_deserializer jrdd = self._jrdd.coalesce(numPartitions, shuffle) - return RDD(jrdd, self.ctx, self._jrdd_deserializer) + return RDD(jrdd, self.ctx, jrdd_deserializer) def zip(self, other): """ diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index c6c87a9ea5..bb13de563c 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -1037,6 +1037,12 @@ class RDDTests(ReusedPySparkTestCase): zeros = len([x for x in l if x == 0]) self.assertTrue(zeros == 0) + def test_repartition_on_textfile(self): + path = os.path.join(SPARK_HOME, "python/test_support/hello/hello.txt") + rdd = self.sc.textFile(path) + result = rdd.repartition(1).collect() + self.assertEqual(u"Hello World!", result[0]) + def test_distinct(self): rdd = self.sc.parallelize((1, 2, 3)*10, 10) self.assertEqual(rdd.getNumPartitions(), 10) |