diff options
Diffstat (limited to 'pyspark/pyspark/serializers.py')
-rw-r--r-- | pyspark/pyspark/serializers.py | 233 |
1 files changed, 20 insertions, 213 deletions
diff --git a/pyspark/pyspark/serializers.py b/pyspark/pyspark/serializers.py index b113f5656b..7b3e6966e1 100644 --- a/pyspark/pyspark/serializers.py +++ b/pyspark/pyspark/serializers.py @@ -2,228 +2,35 @@ Data serialization methods. The Spark Python API is built on top of the Spark Java API. RDDs created in -Python are stored in Java as RDDs of Strings. Python objects are automatically -serialized/deserialized, so this representation is transparent to the end-user. - ------------------- -Serializer objects ------------------- - -`Serializer` objects are used to customize how an RDD's values are serialized. - -Each `Serializer` is a named tuple with four fields: - - - A `dumps` function, for serializing a Python object to a string. - - - A `loads` function, for deserializing a Python object from a string. - - - An `is_comparable` field, True if equal Python objects are serialized to - equal strings, and False otherwise. - - - A `name` field, used to identify the Serializer. Serializers are - compared for equality by comparing their names. - -The serializer's output should be base64-encoded. - ------------------------------------------------------------------- -`is_comparable`: comparing serialized representations for equality ------------------------------------------------------------------- - -If `is_comparable` is False, the serializer's representations of equal objects -are not required to be equal: - ->>> import pickle ->>> a = {1: 0, 9: 0} ->>> b = {9: 0, 1: 0} ->>> a == b -True ->>> pickle.dumps(a) == pickle.dumps(b) -False - -RDDs with comparable serializers can use native Java implementations of -operations like join() and distinct(), which may lead to better performance by -eliminating deserialization and Python comparisons. - -The default JSONSerializer produces comparable representations of common Python -data structures. - --------------------------------------- -Examples of serialized representations --------------------------------------- - -The RDD transformations that use Python UDFs are implemented in terms of -a modified `PipedRDD.pipe()` function. For each record `x` in the RDD, the -`pipe()` function pipes `x.toString()` to a Python worker process, which -deserializes the string into a Python object, executes user-defined functions, -and outputs serialized Python objects. - -The regular `toString()` method returns an ambiguous representation, due to the -way that Scala `Option` instances are printed: - ->>> from context import SparkContext ->>> sc = SparkContext("local", "SerializerDocs") ->>> x = sc.parallelizePairs([("a", 1), ("b", 4)]) ->>> y = sc.parallelizePairs([("a", 2)]) - ->>> print y.rightOuterJoin(x)._jrdd.first().toString() -(ImEi,(Some(Mg==),MQ==)) - -In Java, preprocessing is performed to handle Option instances, so the Python -process receives unambiguous input: - ->>> print sc.python_dump(y.rightOuterJoin(x)._jrdd.first()) -(ImEi,(Mg==,MQ==)) - -The base64-encoding eliminates the need to escape newlines, parentheses and -other special characters. - ----------------------- -Serializer composition ----------------------- - -In order to handle nested structures, which could contain object serialized -with different serializers, the RDD module composes serializers. For example, -the serializers in the previous example are: - ->>> print x.serializer.name -PairSerializer<JSONSerializer, JSONSerializer> - ->>> print y.serializer.name -PairSerializer<JSONSerializer, JSONSerializer> - ->>> print y.rightOuterJoin(x).serializer.name -PairSerializer<JSONSerializer, PairSerializer<OptionSerializer<JSONSerializer>, JSONSerializer>> +Python are stored in Java as RDD[Array[Byte]]. Python objects are +automatically serialized/deserialized, so this representation is transparent to +the end-user. """ -from base64 import standard_b64encode, standard_b64decode from collections import namedtuple import cPickle -import simplejson - - -Serializer = namedtuple("Serializer", - ["dumps","loads", "is_comparable", "name"]) - - -NopSerializer = Serializer(str, str, True, "NopSerializer") +import struct -JSONSerializer = Serializer( - lambda obj: standard_b64encode(simplejson.dumps(obj, sort_keys=True, - separators=(',', ':'))), - lambda s: simplejson.loads(standard_b64decode(s)), - True, - "JSONSerializer" -) +Serializer = namedtuple("Serializer", ["dumps","loads"]) PickleSerializer = Serializer( - lambda obj: standard_b64encode(cPickle.dumps(obj)), - lambda s: cPickle.loads(standard_b64decode(s)), - False, - "PickleSerializer" -) - - -def OptionSerializer(serializer): - """ - >>> ser = OptionSerializer(NopSerializer) - >>> ser.loads(ser.dumps("Hello, World!")) - 'Hello, World!' - >>> ser.loads(ser.dumps(None)) is None - True - """ - none_placeholder = '*' - - def dumps(x): - if x is None: - return none_placeholder - else: - return serializer.dumps(x) - - def loads(x): - if x == none_placeholder: - return None - else: - return serializer.loads(x) - - name = "OptionSerializer<%s>" % serializer.name - return Serializer(dumps, loads, serializer.is_comparable, name) - - -def PairSerializer(keySerializer, valSerializer): - """ - Returns a Serializer for a (key, value) pair. - - >>> ser = PairSerializer(JSONSerializer, JSONSerializer) - >>> ser.loads(ser.dumps((1, 2))) - (1, 2) - - >>> ser = PairSerializer(JSONSerializer, ser) - >>> ser.loads(ser.dumps((1, (2, 3)))) - (1, (2, 3)) - """ - def loads(kv): - try: - (key, val) = kv[1:-1].split(',', 1) - key = keySerializer.loads(key) - val = valSerializer.loads(val) - return (key, val) - except: - print "Error in deserializing pair from '%s'" % str(kv) - raise - - def dumps(kv): - (key, val) = kv - return"(%s,%s)" % (keySerializer.dumps(key), valSerializer.dumps(val)) - is_comparable = \ - keySerializer.is_comparable and valSerializer.is_comparable - name = "PairSerializer<%s, %s>" % (keySerializer.name, valSerializer.name) - return Serializer(dumps, loads, is_comparable, name) - - -def ArraySerializer(serializer): - """ - >>> ser = ArraySerializer(JSONSerializer) - >>> ser.loads(ser.dumps([1, 2, 3, 4])) - [1, 2, 3, 4] - >>> ser = ArraySerializer(PairSerializer(JSONSerializer, PickleSerializer)) - >>> ser.loads(ser.dumps([('a', 1), ('b', 2)])) - [('a', 1), ('b', 2)] - >>> ser.loads(ser.dumps([('a', 1)])) - [('a', 1)] - >>> ser.loads(ser.dumps([])) - [] - """ - def dumps(arr): - if arr == []: - return '[]' - else: - return '[' + '|'.join(serializer.dumps(x) for x in arr) + ']' - - def loads(s): - if s == '[]': - return [] - items = s[1:-1] - if '|' in items: - items = items.split('|') - else: - items = [items] - return [serializer.loads(x) for x in items] - - name = "ArraySerializer<%s>" % serializer.name - return Serializer(dumps, loads, serializer.is_comparable, name) - - -# TODO: IntegerSerializer - - -# TODO: DoubleSerializer + lambda obj: cPickle.dumps(obj, -1), + cPickle.loads) -def _test(): - import doctest - doctest.testmod() +def dumps(obj, stream): + # TODO: determining the length of non-byte objects. + stream.write(struct.pack("!i", len(obj))) + stream.write(obj) -if __name__ == "__main__": - _test() +def loads(stream): + length = stream.read(4) + if length == "": + raise EOFError + length = struct.unpack("!i", length)[0] + obj = stream.read(length) + if obj == "": + raise EOFError + return obj |