diff options
author | Davies Liu <davies@databricks.com> | 2015-09-14 19:46:34 -0700 |
---|---|---|
committer | Davies Liu <davies.liu@gmail.com> | 2015-09-14 19:46:34 -0700 |
commit | 55204181004c105c7a3e8c31a099b37e48bfd953 (patch) | |
tree | 7a052c7f9c6fb4bb96e2db7c213c49257141e865 /python/pyspark | |
parent | 1a0955250bb65cd6f5818ad60efb62ea4b45d18e (diff) | |
download | spark-55204181004c105c7a3e8c31a099b37e48bfd953.tar.gz spark-55204181004c105c7a3e8c31a099b37e48bfd953.tar.bz2 spark-55204181004c105c7a3e8c31a099b37e48bfd953.zip |
[SPARK-10542] [PYSPARK] fix serialize namedtuple
Author: Davies Liu <davies@databricks.com>
Closes #8707 from davies/fix_namedtuple.
Diffstat (limited to 'python/pyspark')
-rw-r--r-- | python/pyspark/cloudpickle.py | 15 | ||||
-rw-r--r-- | python/pyspark/serializers.py | 1 | ||||
-rw-r--r-- | python/pyspark/tests.py | 5 |
3 files changed, 20 insertions, 1 deletions
```diff
diff --git a/python/pyspark/cloudpickle.py b/python/pyspark/cloudpickle.py
index 3b64798580..95b3abc742 100644
--- a/python/pyspark/cloudpickle.py
+++ b/python/pyspark/cloudpickle.py
@@ -350,6 +350,11 @@ class CloudPickler(Pickler):
         if new_override:
             d['__new__'] = obj.__new__
 
+        # workaround for namedtuple (hijacked by PySpark)
+        if getattr(obj, '_is_namedtuple_', False):
+            self.save_reduce(_load_namedtuple, (obj.__name__, obj._fields))
+            return
+
         self.save(_load_class)
         self.save_reduce(typ, (obj.__name__, obj.__bases__, {"__doc__": obj.__doc__}), obj=obj)
         d.pop('__doc__', None)
@@ -382,7 +387,7 @@ class CloudPickler(Pickler):
             self.save_reduce(types.MethodType, (obj.__func__, obj.__self__), obj=obj)
         else:
             self.save_reduce(types.MethodType, (obj.__func__, obj.__self__, obj.__self__.__class__),
-                         obj=obj)
+                             obj=obj)
     dispatch[types.MethodType] = save_instancemethod
 
     def save_inst(self, obj):
@@ -744,6 +749,14 @@ def _load_class(cls, d):
     return cls
 
 
+def _load_namedtuple(name, fields):
+    """
+    Loads a class generated by namedtuple
+    """
+    from collections import namedtuple
+    return namedtuple(name, fields)
+
+
 """Constructors for 3rd party libraries
 Note: These can never be renamed due to client compatibility issues"""
 
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index 411b4dbf48..2a1326947f 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -359,6 +359,7 @@ def _hack_namedtuple(cls):
     def __reduce__(self):
         return (_restore, (name, fields, tuple(self)))
     cls.__reduce__ = __reduce__
+    cls._is_namedtuple_ = True
     return cls
 
 
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 8bfed074c9..647504c32f 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -218,6 +218,11 @@ class SerializationTestCase(unittest.TestCase):
         p2 = loads(dumps(p1, 2))
         self.assertEqual(p1, p2)
 
+        from pyspark.cloudpickle import dumps
+        P2 = loads(dumps(P))
+        p3 = P2(1, 3)
+        self.assertEqual(p1, p3)
+
     def test_itemgetter(self):
         from operator import itemgetter
         ser = CloudPickleSerializer()
```