aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/rdd.py
diff options
context:
space:
mode:
authorDavies Liu <davies.liu@gmail.com>2014-08-19 14:46:32 -0700
committerJosh Rosen <joshrosen@apache.org>2014-08-19 14:46:32 -0700
commitd7e80c2597d4a9cae2e0cb35a86f7889323f4cbb (patch)
treebff49107d4452ea55eb7df8b1d44bae2ee0b9baa /python/pyspark/rdd.py
parent76eaeb4523ee01cabbea2d867daac48a277885a1 (diff)
downloadspark-d7e80c2597d4a9cae2e0cb35a86f7889323f4cbb.tar.gz
spark-d7e80c2597d4a9cae2e0cb35a86f7889323f4cbb.tar.bz2
spark-d7e80c2597d4a9cae2e0cb35a86f7889323f4cbb.zip
[SPARK-2790] [PySpark] fix zip with serializers which have different batch sizes.
If two RDDs have different batch size in serializers, then it will try to re-serialize the one with smaller batch size, then call RDD.zip() in Spark. Author: Davies Liu <davies.liu@gmail.com> Closes #1894 from davies/zip and squashes the following commits: c4652ea [Davies Liu] add more test cases 6d05fc8 [Davies Liu] Merge branch 'master' into zip 813b1e4 [Davies Liu] add more tests for failed cases a4aafda [Davies Liu] fix zip with serializers which have different batch sizes.
Diffstat (limited to 'python/pyspark/rdd.py')
-rw-r--r--python/pyspark/rdd.py25
1 file changed, 25 insertions, 0 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 86cd89b245..140cbe05a4 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -1687,6 +1687,31 @@ class RDD(object):
>>> x.zip(y).collect()
[(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]
"""
+ if self.getNumPartitions() != other.getNumPartitions():
+ raise ValueError("Can only zip with RDD which has the same number of partitions")
+
+ def get_batch_size(ser):
+ if isinstance(ser, BatchedSerializer):
+ return ser.batchSize
+ return 0
+
+ def batch_as(rdd, batchSize):
+ ser = rdd._jrdd_deserializer
+ if isinstance(ser, BatchedSerializer):
+ ser = ser.serializer
+ return rdd._reserialize(BatchedSerializer(ser, batchSize))
+
+ my_batch = get_batch_size(self._jrdd_deserializer)
+ other_batch = get_batch_size(other._jrdd_deserializer)
+ if my_batch != other_batch:
+ # use the greatest batchSize to batch the other one.
+ if my_batch > other_batch:
+ other = batch_as(other, my_batch)
+ else:
+ self = batch_as(self, other_batch)
+
+ # There will be an Exception in JVM if there are different number
+ # of items in each partitions.
pairRDD = self._jrdd.zip(other._jrdd)
deserializer = PairDeserializer(self._jrdd_deserializer,
other._jrdd_deserializer)