diff options
author | Davies Liu <davies@databricks.com> | 2014-11-03 23:56:14 -0800 |
---|---|---|
committer | Josh Rosen <joshrosen@databricks.com> | 2014-11-03 23:56:14 -0800 |
commit | e4f42631a68b473ce706429915f3f08042af2119 (patch) | |
tree | 557ff754b9936addfb9628bfcba462802ff6ec1c /python/pyspark/serializers.py | |
parent | b671ce047d036b8923007902826038b01e836e8a (diff) | |
download | spark-e4f42631a68b473ce706429915f3f08042af2119.tar.gz spark-e4f42631a68b473ce706429915f3f08042af2119.tar.bz2 spark-e4f42631a68b473ce706429915f3f08042af2119.zip |
[SPARK-3886] [PySpark] simplify serializer, use AutoBatchedSerializer by default.
This PR simplify serializer, always use batched serializer (AutoBatchedSerializer as default), even batch size is 1.
Author: Davies Liu <davies@databricks.com>
This patch had conflicts when merged, resolved by
Committer: Josh Rosen <joshrosen@databricks.com>
Closes #2920 from davies/fix_autobatch and squashes the following commits:
e544ef9 [Davies Liu] revert unrelated change
6880b14 [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
1d557fc [Davies Liu] fix tests
8180907 [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
76abdce [Davies Liu] clean up
53fa60b [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
d7ac751 [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
2cc2497 [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
b4292ce [Davies Liu] fix bug in master
d79744c [Davies Liu] recover hive tests
be37ece [Davies Liu] refactor
eb3938d [Davies Liu] refactor serializer in scala
8d77ef2 [Davies Liu] simplify serializer, use AutoBatchedSerializer by default.
Diffstat (limited to 'python/pyspark/serializers.py')
-rw-r--r-- | python/pyspark/serializers.py | 36 |
1 files changed, 13 insertions, 23 deletions
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py index 904bd9f265..d597cbf94e 100644 --- a/python/pyspark/serializers.py +++ b/python/pyspark/serializers.py @@ -33,9 +33,8 @@ The serializer is chosen when creating L{SparkContext}: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18] >>> sc.stop() -By default, PySpark serialize objects in batches; the batch size can be -controlled through SparkContext's C{batchSize} parameter -(the default size is 1024 objects): +PySpark serialize objects in batches; By default, the batch size is chosen based +on the size of objects, also configurable by SparkContext's C{batchSize} parameter: >>> sc = SparkContext('local', 'test', batchSize=2) >>> rdd = sc.parallelize(range(16), 4).map(lambda x: x) @@ -48,16 +47,6 @@ which contains two batches of two objects: >>> rdd._jrdd.count() 8L >>> sc.stop() - -A batch size of -1 uses an unlimited batch size, and a size of 1 disables -batching: - ->>> sc = SparkContext('local', 'test', batchSize=1) ->>> rdd = sc.parallelize(range(16), 4).map(lambda x: x) ->>> rdd.glom().collect() -[[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15]] ->>> rdd._jrdd.count() -16L """ import cPickle @@ -73,7 +62,7 @@ import itertools from pyspark import cloudpickle -__all__ = ["PickleSerializer", "MarshalSerializer"] +__all__ = ["PickleSerializer", "MarshalSerializer", "UTF8Deserializer"] class SpecialLengths(object): @@ -113,7 +102,7 @@ class Serializer(object): return not self.__eq__(other) def __repr__(self): - return "<%s object>" % self.__class__.__name__ + return "%s()" % self.__class__.__name__ def __hash__(self): return hash(str(self)) @@ -181,6 +170,7 @@ class BatchedSerializer(Serializer): """ UNLIMITED_BATCH_SIZE = -1 + UNKNOWN_BATCH_SIZE = 0 def __init__(self, serializer, batchSize=UNLIMITED_BATCH_SIZE): self.serializer = serializer @@ -213,10 +203,10 @@ class BatchedSerializer(Serializer): def __eq__(self, other): return (isinstance(other, BatchedSerializer) and - other.serializer == self.serializer) + other.serializer == self.serializer and other.batchSize == self.batchSize) def __repr__(self): - return "BatchedSerializer<%s>" % str(self.serializer) + return "BatchedSerializer(%s, %d)" % (str(self.serializer), self.batchSize) class AutoBatchedSerializer(BatchedSerializer): @@ -225,7 +215,7 @@ class AutoBatchedSerializer(BatchedSerializer): """ def __init__(self, serializer, bestSize=1 << 16): - BatchedSerializer.__init__(self, serializer, -1) + BatchedSerializer.__init__(self, serializer, self.UNKNOWN_BATCH_SIZE) self.bestSize = bestSize def dump_stream(self, iterator, stream): @@ -248,10 +238,10 @@ class AutoBatchedSerializer(BatchedSerializer): def __eq__(self, other): return (isinstance(other, AutoBatchedSerializer) and - other.serializer == self.serializer) + other.serializer == self.serializer and other.bestSize == self.bestSize) def __str__(self): - return "AutoBatchedSerializer<%s>" % str(self.serializer) + return "AutoBatchedSerializer(%s)" % str(self.serializer) class CartesianDeserializer(FramedSerializer): @@ -284,7 +274,7 @@ class CartesianDeserializer(FramedSerializer): self.key_ser == other.key_ser and self.val_ser == other.val_ser) def __repr__(self): - return "CartesianDeserializer<%s, %s>" % \ + return "CartesianDeserializer(%s, %s)" % \ (str(self.key_ser), str(self.val_ser)) @@ -311,7 +301,7 @@ class PairDeserializer(CartesianDeserializer): self.key_ser == other.key_ser and self.val_ser == other.val_ser) def __repr__(self): - return "PairDeserializer<%s, %s>" % (str(self.key_ser), str(self.val_ser)) + return "PairDeserializer(%s, %s)" % (str(self.key_ser), str(self.val_ser)) class NoOpSerializer(FramedSerializer): @@ -430,7 +420,7 @@ class MarshalSerializer(FramedSerializer): class AutoSerializer(FramedSerializer): """ - Choose marshal or cPickle as serialization protocol autumatically + Choose marshal or cPickle as serialization protocol automatically """ def __init__(self): |