about summary refs log tree commit diff
path: root/python/pyspark/rdd.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pyspark/rdd.py')
-rw-r--r--  python/pyspark/rdd.py  25
1 file changed, 11 insertions, 14 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 57754776fa..bd2ff00c0f 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -469,8 +469,7 @@ class RDD(object):
def _reserialize(self, serializer=None):
serializer = serializer or self.ctx.serializer
if self._jrdd_deserializer != serializer:
- if not isinstance(self, PipelinedRDD):
- self = self.map(lambda x: x, preservesPartitioning=True)
+ self = self.map(lambda x: x, preservesPartitioning=True)
self._jrdd_deserializer = serializer
return self
@@ -1798,23 +1797,21 @@ class RDD(object):
def get_batch_size(ser):
if isinstance(ser, BatchedSerializer):
return ser.batchSize
- return 1
+ return 1 # not batched
def batch_as(rdd, batchSize):
- ser = rdd._jrdd_deserializer
- if isinstance(ser, BatchedSerializer):
- ser = ser.serializer
- return rdd._reserialize(BatchedSerializer(ser, batchSize))
+ return rdd._reserialize(BatchedSerializer(PickleSerializer(), batchSize))
my_batch = get_batch_size(self._jrdd_deserializer)
other_batch = get_batch_size(other._jrdd_deserializer)
- # use the smallest batchSize for both of them
- batchSize = min(my_batch, other_batch)
- if batchSize <= 0:
- # auto batched or unlimited
- batchSize = 100
- other = batch_as(other, batchSize)
- self = batch_as(self, batchSize)
+ if my_batch != other_batch:
+ # use the smallest batchSize for both of them
+ batchSize = min(my_batch, other_batch)
+ if batchSize <= 0:
+ # auto batched or unlimited
+ batchSize = 100
+ other = batch_as(other, batchSize)
+ self = batch_as(self, batchSize)
if self.getNumPartitions() != other.getNumPartitions():
raise ValueError("Can only zip with RDD which has the same number of partitions")