aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/rdd.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pyspark/rdd.py')
-rw-r--r--python/pyspark/rdd.py10
1 files changed, 5 insertions, 5 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index b43606b730..8ef233bc80 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -34,7 +34,7 @@ from math import sqrt, log, isinf, isnan
from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
BatchedSerializer, CloudPickleSerializer, PairDeserializer, \
- PickleSerializer, pack_long, CompressedSerializer
+ PickleSerializer, pack_long, AutoBatchedSerializer
from pyspark.join import python_join, python_left_outer_join, \
python_right_outer_join, python_cogroup
from pyspark.statcounter import StatCounter
@@ -1927,10 +1927,10 @@ class RDD(object):
It will convert each Python object into Java object by Pyrolite, whenever the
RDD is serialized in batch or not.
"""
- if not self._is_pickled():
- self = self._reserialize(BatchedSerializer(PickleSerializer(), 1024))
- batched = isinstance(self._jrdd_deserializer, BatchedSerializer)
- return self.ctx._jvm.PythonRDD.pythonToJava(self._jrdd, batched)
+ rdd = self._reserialize(AutoBatchedSerializer(PickleSerializer())) \
+ if not self._is_pickled() else self
+ is_batch = isinstance(rdd._jrdd_deserializer, BatchedSerializer)
+ return self.ctx._jvm.PythonRDD.pythonToJava(rdd._jrdd, is_batch)
def countApprox(self, timeout, confidence=0.95):
"""