aboutsummaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorDavies Liu <davies@databricks.com>2015-09-14 19:46:34 -0700
committerDavies Liu <davies.liu@gmail.com>2015-09-14 19:46:34 -0700
commit55204181004c105c7a3e8c31a099b37e48bfd953 (patch)
tree7a052c7f9c6fb4bb96e2db7c213c49257141e865 /python
parent1a0955250bb65cd6f5818ad60efb62ea4b45d18e (diff)
downloadspark-55204181004c105c7a3e8c31a099b37e48bfd953.tar.gz
spark-55204181004c105c7a3e8c31a099b37e48bfd953.tar.bz2
spark-55204181004c105c7a3e8c31a099b37e48bfd953.zip
[SPARK-10542] [PYSPARK] fix serialize namedtuple
Author: Davies Liu <davies@databricks.com> Closes #8707 from davies/fix_namedtuple.
Diffstat (limited to 'python')
-rw-r--r--python/pyspark/cloudpickle.py15
-rw-r--r--python/pyspark/serializers.py1
-rw-r--r--python/pyspark/tests.py5
3 files changed, 20 insertions, 1 deletions
diff --git a/python/pyspark/cloudpickle.py b/python/pyspark/cloudpickle.py
index 3b64798580..95b3abc742 100644
--- a/python/pyspark/cloudpickle.py
+++ b/python/pyspark/cloudpickle.py
@@ -350,6 +350,11 @@ class CloudPickler(Pickler):
if new_override:
d['__new__'] = obj.__new__
+ # workaround for namedtuple (hijacked by PySpark)
+ if getattr(obj, '_is_namedtuple_', False):
+ self.save_reduce(_load_namedtuple, (obj.__name__, obj._fields))
+ return
+
self.save(_load_class)
self.save_reduce(typ, (obj.__name__, obj.__bases__, {"__doc__": obj.__doc__}), obj=obj)
d.pop('__doc__', None)
@@ -382,7 +387,7 @@ class CloudPickler(Pickler):
self.save_reduce(types.MethodType, (obj.__func__, obj.__self__), obj=obj)
else:
self.save_reduce(types.MethodType, (obj.__func__, obj.__self__, obj.__self__.__class__),
- obj=obj)
+ obj=obj)
dispatch[types.MethodType] = save_instancemethod
def save_inst(self, obj):
@@ -744,6 +749,14 @@ def _load_class(cls, d):
return cls
+def _load_namedtuple(name, fields):
+ """
+ Loads a class generated by namedtuple
+ """
+ from collections import namedtuple
+ return namedtuple(name, fields)
+
+
"""Constructors for 3rd party libraries
Note: These can never be renamed due to client compatibility issues"""
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index 411b4dbf48..2a1326947f 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -359,6 +359,7 @@ def _hack_namedtuple(cls):
def __reduce__(self):
return (_restore, (name, fields, tuple(self)))
cls.__reduce__ = __reduce__
+ cls._is_namedtuple_ = True
return cls
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 8bfed074c9..647504c32f 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -218,6 +218,11 @@ class SerializationTestCase(unittest.TestCase):
p2 = loads(dumps(p1, 2))
self.assertEqual(p1, p2)
+ from pyspark.cloudpickle import dumps
+ P2 = loads(dumps(P))
+ p3 = P2(1, 3)
+ self.assertEqual(p1, p3)
+
def test_itemgetter(self):
from operator import itemgetter
ser = CloudPickleSerializer()