aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/serializers.py
diff options
context:
space:
mode:
author Davies Liu <davies.liu@gmail.com> 2014-08-04 12:13:41 -0700
committer Josh Rosen <joshrosen@apache.org> 2014-08-04 12:13:41 -0700
commit 59f84a9531f7974a053fd4963ce9afd88273ea4c (patch)
tree f2f72b2b2a041624ff4462e777f19874cd477b41 /python/pyspark/serializers.py
parent e053c55819363fab7068bb9165e3379f0c2f570c (diff)
downloadspark-59f84a9531f7974a053fd4963ce9afd88273ea4c.tar.gz
spark-59f84a9531f7974a053fd4963ce9afd88273ea4c.tar.bz2
spark-59f84a9531f7974a053fd4963ce9afd88273ea4c.zip
[SPARK-1687] [PySpark] pickable namedtuple
Add a hook to replace the original namedtuple with a picklable one, so that namedtuples can be used in RDDs. PS: pyspark should be imported BEFORE "from collections import namedtuple". Author: Davies Liu <davies.liu@gmail.com> Closes #1623 from davies/namedtuple and squashes the following commits: 045dad8 [Davies Liu] remove unrelated code changes 4132f32 [Davies Liu] address comment 55b1c1a [Davies Liu] fix tests 61f86eb [Davies Liu] replace all the reference of namedtuple to new hacked one 98df6c6 [Davies Liu] Merge branch 'master' of github.com:apache/spark into namedtuple f7b1bde [Davies Liu] add hack for CloudPickleSerializer 0c5c849 [Davies Liu] Merge branch 'master' of github.com:apache/spark into namedtuple 21991e6 [Davies Liu] hack namedtuple in __main__ module, make it picklable. 93b03b8 [Davies Liu] pickable namedtuple
Diffstat (limited to 'python/pyspark/serializers.py')
-rw-r--r--python/pyspark/serializers.py60
1 files changed, 60 insertions, 0 deletions
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index 03b31ae962..1b52c144df 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -65,6 +65,9 @@ from itertools import chain, izip, product
import marshal
import struct
import sys
+import types
+import collections
+
from pyspark import cloudpickle
@@ -267,6 +270,63 @@ class NoOpSerializer(FramedSerializer):
return obj
# Hook namedtuple, make it picklable

# Cache of namedtuple classes rebuilt by _restore(), keyed by
# (type name, field names), so each distinct shape is created only once.
__cls = {}


def _restore(name, fields, value):
    """Rebuild a namedtuple instance from its pickled description.

    This is the callable that the ``__reduce__`` installed by
    ``_hack_namedtuple`` hands to pickle.

    :param name: type name of the namedtuple class
    :param fields: tuple of field names (the class's ``_fields``)
    :param value: the field values, in field order
    :return: an instance of a namedtuple class called ``name`` with the
             given ``fields``, populated from ``value``
    """
    k = (name, fields)
    cls = __cls.get(k)
    if cls is None:
        # ``fields`` is taken from an existing class's ``_fields``, so it
        # may contain auto-generated names such as "_1" (the result of
        # creating the class with ``rename=True``).  namedtuple only accepts
        # such names again when renaming is enabled, and renaming leaves
        # valid names untouched, so always enable it here.
        cls = collections.namedtuple(name, fields, rename=True)
        __cls[k] = cls
    return cls(*value)
+
+
def _hack_namedtuple(cls):
    """Attach a pickle hook to a namedtuple-generated class.

    The class is modified in place: it receives a ``__reduce__`` method
    that describes an instance as a call to ``_restore`` with the class
    name, its field names and the instance's values.

    :param cls: a class produced by ``collections.namedtuple``
    :return: the same ``cls`` object, now carrying the ``__reduce__`` hook
    """
    # Capture the class description once; the hook closes over these.
    typename, field_names = cls.__name__, cls._fields

    def __reduce__(self):
        return (_restore, (typename, field_names, tuple(self)))

    setattr(cls, "__reduce__", __reduce__)
    return cls
+
+
def _hijack_namedtuple():
    """Patch ``collections.namedtuple`` in place so its classes are picklable.

    The code object of ``collections.namedtuple`` is swapped for a thin
    wrapper that calls a saved copy of the original function and then passes
    the resulting class through ``_hack_namedtuple``.  Because the function
    object itself is kept, references already taken with
    ``from collections import namedtuple`` see the new behaviour as well.

    Namedtuple classes that already exist in the ``__main__`` module are
    patched too.

    Calling this function more than once is safe: the function is patched
    only the first time, while the ``__main__`` scan runs on every call.
    """
    global _old_namedtuple  # or it will put in closure

    # The wrapper's code runs with the globals of the collections module,
    # so the helpers it needs are injected there.  Their presence also
    # tells us whether the patch has been applied before.
    hooked = collections.namedtuple.func_globals
    if "_old_namedtuple" in hooked:
        # Already patched.  Copying the patched function again would make
        # the saved "original" call itself forever, so reuse the first copy.
        _old_namedtuple = hooked["_old_namedtuple"]
    else:
        def _copy_func(f):
            return types.FunctionType(f.func_code, f.func_globals, f.func_name,
                                      f.func_defaults, f.func_closure)

        _old_namedtuple = _copy_func(collections.namedtuple)

        # Forward every argument untouched so callers may keep using the
        # original keyword names (typename=..., field_names=...).
        def namedtuple(*args, **kwargs):
            cls = _old_namedtuple(*args, **kwargs)
            return _hack_namedtuple(cls)

        # replace namedtuple with new one
        hooked["_old_namedtuple"] = _old_namedtuple
        hooked["_hack_namedtuple"] = _hack_namedtuple
        collections.namedtuple.func_code = namedtuple.func_code

    # hack the cls already generated by namedtuple
    # those created in other module can be pickled as normal,
    # so only hack those in __main__ module
    for o in sys.modules["__main__"].__dict__.itervalues():
        if (type(o) is type and o.__base__ is tuple
                and hasattr(o, "_fields")
                and "__reduce__" not in o.__dict__):
            _hack_namedtuple(o)  # hack inplace


# Install the hook as soon as this module is imported.
_hijack_namedtuple()
+
+
class PickleSerializer(FramedSerializer):
"""
Serializes objects using Python's cPickle serializer: