about summary refs log tree commit diff
path: root/python/pyspark/tests.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pyspark/tests.py')
-rw-r--r--  python/pyspark/tests.py  27
1 file changed, 26 insertions, 1 deletion
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 69d543d9d0..51bfbb47e5 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -39,7 +39,7 @@ else:
from pyspark.context import SparkContext
from pyspark.files import SparkFiles
-from pyspark.serializers import read_int
+from pyspark.serializers import read_int, BatchedSerializer, MarshalSerializer, PickleSerializer
from pyspark.shuffle import Aggregator, InMemoryMerger, ExternalMerger
_have_scipy = False
@@ -339,6 +339,31 @@ class TestRDDFunctions(PySparkTestCase):
m = self.sc.parallelize(range(1), 1).map(lambda x: len(bdata.value)).sum()
self.assertEquals(N, m)
+ def test_zip_with_different_serializers(self):
+ a = self.sc.parallelize(range(5))
+ b = self.sc.parallelize(range(100, 105))
+ self.assertEqual(a.zip(b).collect(), [(0, 100), (1, 101), (2, 102), (3, 103), (4, 104)])
+ a = a._reserialize(BatchedSerializer(PickleSerializer(), 2))
+ b = b._reserialize(MarshalSerializer())
+ self.assertEqual(a.zip(b).collect(), [(0, 100), (1, 101), (2, 102), (3, 103), (4, 104)])
+
+ def test_zip_with_different_number_of_items(self):
+ a = self.sc.parallelize(range(5), 2)
+ # different number of partitions
+ b = self.sc.parallelize(range(100, 106), 3)
+ self.assertRaises(ValueError, lambda: a.zip(b))
+ # different number of batched items in JVM
+ b = self.sc.parallelize(range(100, 104), 2)
+ self.assertRaises(Exception, lambda: a.zip(b).count())
+ # different number of items in one pair
+ b = self.sc.parallelize(range(100, 106), 2)
+ self.assertRaises(Exception, lambda: a.zip(b).count())
+ # same total number of items, but different distributions
+ a = self.sc.parallelize([2, 3], 2).flatMap(range)
+ b = self.sc.parallelize([3, 2], 2).flatMap(range)
+ self.assertEquals(a.count(), b.count())
+ self.assertRaises(Exception, lambda: a.zip(b).count())
+
class TestIO(PySparkTestCase):