From c246b95dd2f565043db429c38c6cc029a0b870c1 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Mon, 15 Dec 2014 22:58:26 -0800 Subject: [SPARK-4841] fix zip with textFile() UTF8Deserializer can not be used in BatchedSerializer, so always use PickleSerializer() when change batchSize in zip(). Also, if two RDD have the same batch size already, they did not need re-serialize any more. Author: Davies Liu Closes #3706 from davies/fix_4841 and squashes the following commits: 20ce3a3 [Davies Liu] fix bug in _reserialize() e3ebf7c [Davies Liu] add comment 379d2c8 [Davies Liu] fix zip with textFile() --- python/pyspark/tests.py | 9 +++++++++ 1 file changed, 9 insertions(+) (limited to 'python/pyspark/tests.py') diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 32645778c2..bca52a7ce6 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -533,6 +533,15 @@ class RDDTests(ReusedPySparkTestCase): a = a._reserialize(BatchedSerializer(PickleSerializer(), 2)) b = b._reserialize(MarshalSerializer()) self.assertEqual(a.zip(b).collect(), [(0, 100), (1, 101), (2, 102), (3, 103), (4, 104)]) + # regression test for SPARK-4841 + path = os.path.join(SPARK_HOME, "python/test_support/hello.txt") + t = self.sc.textFile(path) + cnt = t.count() + self.assertEqual(cnt, t.zip(t).count()) + rdd = t.map(str) + self.assertEqual(cnt, t.zip(rdd).count()) + # regression test for bug in _reserializer() + self.assertEqual(cnt, t.zip(rdd).count()) def test_zip_with_different_number_of_items(self): a = self.sc.parallelize(range(5), 2) -- cgit v1.2.3