aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/serializers.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pyspark/serializers.py')
-rw-r--r--python/pyspark/serializers.py36
1 files changed, 13 insertions, 23 deletions
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index 904bd9f265..d597cbf94e 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -33,9 +33,8 @@ The serializer is chosen when creating L{SparkContext}:
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
>>> sc.stop()
-By default, PySpark serialize objects in batches; the batch size can be
-controlled through SparkContext's C{batchSize} parameter
-(the default size is 1024 objects):
+PySpark serialize objects in batches; By default, the batch size is chosen based
+on the size of objects, also configurable by SparkContext's C{batchSize} parameter:
>>> sc = SparkContext('local', 'test', batchSize=2)
>>> rdd = sc.parallelize(range(16), 4).map(lambda x: x)
@@ -48,16 +47,6 @@ which contains two batches of two objects:
>>> rdd._jrdd.count()
8L
>>> sc.stop()
-
-A batch size of -1 uses an unlimited batch size, and a size of 1 disables
-batching:
-
->>> sc = SparkContext('local', 'test', batchSize=1)
->>> rdd = sc.parallelize(range(16), 4).map(lambda x: x)
->>> rdd.glom().collect()
-[[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15]]
->>> rdd._jrdd.count()
-16L
"""
import cPickle
@@ -73,7 +62,7 @@ import itertools
from pyspark import cloudpickle
-__all__ = ["PickleSerializer", "MarshalSerializer"]
+__all__ = ["PickleSerializer", "MarshalSerializer", "UTF8Deserializer"]
class SpecialLengths(object):
@@ -113,7 +102,7 @@ class Serializer(object):
return not self.__eq__(other)
def __repr__(self):
- return "<%s object>" % self.__class__.__name__
+ return "%s()" % self.__class__.__name__
def __hash__(self):
return hash(str(self))
@@ -181,6 +170,7 @@ class BatchedSerializer(Serializer):
"""
UNLIMITED_BATCH_SIZE = -1
+ UNKNOWN_BATCH_SIZE = 0
def __init__(self, serializer, batchSize=UNLIMITED_BATCH_SIZE):
self.serializer = serializer
@@ -213,10 +203,10 @@ class BatchedSerializer(Serializer):
def __eq__(self, other):
return (isinstance(other, BatchedSerializer) and
- other.serializer == self.serializer)
+ other.serializer == self.serializer and other.batchSize == self.batchSize)
def __repr__(self):
- return "BatchedSerializer<%s>" % str(self.serializer)
+ return "BatchedSerializer(%s, %d)" % (str(self.serializer), self.batchSize)
class AutoBatchedSerializer(BatchedSerializer):
@@ -225,7 +215,7 @@ class AutoBatchedSerializer(BatchedSerializer):
"""
def __init__(self, serializer, bestSize=1 << 16):
- BatchedSerializer.__init__(self, serializer, -1)
+ BatchedSerializer.__init__(self, serializer, self.UNKNOWN_BATCH_SIZE)
self.bestSize = bestSize
def dump_stream(self, iterator, stream):
@@ -248,10 +238,10 @@ class AutoBatchedSerializer(BatchedSerializer):
def __eq__(self, other):
return (isinstance(other, AutoBatchedSerializer) and
- other.serializer == self.serializer)
+ other.serializer == self.serializer and other.bestSize == self.bestSize)
def __str__(self):
- return "AutoBatchedSerializer<%s>" % str(self.serializer)
+ return "AutoBatchedSerializer(%s)" % str(self.serializer)
class CartesianDeserializer(FramedSerializer):
@@ -284,7 +274,7 @@ class CartesianDeserializer(FramedSerializer):
self.key_ser == other.key_ser and self.val_ser == other.val_ser)
def __repr__(self):
- return "CartesianDeserializer<%s, %s>" % \
+ return "CartesianDeserializer(%s, %s)" % \
(str(self.key_ser), str(self.val_ser))
@@ -311,7 +301,7 @@ class PairDeserializer(CartesianDeserializer):
self.key_ser == other.key_ser and self.val_ser == other.val_ser)
def __repr__(self):
- return "PairDeserializer<%s, %s>" % (str(self.key_ser), str(self.val_ser))
+ return "PairDeserializer(%s, %s)" % (str(self.key_ser), str(self.val_ser))
class NoOpSerializer(FramedSerializer):
@@ -430,7 +420,7 @@ class MarshalSerializer(FramedSerializer):
class AutoSerializer(FramedSerializer):
"""
- Choose marshal or cPickle as serialization protocol autumatically
+ Choose marshal or cPickle as serialization protocol automatically
"""
def __init__(self):