Diffstat (limited to 'python/pyspark/serializers.py')
-rw-r--r--  python/pyspark/serializers.py | 178
1 file changed, 7 insertions(+), 171 deletions(-)
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index 760a509f0e..33aa55f7f1 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -448,184 +448,20 @@ class AutoSerializer(FramedSerializer):
raise ValueError("invalid sevialization type: %s" % _type)
 
 
-class SizeLimitedStream(object):
-    """
-    Read at most `limit` bytes from underlying stream
-
-    >>> from StringIO import StringIO
-    >>> io = StringIO()
-    >>> io.write("Hello world")
-    >>> io.seek(0)
-    >>> lio = SizeLimitedStream(io, 5)
-    >>> lio.read()
-    'Hello'
-    """
-    def __init__(self, stream, limit):
-        self.stream = stream
-        self.limit = limit
-
-    def read(self, n=0):
-        if n > self.limit or n == 0:
-            n = self.limit
-        buf = self.stream.read(n)
-        self.limit -= len(buf)
-        return buf
-
-
-class CompressedStream(object):
-    """
-    Compress the data using zlib
-
-    >>> from StringIO import StringIO
-    >>> io = StringIO()
-    >>> wio = CompressedStream(io, 'w')
-    >>> wio.write("Hello world")
-    >>> wio.flush()
-    >>> io.seek(0)
-    >>> rio = CompressedStream(io, 'r')
-    >>> rio.read()
-    'Hello world'
-    >>> rio.read()
-    ''
-    """
-    MAX_BATCH = 1 << 20  # 1MB
-
-    def __init__(self, stream, mode='w', level=1):
-        self.stream = stream
-        self.mode = mode
-        if mode == 'w':
-            self.compresser = zlib.compressobj(level)
-        elif mode == 'r':
-            self.decompresser = zlib.decompressobj()
-            self.buf = ''
-        else:
-            raise ValueError("can only support mode 'w' or 'r' ")
-
-    def write(self, buf):
-        assert self.mode == 'w', "It's not opened for write"
-        if len(buf) > self.MAX_BATCH:
-            # zlib can not compress string larger than 2G
-            batches = len(buf) / self.MAX_BATCH + 1  # last one may be empty
-            for i in xrange(batches):
-                self.write(buf[i * self.MAX_BATCH:(i + 1) * self.MAX_BATCH])
-        else:
-            compressed = self.compresser.compress(buf)
-            self.stream.write(compressed)
-
-    def flush(self, mode=zlib.Z_FULL_FLUSH):
-        if self.mode == 'w':
-            d = self.compresser.flush(mode)
-            self.stream.write(d)
-            self.stream.flush()
-
-    def close(self):
-        if self.mode == 'w':
-            self.flush(zlib.Z_FINISH)
-        self.stream.close()
-
-    def read(self, size=0):
-        assert self.mode == 'r', "It's not opened for read"
-        if not size:
-            data = self.stream.read()
-            result = self.decompresser.decompress(data)
-            last = self.decompresser.flush()
-            return self.buf + result + last
-
-        # fast path for small read()
-        if size <= len(self.buf):
-            result = self.buf[:size]
-            self.buf = self.buf[size:]
-            return result
-
-        result = [self.buf]
-        size -= len(self.buf)
-        self.buf = ''
-        while size:
-            need = min(size, self.MAX_BATCH)
-            input = self.stream.read(need)
-            if input:
-                buf = self.decompresser.decompress(input)
-            else:
-                buf = self.decompresser.flush()
-
-            if len(buf) >= size:
-                self.buf = buf[size:]
-                result.append(buf[:size])
-                return ''.join(result)
-
-            size -= len(buf)
-            result.append(buf)
-            if not input:
-                return ''.join(result)
-
-    def readline(self):
-        """
-        This is needed for pickle, but not used in protocol 2
-        """
-        line = []
-        b = self.read(1)
-        while b and b != '\n':
-            line.append(b)
-            b = self.read(1)
-        line.append(b)
-        return ''.join(line)
-
-
-class LargeObjectSerializer(Serializer):
-    """
-    Serialize large object which could be larger than 2G
-
-    It uses cPickle to serialize the objects
-    """
-    def dump_stream(self, iterator, stream):
-        stream = CompressedStream(stream, 'w')
-        for value in iterator:
-            if isinstance(value, basestring):
-                if isinstance(value, unicode):
-                    stream.write('U')
-                    value = value.encode("utf-8")
-                else:
-                    stream.write('S')
-                write_long(len(value), stream)
-                stream.write(value)
-            else:
-                stream.write('P')
-                cPickle.dump(value, stream, 2)
-        stream.flush()
-
-    def load_stream(self, stream):
-        stream = CompressedStream(stream, 'r')
-        while True:
-            type = stream.read(1)
-            if not type:
-                return
-            if type in ('S', 'U'):
-                length = read_long(stream)
-                value = stream.read(length)
-                if type == 'U':
-                    value = value.decode('utf-8')
-                yield value
-            elif type == 'P':
-                yield cPickle.load(stream)
-            else:
-                raise ValueError("unknown type: %s" % type)
-
-
-class CompressedSerializer(Serializer):
+class CompressedSerializer(FramedSerializer):
     """
     Compress the serialized data
     """
     def __init__(self, serializer):
+        FramedSerializer.__init__(self)
+        assert isinstance(serializer, FramedSerializer), "serializer must be a FramedSerializer"
         self.serializer = serializer
 
-    def load_stream(self, stream):
-        stream = CompressedStream(stream, "r")
-        return self.serializer.load_stream(stream)
+    def dumps(self, obj):
+        return zlib.compress(self.serializer.dumps(obj), 1)
 
-    def dump_stream(self, iterator, stream):
-        stream = CompressedStream(stream, "w")
-        self.serializer.dump_stream(iterator, stream)
-        stream.flush()
+    def loads(self, obj):
+        return self.serializer.loads(zlib.decompress(obj))
 
 
 class UTF8Deserializer(Serializer):
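
After this change, CompressedSerializer no longer wraps the whole stream in a
zlib CompressedStream; it inherits dump_stream/load_stream from
FramedSerializer and compresses each framed record independently with zlib
level 1, favoring speed over compression ratio. The streaming path for
objects larger than 2G (SizeLimitedStream, CompressedStream,
LargeObjectSerializer) is removed. A minimal round-trip sketch, assuming
PickleSerializer, the framed pickle serializer defined earlier in this
module:

    from pyspark.serializers import CompressedSerializer, PickleSerializer

    # Each record is pickled by the wrapped serializer, then zlib-compressed.
    ser = CompressedSerializer(PickleSerializer())

    record = {"key": "value" * 1000}
    frame = ser.dumps(record)          # zlib.compress(pickled bytes, 1)
    assert ser.loads(frame) == record  # zlib.decompress, then unpickle

dump_stream and load_stream come from FramedSerializer, which writes a length
prefix before each compressed frame, so a reader knows how many bytes to
consume per record.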