From 7387126f83dc0489eb1df734bfeba705709b7861 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Wed, 15 Mar 2017 10:17:18 -0700 Subject: [SPARK-19872] [PYTHON] Use the correct deserializer for RDD construction for coalesce/repartition ## What changes were proposed in this pull request? This PR proposes to use the correct deserializer, `BatchedSerializer` for RDD construction for coalesce/repartition when the shuffle is enabled. Currently, it is passing `UTF8Deserializer` as is not `BatchedSerializer` from the copied one. with the file, `text.txt` below: ``` a b d e f g h i j k l ``` - Before ```python >>> sc.textFile('text.txt').repartition(1).collect() ``` ``` UTF8Deserializer(True) Traceback (most recent call last): File "", line 1, in File ".../spark/python/pyspark/rdd.py", line 811, in collect return list(_load_from_socket(port, self._jrdd_deserializer)) File ".../spark/python/pyspark/serializers.py", line 549, in load_stream yield self.loads(stream) File ".../spark/python/pyspark/serializers.py", line 544, in loads return s.decode("utf-8") if self.use_unicode else s File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/encodings/utf_8.py", line 16, in decode return codecs.utf_8_decode(input, errors, True) UnicodeDecodeError: 'utf8' codec can't decode byte 0x80 in position 0: invalid start byte ``` - After ```python >>> sc.textFile('text.txt').repartition(1).collect() ``` ``` [u'a', u'b', u'', u'd', u'e', u'f', u'g', u'h', u'i', u'j', u'k', u'l', u''] ``` ## How was this patch tested? Unit test in `python/pyspark/tests.py`. Author: hyukjinkwon Closes #17282 from HyukjinKwon/SPARK-19872. --- python/pyspark/rdd.py | 4 +++- python/pyspark/tests.py | 6 ++++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index a5e6e2b054..291c1caaae 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -2072,10 +2072,12 @@ class RDD(object): batchSize = min(10, self.ctx._batchSize or 1024) ser = BatchedSerializer(PickleSerializer(), batchSize) selfCopy = self._reserialize(ser) + jrdd_deserializer = selfCopy._jrdd_deserializer jrdd = selfCopy._jrdd.coalesce(numPartitions, shuffle) else: + jrdd_deserializer = self._jrdd_deserializer jrdd = self._jrdd.coalesce(numPartitions, shuffle) - return RDD(jrdd, self.ctx, self._jrdd_deserializer) + return RDD(jrdd, self.ctx, jrdd_deserializer) def zip(self, other): """ diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index c6c87a9ea5..bb13de563c 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -1037,6 +1037,12 @@ class RDDTests(ReusedPySparkTestCase): zeros = len([x for x in l if x == 0]) self.assertTrue(zeros == 0) + def test_repartition_on_textfile(self): + path = os.path.join(SPARK_HOME, "python/test_support/hello/hello.txt") + rdd = self.sc.textFile(path) + result = rdd.repartition(1).collect() + self.assertEqual(u"Hello World!", result[0]) + def test_distinct(self): rdd = self.sc.parallelize((1, 2, 3)*10, 10) self.assertEqual(rdd.getNumPartitions(), 10) -- cgit v1.2.3