author    Davies Liu <davies@databricks.com>    2015-04-16 16:20:57 -0700
committer Josh Rosen <joshrosen@databricks.com>    2015-04-16 16:20:57 -0700
commit    04e44b37cc04f62fbf9e08c7076349e0a4d12ea8 (patch)
tree      b6429253955210445ddc37faa4d5166ea25a91e2 /python/pyspark/serializers.py
parent    55f553a979db925aa0c3559f7e80b99d2bf3feb4 (diff)
[SPARK-4897] [PySpark] Python 3 support
This PR updates PySpark to support Python 3 (tested with 3.4).

Known issue: unpickling arrays from Pyrolite is broken in Python 3; those tests are skipped.

TODO: ec2/spark-ec2.py is not fully tested with python3.

Author: Davies Liu <davies@databricks.com>
Author: twneale <twneale@gmail.com>
Author: Josh Rosen <joshrosen@databricks.com>

Closes #5173 from davies/python3 and squashes the following commits:

d7d6323 [Davies Liu] fix tests
6c52a98 [Davies Liu] fix mllib test
99e334f [Davies Liu] update timeout
b716610 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
cafd5ec [Davies Liu] adddress comments from @mengxr
bf225d7 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
179fc8d [Davies Liu] tuning flaky tests
8c8b957 [Davies Liu] fix ResourceWarning in Python 3
5c57c95 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
4006829 [Davies Liu] fix test
2fc0066 [Davies Liu] add python3 path
71535e9 [Davies Liu] fix xrange and divide
5a55ab4 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
125f12c [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
ed498c8 [Davies Liu] fix compatibility with python 3
820e649 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
e8ce8c9 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
ad7c374 [Davies Liu] fix mllib test and warning
ef1fc2f [Davies Liu] fix tests
4eee14a [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
20112ff [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
59bb492 [Davies Liu] fix tests
1da268c [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
ca0fdd3 [Davies Liu] fix code style
9563a15 [Davies Liu] add imap back for python 2
0b1ec04 [Davies Liu] make python examples work with Python 3
d2fd566 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
a716d34 [Davies Liu] test with python 3.4
f1700e8 [Davies Liu] fix test in python3
671b1db [Davies Liu] fix test in python3
692ff47 [Davies Liu] fix flaky test
7b9699f [Davies Liu] invalidate import cache for Python 3.3+
9c58497 [Davies Liu] fix kill worker
309bfbf [Davies Liu] keep compatibility
5707476 [Davies Liu] cleanup, fix hash of string in 3.3+
8662d5b [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
f53e1f0 [Davies Liu] fix tests
70b6b73 [Davies Liu] compile ec2/spark_ec2.py in python 3
a39167e [Davies Liu] support customize class in __main__
814c77b [Davies Liu] run unittests with python 3
7f4476e [Davies Liu] mllib tests passed
d737924 [Davies Liu] pass ml tests
375ea17 [Davies Liu] SQL tests pass
6cc42a9 [Davies Liu] rename
431a8de [Davies Liu] streaming tests pass
78901a7 [Davies Liu] fix hash of serializer in Python 3
24b2f2e [Davies Liu] pass all RDD tests
35f48fe [Davies Liu] run future again
1eebac2 [Davies Liu] fix conflict in ec2/spark_ec2.py
6e3c21d [Davies Liu] make cloudpickle work with Python3
2fb2db3 [Josh Rosen] Guard more changes behind sys.version; still doesn't run
1aa5e8f [twneale] Turned out `pickle.DictionaryType is dict` == True, so swapped it out
7354371 [twneale] buffer --> memoryview I'm not super sure if this a valid change, but the 2.7 docs recommend using memoryview over buffer where possible, so hoping it'll work.
b69ccdf [twneale] Uses the pure python pickle._Pickler instead of c-extension _pickle.Pickler. It appears pyspark 2.7 uses the pure python pickler as well, so this shouldn't degrade pickling performance (?).
f40d925 [twneale] xrange --> range
e104215 [twneale] Replaces 2.7 types.InstsanceType with 3.4 `object`....could be horribly wrong depending on how types.InstanceType is used elsewhere in the package--see http://bugs.python.org/issue8206
79de9d0 [twneale] Replaces python2.7 `file` with 3.4 _io.TextIOWrapper
2adb42d [Josh Rosen] Fix up some import differences between Python 2 and 3
854be27 [Josh Rosen] Run `futurize` on Python code
7c5b4ce [Josh Rosen] Remove Python 3 check in shell.py
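The heart of the port in serializers.py is the version-gated import block added in the first hunk below: one module-level check selects the pickler, the pickle protocol, and the lazy iterator names, so every call site stays identical under both interpreters. A minimal, self-contained sketch of that pattern (the round-trip under __main__ is illustrative and not part of the patch):

import sys

if sys.version < '3':
    import cPickle as pickle            # C-accelerated pickler, Python 2 only
    protocol = 2                        # highest protocol Python 2 understands
    from itertools import izip as zip   # lazy zip, matching Python 3 semantics
else:
    import pickle                       # Python 3 folded cPickle into pickle
    protocol = 3                        # Python 3 protocol, handles bytes natively
    xrange = range                      # range is already lazy in Python 3

if __name__ == "__main__":
    # Identical call sites work under both interpreters.
    payload = pickle.dumps({"key": 42}, protocol)
    assert pickle.loads(payload) == {"key": 42}
    print(list(zip(xrange(3), "abc")))  # [(0, 'a'), (1, 'b'), (2, 'c')]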
Diffstat (limited to 'python/pyspark/serializers.py')
-rw-r--r--    python/pyspark/serializers.py    101
1 file changed, 47 insertions(+), 54 deletions(-)
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index 4afa82f4b2..d8cdcda3a3 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -49,16 +49,24 @@ which contains two batches of two objects:
>>> sc.stop()
"""
-import cPickle
-from itertools import chain, izip, product
+import sys
+from itertools import chain, product
import marshal
import struct
-import sys
import types
import collections
import zlib
import itertools
+if sys.version < '3':
+ import cPickle as pickle
+ protocol = 2
+ from itertools import izip as zip
+else:
+ import pickle
+ protocol = 3
+ xrange = range
+
from pyspark import cloudpickle
@@ -97,7 +105,7 @@ class Serializer(object):
# subclasses should override __eq__ as appropriate.
def __eq__(self, other):
- return isinstance(other, self.__class__)
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
def __ne__(self, other):
return not self.__eq__(other)
@@ -212,10 +220,6 @@ class BatchedSerializer(Serializer):
def _load_stream_without_unbatching(self, stream):
return self.serializer.load_stream(stream)
- def __eq__(self, other):
- return (isinstance(other, BatchedSerializer) and
- other.serializer == self.serializer and other.batchSize == self.batchSize)
-
def __repr__(self):
return "BatchedSerializer(%s, %d)" % (str(self.serializer), self.batchSize)
@@ -233,14 +237,14 @@ class FlattenedValuesSerializer(BatchedSerializer):
def _batched(self, iterator):
n = self.batchSize
for key, values in iterator:
- for i in xrange(0, len(values), n):
+ for i in range(0, len(values), n):
yield key, values[i:i + n]
def load_stream(self, stream):
return self.serializer.load_stream(stream)
def __repr__(self):
- return "FlattenedValuesSerializer(%d)" % self.batchSize
+ return "FlattenedValuesSerializer(%s, %d)" % (self.serializer, self.batchSize)
class AutoBatchedSerializer(BatchedSerializer):
@@ -270,12 +274,8 @@ class AutoBatchedSerializer(BatchedSerializer):
elif size > best * 10 and batch > 1:
batch /= 2
- def __eq__(self, other):
- return (isinstance(other, AutoBatchedSerializer) and
- other.serializer == self.serializer and other.bestSize == self.bestSize)
-
def __repr__(self):
- return "AutoBatchedSerializer(%s)" % str(self.serializer)
+ return "AutoBatchedSerializer(%s)" % self.serializer
class CartesianDeserializer(FramedSerializer):
@@ -285,6 +285,7 @@ class CartesianDeserializer(FramedSerializer):
"""
def __init__(self, key_ser, val_ser):
+ FramedSerializer.__init__(self)
self.key_ser = key_ser
self.val_ser = val_ser
@@ -293,7 +294,7 @@ class CartesianDeserializer(FramedSerializer):
val_stream = self.val_ser._load_stream_without_unbatching(stream)
key_is_batched = isinstance(self.key_ser, BatchedSerializer)
val_is_batched = isinstance(self.val_ser, BatchedSerializer)
- for (keys, vals) in izip(key_stream, val_stream):
+ for (keys, vals) in zip(key_stream, val_stream):
keys = keys if key_is_batched else [keys]
vals = vals if val_is_batched else [vals]
yield (keys, vals)
@@ -303,10 +304,6 @@ class CartesianDeserializer(FramedSerializer):
for pair in product(keys, vals):
yield pair
- def __eq__(self, other):
- return (isinstance(other, CartesianDeserializer) and
- self.key_ser == other.key_ser and self.val_ser == other.val_ser)
-
def __repr__(self):
return "CartesianDeserializer(%s, %s)" % \
(str(self.key_ser), str(self.val_ser))
@@ -318,22 +315,14 @@ class PairDeserializer(CartesianDeserializer):
Deserializes the JavaRDD zip() of two PythonRDDs.
"""
- def __init__(self, key_ser, val_ser):
- self.key_ser = key_ser
- self.val_ser = val_ser
-
def load_stream(self, stream):
for (keys, vals) in self.prepare_keys_values(stream):
if len(keys) != len(vals):
raise ValueError("Can not deserialize RDD with different number of items"
" in pair: (%d, %d)" % (len(keys), len(vals)))
- for pair in izip(keys, vals):
+ for pair in zip(keys, vals):
yield pair
- def __eq__(self, other):
- return (isinstance(other, PairDeserializer) and
- self.key_ser == other.key_ser and self.val_ser == other.val_ser)
-
def __repr__(self):
return "PairDeserializer(%s, %s)" % (str(self.key_ser), str(self.val_ser))
@@ -382,8 +371,8 @@ def _hijack_namedtuple():
global _old_namedtuple # or it will put in closure
def _copy_func(f):
- return types.FunctionType(f.func_code, f.func_globals, f.func_name,
- f.func_defaults, f.func_closure)
+ return types.FunctionType(f.__code__, f.__globals__, f.__name__,
+ f.__defaults__, f.__closure__)
_old_namedtuple = _copy_func(collections.namedtuple)
@@ -392,15 +381,15 @@ def _hijack_namedtuple():
return _hack_namedtuple(cls)
# replace namedtuple with new one
- collections.namedtuple.func_globals["_old_namedtuple"] = _old_namedtuple
- collections.namedtuple.func_globals["_hack_namedtuple"] = _hack_namedtuple
- collections.namedtuple.func_code = namedtuple.func_code
+ collections.namedtuple.__globals__["_old_namedtuple"] = _old_namedtuple
+ collections.namedtuple.__globals__["_hack_namedtuple"] = _hack_namedtuple
+ collections.namedtuple.__code__ = namedtuple.__code__
collections.namedtuple.__hijack = 1
# hack the cls already generated by namedtuple
# those created in other module can be pickled as normal,
# so only hack those in __main__ module
- for n, o in sys.modules["__main__"].__dict__.iteritems():
+ for n, o in sys.modules["__main__"].__dict__.items():
if (type(o) is type and o.__base__ is tuple
and hasattr(o, "_fields")
and "__reduce__" not in o.__dict__):
@@ -413,7 +402,7 @@ _hijack_namedtuple()
class PickleSerializer(FramedSerializer):
"""
- Serializes objects using Python's cPickle serializer:
+ Serializes objects using Python's pickle serializer:
http://docs.python.org/2/library/pickle.html
@@ -422,10 +411,14 @@ class PickleSerializer(FramedSerializer):
"""
def dumps(self, obj):
- return cPickle.dumps(obj, 2)
+ return pickle.dumps(obj, protocol)
- def loads(self, obj):
- return cPickle.loads(obj)
+ if sys.version >= '3':
+ def loads(self, obj, encoding="bytes"):
+ return pickle.loads(obj, encoding=encoding)
+ else:
+ def loads(self, obj, encoding=None):
+ return pickle.loads(obj)
class CloudPickleSerializer(PickleSerializer):
@@ -454,7 +447,7 @@ class MarshalSerializer(FramedSerializer):
class AutoSerializer(FramedSerializer):
"""
- Choose marshal or cPickle as serialization protocol automatically
+ Choose marshal or pickle as serialization protocol automatically
"""
def __init__(self):
@@ -463,19 +456,19 @@ class AutoSerializer(FramedSerializer):
def dumps(self, obj):
if self._type is not None:
- return 'P' + cPickle.dumps(obj, -1)
+ return b'P' + pickle.dumps(obj, -1)
try:
- return 'M' + marshal.dumps(obj)
+ return b'M' + marshal.dumps(obj)
except Exception:
- self._type = 'P'
- return 'P' + cPickle.dumps(obj, -1)
+ self._type = b'P'
+ return b'P' + pickle.dumps(obj, -1)
def loads(self, obj):
_type = obj[0]
- if _type == 'M':
+ if _type == b'M':
return marshal.loads(obj[1:])
- elif _type == 'P':
- return cPickle.loads(obj[1:])
+ elif _type == b'P':
+ return pickle.loads(obj[1:])
else:
raise ValueError("invalid serialization type: %s" % _type)
@@ -495,8 +488,8 @@ class CompressedSerializer(FramedSerializer):
def loads(self, obj):
return self.serializer.loads(zlib.decompress(obj))
- def __eq__(self, other):
- return isinstance(other, CompressedSerializer) and self.serializer == other.serializer
+ def __repr__(self):
+ return "CompressedSerializer(%s)" % self.serializer
class UTF8Deserializer(Serializer):
@@ -505,7 +498,7 @@ class UTF8Deserializer(Serializer):
Deserializes streams written by String.getBytes.
"""
- def __init__(self, use_unicode=False):
+ def __init__(self, use_unicode=True):
self.use_unicode = use_unicode
def loads(self, stream):
@@ -526,13 +519,13 @@ class UTF8Deserializer(Serializer):
except EOFError:
return
- def __eq__(self, other):
- return isinstance(other, UTF8Deserializer) and self.use_unicode == other.use_unicode
+ def __repr__(self):
+ return "UTF8Deserializer(%s)" % self.use_unicode
def read_long(stream):
length = stream.read(8)
- if length == "":
+ if not length:
raise EOFError
return struct.unpack("!q", length)[0]
@@ -547,7 +540,7 @@ def pack_long(value):
def read_int(stream):
length = stream.read(4)
- if length == "":
+ if not length:
raise EOFError
return struct.unpack("!i", length)[0]
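
The read_long/read_int change above is the Python 3 EOF fix in miniature: stream.read() returns the bytes literal b"" at end of stream under Python 3, so the old comparison against the str literal "" could never be true, while `if not length:` is falsy in both versions. A small round-trip sketch of the 4-byte framing; write_int here is a hypothetical counterpart added for the demo (this diff only shows the readers and pack_long):

import io
import struct

def write_int(stream, value):
    # Big-endian, 4-byte frame header -- the inverse of read_int above.
    stream.write(struct.pack("!i", value))

def read_int(stream):
    length = stream.read(4)
    if not length:  # b"" in Python 3, "" in Python 2 -- falsy either way
        raise EOFError
    return struct.unpack("!i", length)[0]

buf = io.BytesIO()
write_int(buf, 1024)
buf.seek(0)
assert read_int(buf) == 1024
try:
    read_int(buf)   # nothing left: read(4) returns b""
except EOFError:
    print("clean EOF")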