aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/tests.py
diff options
context:
space:
mode:
authorDavies Liu <davies.liu@gmail.com>2014-08-19 14:46:32 -0700
committerJosh Rosen <joshrosen@apache.org>2014-08-19 14:46:32 -0700
commitd7e80c2597d4a9cae2e0cb35a86f7889323f4cbb (patch)
treebff49107d4452ea55eb7df8b1d44bae2ee0b9baa /python/pyspark/tests.py
parent76eaeb4523ee01cabbea2d867daac48a277885a1 (diff)
downloadspark-d7e80c2597d4a9cae2e0cb35a86f7889323f4cbb.tar.gz
spark-d7e80c2597d4a9cae2e0cb35a86f7889323f4cbb.tar.bz2
spark-d7e80c2597d4a9cae2e0cb35a86f7889323f4cbb.zip
[SPARK-2790] [PySpark] fix zip with serializers which have different batch sizes.
If two RDDs have different batch size in serializers, then it will try to re-serialize the one with smaller batch size, then call RDD.zip() in Spark. Author: Davies Liu <davies.liu@gmail.com> Closes #1894 from davies/zip and squashes the following commits: c4652ea [Davies Liu] add more test cases 6d05fc8 [Davies Liu] Merge branch 'master' into zip 813b1e4 [Davies Liu] add more tests for failed cases a4aafda [Davies Liu] fix zip with serializers which have different batch sizes.
Diffstat (limited to 'python/pyspark/tests.py')
-rw-r--r--python/pyspark/tests.py27
1 file changed, 26 insertions(+), 1 deletion(-)
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 69d543d9d0..51bfbb47e5 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -39,7 +39,7 @@ else:
from pyspark.context import SparkContext
from pyspark.files import SparkFiles
-from pyspark.serializers import read_int
+from pyspark.serializers import read_int, BatchedSerializer, MarshalSerializer, PickleSerializer
from pyspark.shuffle import Aggregator, InMemoryMerger, ExternalMerger
_have_scipy = False
@@ -339,6 +339,31 @@ class TestRDDFunctions(PySparkTestCase):
m = self.sc.parallelize(range(1), 1).map(lambda x: len(bdata.value)).sum()
self.assertEquals(N, m)
+ def test_zip_with_different_serializers(self):
+ a = self.sc.parallelize(range(5))
+ b = self.sc.parallelize(range(100, 105))
+ self.assertEqual(a.zip(b).collect(), [(0, 100), (1, 101), (2, 102), (3, 103), (4, 104)])
+ a = a._reserialize(BatchedSerializer(PickleSerializer(), 2))
+ b = b._reserialize(MarshalSerializer())
+ self.assertEqual(a.zip(b).collect(), [(0, 100), (1, 101), (2, 102), (3, 103), (4, 104)])
+
+ def test_zip_with_different_number_of_items(self):
+ a = self.sc.parallelize(range(5), 2)
+ # different number of partitions
+ b = self.sc.parallelize(range(100, 106), 3)
+ self.assertRaises(ValueError, lambda: a.zip(b))
+ # different number of batched items in JVM
+ b = self.sc.parallelize(range(100, 104), 2)
+ self.assertRaises(Exception, lambda: a.zip(b).count())
+ # different number of items in one pair
+ b = self.sc.parallelize(range(100, 106), 2)
+ self.assertRaises(Exception, lambda: a.zip(b).count())
+ # same total number of items, but different distributions
+ a = self.sc.parallelize([2, 3], 2).flatMap(range)
+ b = self.sc.parallelize([3, 2], 2).flatMap(range)
+ self.assertEquals(a.count(), b.count())
+ self.assertRaises(Exception, lambda: a.zip(b).count())
+
class TestIO(PySparkTestCase):