diff options
author | Davies Liu <davies.liu@gmail.com> | 2014-08-19 14:46:32 -0700 |
---|---|---|
committer | Josh Rosen <joshrosen@apache.org> | 2014-08-19 14:46:32 -0700 |
commit | d7e80c2597d4a9cae2e0cb35a86f7889323f4cbb (patch) | |
tree | bff49107d4452ea55eb7df8b1d44bae2ee0b9baa /python/pyspark/tests.py | |
parent | 76eaeb4523ee01cabbea2d867daac48a277885a1 (diff) | |
download | spark-d7e80c2597d4a9cae2e0cb35a86f7889323f4cbb.tar.gz spark-d7e80c2597d4a9cae2e0cb35a86f7889323f4cbb.tar.bz2 spark-d7e80c2597d4a9cae2e0cb35a86f7889323f4cbb.zip |
[SPARK-2790] [PySpark] fix zip with serializers which have different batch sizes.
If two RDDs have different batch size in serializers, then it will try to re-serialize the one with smaller batch size, then call RDD.zip() in Spark.
Author: Davies Liu <davies.liu@gmail.com>
Closes #1894 from davies/zip and squashes the following commits:
c4652ea [Davies Liu] add more test cases
6d05fc8 [Davies Liu] Merge branch 'master' into zip
813b1e4 [Davies Liu] add more tests for failed cases
a4aafda [Davies Liu] fix zip with serializers which have different batch sizes.
Diffstat (limited to 'python/pyspark/tests.py')
-rw-r--r-- | python/pyspark/tests.py | 27 |
1 files changed, 26 insertions, 1 deletions
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 69d543d9d0..51bfbb47e5 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -39,7 +39,7 @@ else: from pyspark.context import SparkContext from pyspark.files import SparkFiles -from pyspark.serializers import read_int +from pyspark.serializers import read_int, BatchedSerializer, MarshalSerializer, PickleSerializer from pyspark.shuffle import Aggregator, InMemoryMerger, ExternalMerger _have_scipy = False @@ -339,6 +339,31 @@ class TestRDDFunctions(PySparkTestCase): m = self.sc.parallelize(range(1), 1).map(lambda x: len(bdata.value)).sum() self.assertEquals(N, m) + def test_zip_with_different_serializers(self): + a = self.sc.parallelize(range(5)) + b = self.sc.parallelize(range(100, 105)) + self.assertEqual(a.zip(b).collect(), [(0, 100), (1, 101), (2, 102), (3, 103), (4, 104)]) + a = a._reserialize(BatchedSerializer(PickleSerializer(), 2)) + b = b._reserialize(MarshalSerializer()) + self.assertEqual(a.zip(b).collect(), [(0, 100), (1, 101), (2, 102), (3, 103), (4, 104)]) + + def test_zip_with_different_number_of_items(self): + a = self.sc.parallelize(range(5), 2) + # different number of partitions + b = self.sc.parallelize(range(100, 106), 3) + self.assertRaises(ValueError, lambda: a.zip(b)) + # different number of batched items in JVM + b = self.sc.parallelize(range(100, 104), 2) + self.assertRaises(Exception, lambda: a.zip(b).count()) + # different number of items in one pair + b = self.sc.parallelize(range(100, 106), 2) + self.assertRaises(Exception, lambda: a.zip(b).count()) + # same total number of items, but different distributions + a = self.sc.parallelize([2, 3], 2).flatMap(range) + b = self.sc.parallelize([3, 2], 2).flatMap(range) + self.assertEquals(a.count(), b.count()) + self.assertRaises(Exception, lambda: a.zip(b).count()) + class TestIO(PySparkTestCase): |