aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--python/pyspark/rdd.py2
-rw-r--r--python/pyspark/tests.py6
2 files changed, 7 insertions, 1 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index ba2347ae76..d3148de6f4 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -1950,7 +1950,7 @@ class RDD(object):
my_batch = get_batch_size(self._jrdd_deserializer)
other_batch = get_batch_size(other._jrdd_deserializer)
- if my_batch != other_batch:
+ if my_batch != other_batch or not my_batch:
# use the smallest batchSize for both of them
batchSize = min(my_batch, other_batch)
if batchSize <= 0:
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 52e82091c9..06ba2b461d 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -543,6 +543,12 @@ class RDDTests(ReusedPySparkTestCase):
# regression test for bug in _reserializer()
self.assertEqual(cnt, t.zip(rdd).count())
+ def test_zip_with_different_object_sizes(self):
+ # regress test for SPARK-5973
+ a = self.sc.parallelize(range(10000)).map(lambda i: '*' * i)
+ b = self.sc.parallelize(range(10000, 20000)).map(lambda i: '*' * i)
+ self.assertEqual(10000, a.zip(b).count())
+
def test_zip_with_different_number_of_items(self):
a = self.sc.parallelize(range(5), 2)
# different number of partitions