diff options
author | Davies Liu <davies@databricks.com> | 2015-02-24 14:50:00 -0800 |
---|---|---|
committer | Joseph K. Bradley <joseph@databricks.com> | 2015-02-24 14:50:28 -0800 |
commit | a9abcaa2c0147fdf10b37673ea31189d52d012cb (patch) | |
tree | 313bdb63b74759ee30da938a6328463ef14566c1 /python/pyspark | |
parent | 3ad00ee1c43e2b9f340f2511bd007fa2dd3eac8d (diff) | |
download | spark-a9abcaa2c0147fdf10b37673ea31189d52d012cb.tar.gz spark-a9abcaa2c0147fdf10b37673ea31189d52d012cb.tar.bz2 spark-a9abcaa2c0147fdf10b37673ea31189d52d012cb.zip |
[SPARK-5973] [PySpark] fix zip with two RDDs with AutoBatchedSerializer
Author: Davies Liu <davies@databricks.com>
Closes #4745 from davies/fix_zip and squashes the following commits:
2124b2c [Davies Liu] Update tests.py
b5c828f [Davies Liu] increase the number of records
c1e40fd [Davies Liu] fix zip with two RDDs with AutoBatchedSerializer
(cherry picked from commit da505e59274d1c838653c1109db65ad374e65304)
Signed-off-by: Joseph K. Bradley <joseph@databricks.com>
Diffstat (limited to 'python/pyspark')
-rw-r--r-- | python/pyspark/rdd.py | 2 | ||||
-rw-r--r-- | python/pyspark/tests.py | 6 |
2 files changed, 7 insertions, 1 deletion
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index f8b5f18253..9d676d77d2 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -1815,7 +1815,7 @@ class RDD(object): my_batch = get_batch_size(self._jrdd_deserializer) other_batch = get_batch_size(other._jrdd_deserializer) - if my_batch != other_batch: + if my_batch != other_batch or not my_batch: # use the smallest batchSize for both of them batchSize = min(my_batch, other_batch) if batchSize <= 0: diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 1fc690a649..5007b6ebd7 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -543,6 +543,12 @@ class RDDTests(ReusedPySparkTestCase): # regression test for bug in _reserializer() self.assertEqual(cnt, t.zip(rdd).count()) + def test_zip_with_different_object_sizes(self): + # regress test for SPARK-5973 + a = self.sc.parallelize(range(10000)).map(lambda i: '*' * i) + b = self.sc.parallelize(range(10000, 20000)).map(lambda i: '*' * i) + self.assertEqual(10000, a.zip(b).count()) + def test_zip_with_different_number_of_items(self): a = self.sc.parallelize(range(5), 2) # different number of partitions |