author    Davies Liu <davies@databricks.com>    2014-11-03 23:56:14 -0800
committer Josh Rosen <joshrosen@databricks.com> 2014-11-03 23:56:14 -0800
commit e4f42631a68b473ce706429915f3f08042af2119 (patch)
tree   557ff754b9936addfb9628bfcba462802ff6ec1c /python/pyspark/rdd.py
parent b671ce047d036b8923007902826038b01e836e8a (diff)
[SPARK-3886] [PySpark] simplify serializer, use AutoBatchedSerializer by default.
This PR simplifies the serializer: always use a batched serializer (AutoBatchedSerializer by default), even when the batch size is 1.

Author: Davies Liu <davies@databricks.com>

This patch had conflicts when merged, resolved by
Committer: Josh Rosen <joshrosen@databricks.com>

Closes #2920 from davies/fix_autobatch and squashes the following commits:

e544ef9 [Davies Liu] revert unrelated change
6880b14 [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
1d557fc [Davies Liu] fix tests
8180907 [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
76abdce [Davies Liu] clean up
53fa60b [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
d7ac751 [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
2cc2497 [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
b4292ce [Davies Liu] fix bug in master
d79744c [Davies Liu] recover hive tests
be37ece [Davies Liu] refactor
eb3938d [Davies Liu] refactor serializer in scala
8d77ef2 [Davies Liu] simplify serializer, use AutoBatchedSerializer by default.
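For readers unfamiliar with the scheme: rather than a fixed batch size, AutoBatchedSerializer adapts the batch to how large the serialized chunks actually come out. A minimal standalone sketch of the idea follows; it is an illustration, not PySpark's exact implementation, and the 64 KiB target merely mirrors AutoBatchedSerializer's default bestSize of 1 << 16:

    import itertools
    import pickle

    def auto_batched_dump(iterator, stream, best_size=1 << 16):
        """Sketch: grow the batch while serialized chunks stay small,
        back off when a chunk overshoots the target size."""
        batch = 1
        it = iter(iterator)
        while True:
            chunk = list(itertools.islice(it, batch))
            if not chunk:
                break
            data = pickle.dumps(chunk, protocol=2)
            stream.write(len(data).to_bytes(4, "big"))  # length-prefixed frame
            stream.write(data)
            if len(data) < best_size:
                batch *= 2                     # chunks are small: batch more
            elif len(data) > best_size * 10 and batch > 1:
                batch //= 2                    # chunks ballooned: back off

Starting at a batch of 1 and growing from there is what makes the "even when the batch size is 1" case safe to treat uniformly, which is why the fixed-size special cases removed below are no longer needed.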
Diffstat (limited to 'python/pyspark/rdd.py')
-rw-r--r--  python/pyspark/rdd.py | 91
1 file changed, 37 insertions(+), 54 deletions(-)
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 4f025b9f11..879655dc53 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -120,7 +120,7 @@ class RDD(object):
operated on in parallel.
"""
- def __init__(self, jrdd, ctx, jrdd_deserializer):
+ def __init__(self, jrdd, ctx, jrdd_deserializer=AutoBatchedSerializer(PickleSerializer())):
self._jrdd = jrdd
self.is_cached = False
self.is_checkpointed = False
@@ -129,12 +129,8 @@ class RDD(object):
self._id = jrdd.id()
self._partitionFunc = None
- def _toPickleSerialization(self):
- if (self._jrdd_deserializer == PickleSerializer() or
- self._jrdd_deserializer == BatchedSerializer(PickleSerializer())):
- return self
- else:
- return self._reserialize(BatchedSerializer(PickleSerializer(), 10))
+ def _pickled(self):
+ return self._reserialize(AutoBatchedSerializer(PickleSerializer()))
def id(self):
"""
@@ -446,12 +442,11 @@ class RDD(object):
def _reserialize(self, serializer=None):
serializer = serializer or self.ctx.serializer
- if self._jrdd_deserializer == serializer:
- return self
- else:
- converted = self.map(lambda x: x, preservesPartitioning=True)
- converted._jrdd_deserializer = serializer
- return converted
+ if self._jrdd_deserializer != serializer:
+ if not isinstance(self, PipelinedRDD):
+ self = self.map(lambda x: x, preservesPartitioning=True)
+ self._jrdd_deserializer = serializer
+ return self
def __add__(self, other):
"""
@@ -1120,9 +1115,8 @@ class RDD(object):
:param valueConverter: (None by default)
"""
jconf = self.ctx._dictToJavaMap(conf)
- pickledRDD = self._toPickleSerialization()
- batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
- self.ctx._jvm.PythonRDD.saveAsHadoopDataset(pickledRDD._jrdd, batched, jconf,
+ pickledRDD = self._pickled()
+ self.ctx._jvm.PythonRDD.saveAsHadoopDataset(pickledRDD._jrdd, True, jconf,
keyConverter, valueConverter, True)
def saveAsNewAPIHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=None,
@@ -1147,9 +1141,8 @@ class RDD(object):
:param conf: Hadoop job configuration, passed in as a dict (None by default)
"""
jconf = self.ctx._dictToJavaMap(conf)
- pickledRDD = self._toPickleSerialization()
- batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
- self.ctx._jvm.PythonRDD.saveAsNewAPIHadoopFile(pickledRDD._jrdd, batched, path,
+ pickledRDD = self._pickled()
+ self.ctx._jvm.PythonRDD.saveAsNewAPIHadoopFile(pickledRDD._jrdd, True, path,
outputFormatClass,
keyClass, valueClass,
keyConverter, valueConverter, jconf)
@@ -1166,9 +1159,8 @@ class RDD(object):
:param valueConverter: (None by default)
"""
jconf = self.ctx._dictToJavaMap(conf)
- pickledRDD = self._toPickleSerialization()
- batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
- self.ctx._jvm.PythonRDD.saveAsHadoopDataset(pickledRDD._jrdd, batched, jconf,
+ pickledRDD = self._pickled()
+ self.ctx._jvm.PythonRDD.saveAsHadoopDataset(pickledRDD._jrdd, True, jconf,
keyConverter, valueConverter, False)
def saveAsHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=None,
@@ -1195,9 +1187,8 @@ class RDD(object):
:param compressionCodecClass: (None by default)
"""
jconf = self.ctx._dictToJavaMap(conf)
- pickledRDD = self._toPickleSerialization()
- batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
- self.ctx._jvm.PythonRDD.saveAsHadoopFile(pickledRDD._jrdd, batched, path,
+ pickledRDD = self._pickled()
+ self.ctx._jvm.PythonRDD.saveAsHadoopFile(pickledRDD._jrdd, True, path,
outputFormatClass,
keyClass, valueClass,
keyConverter, valueConverter,
@@ -1215,9 +1206,8 @@ class RDD(object):
:param path: path to sequence file
:param compressionCodecClass: (None by default)
"""
- pickledRDD = self._toPickleSerialization()
- batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
- self.ctx._jvm.PythonRDD.saveAsSequenceFile(pickledRDD._jrdd, batched,
+ pickledRDD = self._pickled()
+ self.ctx._jvm.PythonRDD.saveAsSequenceFile(pickledRDD._jrdd, True,
path, compressionCodecClass)
def saveAsPickleFile(self, path, batchSize=10):
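All four Hadoop save paths above now funnel through _pickled(), so the JVM side can unconditionally assume batched pickle input; that is what the literal True replacing the old `batched` probe expresses. A quick check of the invariant (assuming a live `sc`; AutoBatchedSerializer subclasses BatchedSerializer):

    from pyspark.serializers import BatchedSerializer

    p = sc.parallelize(range(10))._pickled()
    assert isinstance(p._jrdd_deserializer, BatchedSerializer)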
@@ -1232,8 +1222,11 @@ class RDD(object):
>>> sorted(sc.pickleFile(tmpFile.name, 5).collect())
[1, 2, 'rdd', 'spark']
"""
- self._reserialize(BatchedSerializer(PickleSerializer(),
- batchSize))._jrdd.saveAsObjectFile(path)
+ if batchSize == 0:
+ ser = AutoBatchedSerializer(PickleSerializer())
+ else:
+ ser = BatchedSerializer(PickleSerializer(), batchSize)
+ self._reserialize(ser)._jrdd.saveAsObjectFile(path)
def saveAsTextFile(self, path):
"""
@@ -1774,13 +1767,10 @@ class RDD(object):
>>> x.zip(y).collect()
[(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]
"""
- if self.getNumPartitions() != other.getNumPartitions():
- raise ValueError("Can only zip with RDD which has the same number of partitions")
-
def get_batch_size(ser):
if isinstance(ser, BatchedSerializer):
return ser.batchSize
- return 0
+ return 1
def batch_as(rdd, batchSize):
ser = rdd._jrdd_deserializer
@@ -1790,12 +1780,16 @@ class RDD(object):
my_batch = get_batch_size(self._jrdd_deserializer)
other_batch = get_batch_size(other._jrdd_deserializer)
- if my_batch != other_batch:
- # use the greatest batchSize to batch the other one.
- if my_batch > other_batch:
- other = batch_as(other, my_batch)
- else:
- self = batch_as(self, other_batch)
+ # use the smallest batchSize for both of them
+ batchSize = min(my_batch, other_batch)
+ if batchSize <= 0:
+ # auto batched or unlimited
+ batchSize = 100
+ other = batch_as(other, batchSize)
+ self = batch_as(self, batchSize)
+
+ if self.getNumPartitions() != other.getNumPartitions():
+ raise ValueError("Can only zip with RDD which has the same number of partitions")
# There will be an Exception in the JVM if there are different numbers
# of items in each partition.
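A worked example of the negotiation above, since the sentinel values are easy to misread (batch sizes are illustrative; in this version a non-batched stream reports 1, auto-batched reports a non-positive batchSize):

    # self:  BatchedSerializer(PickleSerializer(), 10)  -> get_batch_size == 10
    # other: AutoBatchedSerializer(PickleSerializer())  -> get_batch_size == 0
    # min(10, 0) == 0 and 0 <= 0, so BOTH sides are rebatched to the fixed
    # fallback of 100; with matching batch sizes the zipped streams stay
    # aligned item-for-item within each partition.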
@@ -1934,25 +1928,14 @@ class RDD(object):
return values.collect()
- def _is_pickled(self):
- """ Return this RDD is serialized by Pickle or not. """
- der = self._jrdd_deserializer
- if isinstance(der, PickleSerializer):
- return True
- if isinstance(der, BatchedSerializer) and isinstance(der.serializer, PickleSerializer):
- return True
- return False
-
def _to_java_object_rdd(self):
""" Return an JavaRDD of Object by unpickling
It will convert each Python object into Java object by Pyrolite, whenever the
RDD is serialized in batch or not.
"""
- rdd = self._reserialize(AutoBatchedSerializer(PickleSerializer())) \
- if not self._is_pickled() else self
- is_batch = isinstance(rdd._jrdd_deserializer, BatchedSerializer)
- return self.ctx._jvm.PythonRDD.pythonToJava(rdd._jrdd, is_batch)
+ rdd = self._pickled()
+ return self.ctx._jvm.SerDeUtil.pythonToJava(rdd._jrdd, True)
def countApprox(self, timeout, confidence=0.95):
"""
@@ -2132,7 +2115,7 @@ def _test():
globs = globals().copy()
# The small batch size here ensures that we see multiple batches,
# even in these small test examples:
- globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
+ globs['sc'] = SparkContext('local[4]', 'PythonTest')
(failure_count, test_count) = doctest.testmod(
globs=globs, optionflags=doctest.ELLIPSIS)
globs['sc'].stop()
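The doctest fixture no longer needs to force batchSize=2, since auto-batching is now the default and already produces multiple batches. A test that does want deterministic tiny batches can still pass the keyword explicitly, mirroring the removed line:

    sc = SparkContext('local[4]', 'PythonTest', batchSize=2)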