diff options
author | Davies Liu <davies@databricks.com> | 2014-12-15 22:58:26 -0800 |
---|---|---|
committer | Josh Rosen <joshrosen@databricks.com> | 2014-12-15 22:58:26 -0800 |
commit | c246b95dd2f565043db429c38c6cc029a0b870c1 (patch) | |
tree | dfaabaae0428a63eff807510c918e932f0bb5df3 /python/pyspark/rdd.py | |
parent | c7628771da9a7d4bd4d7abfdede37ce1568dcd01 (diff) | |
download | spark-c246b95dd2f565043db429c38c6cc029a0b870c1.tar.gz spark-c246b95dd2f565043db429c38c6cc029a0b870c1.tar.bz2 spark-c246b95dd2f565043db429c38c6cc029a0b870c1.zip |
[SPARK-4841] fix zip with textFile()
UTF8Deserializer can not be used in BatchedSerializer, so always use PickleSerializer() when change batchSize in zip().
Also, if two RDD have the same batch size already, they did not need re-serialize any more.
Author: Davies Liu <davies@databricks.com>
Closes #3706 from davies/fix_4841 and squashes the following commits:
20ce3a3 [Davies Liu] fix bug in _reserialize()
e3ebf7c [Davies Liu] add comment
379d2c8 [Davies Liu] fix zip with textFile()
Diffstat (limited to 'python/pyspark/rdd.py')
-rw-r--r-- | python/pyspark/rdd.py | 25 |
1 files changed, 11 insertions, 14 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 57754776fa..bd2ff00c0f 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -469,8 +469,7 @@ class RDD(object): def _reserialize(self, serializer=None): serializer = serializer or self.ctx.serializer if self._jrdd_deserializer != serializer: - if not isinstance(self, PipelinedRDD): - self = self.map(lambda x: x, preservesPartitioning=True) + self = self.map(lambda x: x, preservesPartitioning=True) self._jrdd_deserializer = serializer return self @@ -1798,23 +1797,21 @@ class RDD(object): def get_batch_size(ser): if isinstance(ser, BatchedSerializer): return ser.batchSize - return 1 + return 1 # not batched def batch_as(rdd, batchSize): - ser = rdd._jrdd_deserializer - if isinstance(ser, BatchedSerializer): - ser = ser.serializer - return rdd._reserialize(BatchedSerializer(ser, batchSize)) + return rdd._reserialize(BatchedSerializer(PickleSerializer(), batchSize)) my_batch = get_batch_size(self._jrdd_deserializer) other_batch = get_batch_size(other._jrdd_deserializer) - # use the smallest batchSize for both of them - batchSize = min(my_batch, other_batch) - if batchSize <= 0: - # auto batched or unlimited - batchSize = 100 - other = batch_as(other, batchSize) - self = batch_as(self, batchSize) + if my_batch != other_batch: + # use the smallest batchSize for both of them + batchSize = min(my_batch, other_batch) + if batchSize <= 0: + # auto batched or unlimited + batchSize = 100 + other = batch_as(other, batchSize) + self = batch_as(self, batchSize) if self.getNumPartitions() != other.getNumPartitions(): raise ValueError("Can only zip with RDD which has the same number of partitions") |