aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/cloudpickle.py
diff options
context:
space:
mode:
authorDavies Liu <davies@databricks.com>2015-04-16 16:20:57 -0700
committerJosh Rosen <joshrosen@databricks.com>2015-04-16 16:20:57 -0700
commit04e44b37cc04f62fbf9e08c7076349e0a4d12ea8 (patch)
treeb6429253955210445ddc37faa4d5166ea25a91e2 /python/pyspark/cloudpickle.py
parent55f553a979db925aa0c3559f7e80b99d2bf3feb4 (diff)
downloadspark-04e44b37cc04f62fbf9e08c7076349e0a4d12ea8.tar.gz
spark-04e44b37cc04f62fbf9e08c7076349e0a4d12ea8.tar.bz2
spark-04e44b37cc04f62fbf9e08c7076349e0a4d12ea8.zip
[SPARK-4897] [PySpark] Python 3 support
This PR update PySpark to support Python 3 (tested with 3.4). Known issue: unpickle array from Pyrolite is broken in Python 3, those tests are skipped. TODO: ec2/spark-ec2.py is not fully tested with python3. Author: Davies Liu <davies@databricks.com> Author: twneale <twneale@gmail.com> Author: Josh Rosen <joshrosen@databricks.com> Closes #5173 from davies/python3 and squashes the following commits: d7d6323 [Davies Liu] fix tests 6c52a98 [Davies Liu] fix mllib test 99e334f [Davies Liu] update timeout b716610 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 cafd5ec [Davies Liu] adddress comments from @mengxr bf225d7 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 179fc8d [Davies Liu] tuning flaky tests 8c8b957 [Davies Liu] fix ResourceWarning in Python 3 5c57c95 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 4006829 [Davies Liu] fix test 2fc0066 [Davies Liu] add python3 path 71535e9 [Davies Liu] fix xrange and divide 5a55ab4 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 125f12c [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 ed498c8 [Davies Liu] fix compatibility with python 3 820e649 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 e8ce8c9 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 ad7c374 [Davies Liu] fix mllib test and warning ef1fc2f [Davies Liu] fix tests 4eee14a [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 20112ff [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 59bb492 [Davies Liu] fix tests 1da268c [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 ca0fdd3 [Davies Liu] fix code style 9563a15 [Davies Liu] add imap back for python 2 0b1ec04 [Davies Liu] make python examples work with Python 3 d2fd566 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 a716d34 [Davies Liu] test with python 3.4 f1700e8 [Davies Liu] fix test in python3 671b1db [Davies Liu] fix test in python3 692ff47 [Davies Liu] fix flaky test 7b9699f [Davies Liu] invalidate import cache for Python 3.3+ 9c58497 [Davies Liu] fix kill worker 309bfbf [Davies Liu] keep compatibility 5707476 [Davies Liu] cleanup, fix hash of string in 3.3+ 8662d5b [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 f53e1f0 [Davies Liu] fix tests 70b6b73 [Davies Liu] compile ec2/spark_ec2.py in python 3 a39167e [Davies Liu] support customize class in __main__ 814c77b [Davies Liu] run unittests with python 3 7f4476e [Davies Liu] mllib tests passed d737924 [Davies Liu] pass ml tests 375ea17 [Davies Liu] SQL tests pass 6cc42a9 [Davies Liu] rename 431a8de [Davies Liu] streaming tests pass 78901a7 [Davies Liu] fix hash of serializer in Python 3 24b2f2e [Davies Liu] pass all RDD tests 35f48fe [Davies Liu] run future again 1eebac2 [Davies Liu] fix conflict in ec2/spark_ec2.py 6e3c21d [Davies Liu] make cloudpickle work with Python3 2fb2db3 [Josh Rosen] Guard more changes behind sys.version; still doesn't run 1aa5e8f [twneale] Turned out `pickle.DictionaryType is dict` == True, so swapped it out 7354371 [twneale] buffer --> memoryview I'm not super sure if this a valid change, but the 2.7 docs recommend using memoryview over buffer where possible, so hoping it'll work. b69ccdf [twneale] Uses the pure python pickle._Pickler instead of c-extension _pickle.Pickler. It appears pyspark 2.7 uses the pure python pickler as well, so this shouldn't degrade pickling performance (?). f40d925 [twneale] xrange --> range e104215 [twneale] Replaces 2.7 types.InstsanceType with 3.4 `object`....could be horribly wrong depending on how types.InstanceType is used elsewhere in the package--see http://bugs.python.org/issue8206 79de9d0 [twneale] Replaces python2.7 `file` with 3.4 _io.TextIOWrapper 2adb42d [Josh Rosen] Fix up some import differences between Python 2 and 3 854be27 [Josh Rosen] Run `futurize` on Python code: 7c5b4ce [Josh Rosen] Remove Python 3 check in shell.py.
Diffstat (limited to 'python/pyspark/cloudpickle.py')
-rw-r--r--python/pyspark/cloudpickle.py577
1 files changed, 163 insertions, 414 deletions
diff --git a/python/pyspark/cloudpickle.py b/python/pyspark/cloudpickle.py
index bb0783555a..9ef93071d2 100644
--- a/python/pyspark/cloudpickle.py
+++ b/python/pyspark/cloudpickle.py
@@ -40,164 +40,126 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
-
+from __future__ import print_function
import operator
import os
+import io
import pickle
import struct
import sys
import types
from functools import partial
import itertools
-from copy_reg import _extension_registry, _inverted_registry, _extension_cache
-import new
import dis
import traceback
-import platform
-
-PyImp = platform.python_implementation()
-
-import logging
-cloudLog = logging.getLogger("Cloud.Transport")
+if sys.version < '3':
+ from pickle import Pickler
+ try:
+ from cStringIO import StringIO
+ except ImportError:
+ from StringIO import StringIO
+ PY3 = False
+else:
+ types.ClassType = type
+ from pickle import _Pickler as Pickler
+ from io import BytesIO as StringIO
+ PY3 = True
#relevant opcodes
-STORE_GLOBAL = chr(dis.opname.index('STORE_GLOBAL'))
-DELETE_GLOBAL = chr(dis.opname.index('DELETE_GLOBAL'))
-LOAD_GLOBAL = chr(dis.opname.index('LOAD_GLOBAL'))
+STORE_GLOBAL = dis.opname.index('STORE_GLOBAL')
+DELETE_GLOBAL = dis.opname.index('DELETE_GLOBAL')
+LOAD_GLOBAL = dis.opname.index('LOAD_GLOBAL')
GLOBAL_OPS = [STORE_GLOBAL, DELETE_GLOBAL, LOAD_GLOBAL]
+HAVE_ARGUMENT = dis.HAVE_ARGUMENT
+EXTENDED_ARG = dis.EXTENDED_ARG
-HAVE_ARGUMENT = chr(dis.HAVE_ARGUMENT)
-EXTENDED_ARG = chr(dis.EXTENDED_ARG)
-
-if PyImp == "PyPy":
- # register builtin type in `new`
- new.method = types.MethodType
-
-try:
- from cStringIO import StringIO
-except ImportError:
- from StringIO import StringIO
-# These helper functions were copied from PiCloud's util module.
def islambda(func):
- return getattr(func,'func_name') == '<lambda>'
+ return getattr(func,'__name__') == '<lambda>'
-def xrange_params(xrangeobj):
- """Returns a 3 element tuple describing the xrange start, step, and len
- respectively
- Note: Only guarentees that elements of xrange are the same. parameters may
- be different.
- e.g. xrange(1,1) is interpretted as xrange(0,0); both behave the same
- though w/ iteration
- """
-
- xrange_len = len(xrangeobj)
- if not xrange_len: #empty
- return (0,1,0)
- start = xrangeobj[0]
- if xrange_len == 1: #one element
- return start, 1, 1
- return (start, xrangeobj[1] - xrangeobj[0], xrange_len)
-
-#debug variables intended for developer use:
-printSerialization = False
-printMemoization = False
+_BUILTIN_TYPE_NAMES = {}
+for k, v in types.__dict__.items():
+ if type(v) is type:
+ _BUILTIN_TYPE_NAMES[v] = k
-useForcedImports = True #Should I use forced imports for tracking?
+def _builtin_type(name):
+ return getattr(types, name)
-class CloudPickler(pickle.Pickler):
+class CloudPickler(Pickler):
- dispatch = pickle.Pickler.dispatch.copy()
- savedForceImports = False
- savedDjangoEnv = False #hack tro transport django environment
+ dispatch = Pickler.dispatch.copy()
- def __init__(self, file, protocol=None, min_size_to_save= 0):
- pickle.Pickler.__init__(self,file,protocol)
- self.modules = set() #set of modules needed to depickle
- self.globals_ref = {} # map ids to dictionary. used to ensure that functions can share global env
+ def __init__(self, file, protocol=None):
+ Pickler.__init__(self, file, protocol)
+ # set of modules to unpickle
+ self.modules = set()
+ # map ids to dictionary. used to ensure that functions can share global env
+ self.globals_ref = {}
def dump(self, obj):
- # note: not thread safe
- # minimal side-effects, so not fixing
- recurse_limit = 3000
- base_recurse = sys.getrecursionlimit()
- if base_recurse < recurse_limit:
- sys.setrecursionlimit(recurse_limit)
self.inject_addons()
try:
- return pickle.Pickler.dump(self, obj)
- except RuntimeError, e:
+ return Pickler.dump(self, obj)
+ except RuntimeError as e:
if 'recursion' in e.args[0]:
- msg = """Could not pickle object as excessively deep recursion required.
- Try _fast_serialization=2 or contact PiCloud support"""
+ msg = """Could not pickle object as excessively deep recursion required."""
raise pickle.PicklingError(msg)
- finally:
- new_recurse = sys.getrecursionlimit()
- if new_recurse == recurse_limit:
- sys.setrecursionlimit(base_recurse)
+
+ def save_memoryview(self, obj):
+ """Fallback to save_string"""
+ Pickler.save_string(self, str(obj))
def save_buffer(self, obj):
"""Fallback to save_string"""
- pickle.Pickler.save_string(self,str(obj))
- dispatch[buffer] = save_buffer
+ Pickler.save_string(self,str(obj))
+ if PY3:
+ dispatch[memoryview] = save_memoryview
+ else:
+ dispatch[buffer] = save_buffer
- #block broken objects
- def save_unsupported(self, obj, pack=None):
+ def save_unsupported(self, obj):
raise pickle.PicklingError("Cannot pickle objects of type %s" % type(obj))
dispatch[types.GeneratorType] = save_unsupported
- #python2.6+ supports slice pickling. some py2.5 extensions might as well. We just test it
- try:
- slice(0,1).__reduce__()
- except TypeError: #can't pickle -
- dispatch[slice] = save_unsupported
-
- #itertools objects do not pickle!
+ # itertools objects do not pickle!
for v in itertools.__dict__.values():
if type(v) is type:
dispatch[v] = save_unsupported
-
- def save_dict(self, obj):
- """hack fix
- If the dict is a global, deal with it in a special way
- """
- #print 'saving', obj
- if obj is __builtins__:
- self.save_reduce(_get_module_builtins, (), obj=obj)
- else:
- pickle.Pickler.save_dict(self, obj)
- dispatch[pickle.DictionaryType] = save_dict
-
-
- def save_module(self, obj, pack=struct.pack):
+ def save_module(self, obj):
"""
Save a module as an import
"""
- #print 'try save import', obj.__name__
self.modules.add(obj)
- self.save_reduce(subimport,(obj.__name__,), obj=obj)
- dispatch[types.ModuleType] = save_module #new type
+ self.save_reduce(subimport, (obj.__name__,), obj=obj)
+ dispatch[types.ModuleType] = save_module
- def save_codeobject(self, obj, pack=struct.pack):
+ def save_codeobject(self, obj):
"""
Save a code object
"""
- #print 'try to save codeobj: ', obj
- args = (
- obj.co_argcount, obj.co_nlocals, obj.co_stacksize, obj.co_flags, obj.co_code,
- obj.co_consts, obj.co_names, obj.co_varnames, obj.co_filename, obj.co_name,
- obj.co_firstlineno, obj.co_lnotab, obj.co_freevars, obj.co_cellvars
- )
+ if PY3:
+ args = (
+ obj.co_argcount, obj.co_kwonlyargcount, obj.co_nlocals, obj.co_stacksize,
+ obj.co_flags, obj.co_code, obj.co_consts, obj.co_names, obj.co_varnames,
+ obj.co_filename, obj.co_name, obj.co_firstlineno, obj.co_lnotab, obj.co_freevars,
+ obj.co_cellvars
+ )
+ else:
+ args = (
+ obj.co_argcount, obj.co_nlocals, obj.co_stacksize, obj.co_flags, obj.co_code,
+ obj.co_consts, obj.co_names, obj.co_varnames, obj.co_filename, obj.co_name,
+ obj.co_firstlineno, obj.co_lnotab, obj.co_freevars, obj.co_cellvars
+ )
self.save_reduce(types.CodeType, args, obj=obj)
- dispatch[types.CodeType] = save_codeobject #new type
+ dispatch[types.CodeType] = save_codeobject
- def save_function(self, obj, name=None, pack=struct.pack):
+ def save_function(self, obj, name=None):
""" Registered with the dispatch to handle all function types.
Determines what kind of function obj is (e.g. lambda, defined at
@@ -205,12 +167,14 @@ class CloudPickler(pickle.Pickler):
"""
write = self.write
- name = obj.__name__
+ if name is None:
+ name = obj.__name__
modname = pickle.whichmodule(obj, name)
- #print 'which gives %s %s %s' % (modname, obj, name)
+ # print('which gives %s %s %s' % (modname, obj, name))
try:
themodule = sys.modules[modname]
- except KeyError: # eval'd items such as namedtuple give invalid items for their function __module__
+ except KeyError:
+ # eval'd items such as namedtuple give invalid items for their function __module__
modname = '__main__'
if modname == '__main__':
@@ -221,37 +185,18 @@ class CloudPickler(pickle.Pickler):
if getattr(themodule, name, None) is obj:
return self.save_global(obj, name)
- if not self.savedDjangoEnv:
- #hack for django - if we detect the settings module, we transport it
- django_settings = os.environ.get('DJANGO_SETTINGS_MODULE', '')
- if django_settings:
- django_mod = sys.modules.get(django_settings)
- if django_mod:
- cloudLog.debug('Transporting django settings %s during save of %s', django_mod, name)
- self.savedDjangoEnv = True
- self.modules.add(django_mod)
- write(pickle.MARK)
- self.save_reduce(django_settings_load, (django_mod.__name__,), obj=django_mod)
- write(pickle.POP_MARK)
-
-
# if func is lambda, def'ed at prompt, is in main, or is nested, then
# we'll pickle the actual function object rather than simply saving a
# reference (as is done in default pickler), via save_function_tuple.
- if islambda(obj) or obj.func_code.co_filename == '<stdin>' or themodule is None:
- #Force server to import modules that have been imported in main
- modList = None
- if themodule is None and not self.savedForceImports:
- mainmod = sys.modules['__main__']
- if useForcedImports and hasattr(mainmod,'___pyc_forcedImports__'):
- modList = list(mainmod.___pyc_forcedImports__)
- self.savedForceImports = True
- self.save_function_tuple(obj, modList)
+ if islambda(obj) or obj.__code__.co_filename == '<stdin>' or themodule is None:
+ #print("save global", islambda(obj), obj.__code__.co_filename, modname, themodule)
+ self.save_function_tuple(obj)
return
- else: # func is nested
+ else:
+ # func is nested
klass = getattr(themodule, name, None)
if klass is None or klass is not obj:
- self.save_function_tuple(obj, [themodule])
+ self.save_function_tuple(obj)
return
if obj.__dict__:
@@ -266,7 +211,7 @@ class CloudPickler(pickle.Pickler):
self.memoize(obj)
dispatch[types.FunctionType] = save_function
- def save_function_tuple(self, func, forced_imports):
+ def save_function_tuple(self, func):
""" Pickles an actual func object.
A func comprises: code, globals, defaults, closure, and dict. We
@@ -281,19 +226,6 @@ class CloudPickler(pickle.Pickler):
save = self.save
write = self.write
- # save the modules (if any)
- if forced_imports:
- write(pickle.MARK)
- save(_modules_to_main)
- #print 'forced imports are', forced_imports
-
- forced_names = map(lambda m: m.__name__, forced_imports)
- save((forced_names,))
-
- #save((forced_imports,))
- write(pickle.REDUCE)
- write(pickle.POP_MARK)
-
code, f_globals, defaults, closure, dct, base_globals = self.extract_func_data(func)
save(_fill_function) # skeleton function updater
@@ -318,6 +250,8 @@ class CloudPickler(pickle.Pickler):
Find all globals names read or written to by codeblock co
"""
code = co.co_code
+ if not PY3:
+ code = [ord(c) for c in code]
names = co.co_names
out_names = set()
@@ -327,18 +261,18 @@ class CloudPickler(pickle.Pickler):
while i < n:
op = code[i]
- i = i+1
+ i += 1
if op >= HAVE_ARGUMENT:
- oparg = ord(code[i]) + ord(code[i+1])*256 + extended_arg
+ oparg = code[i] + code[i+1] * 256 + extended_arg
extended_arg = 0
- i = i+2
+ i += 2
if op == EXTENDED_ARG:
- extended_arg = oparg*65536L
+ extended_arg = oparg*65536
if op in GLOBAL_OPS:
out_names.add(names[oparg])
- #print 'extracted', out_names, ' from ', names
- if co.co_consts: # see if nested function have any global refs
+ # see if nested function have any global refs
+ if co.co_consts:
for const in co.co_consts:
if type(const) is types.CodeType:
out_names |= CloudPickler.extract_code_globals(const)
@@ -350,46 +284,28 @@ class CloudPickler(pickle.Pickler):
Turn the function into a tuple of data necessary to recreate it:
code, globals, defaults, closure, dict
"""
- code = func.func_code
+ code = func.__code__
# extract all global ref's
- func_global_refs = CloudPickler.extract_code_globals(code)
+ func_global_refs = self.extract_code_globals(code)
# process all variables referenced by global environment
f_globals = {}
for var in func_global_refs:
- #Some names, such as class functions are not global - we don't need them
- if func.func_globals.has_key(var):
- f_globals[var] = func.func_globals[var]
+ if var in func.__globals__:
+ f_globals[var] = func.__globals__[var]
# defaults requires no processing
- defaults = func.func_defaults
-
- def get_contents(cell):
- try:
- return cell.cell_contents
- except ValueError, e: #cell is empty error on not yet assigned
- raise pickle.PicklingError('Function to be pickled has free variables that are referenced before assignment in enclosing scope')
-
+ defaults = func.__defaults__
# process closure
- if func.func_closure:
- closure = map(get_contents, func.func_closure)
- else:
- closure = []
+ closure = [c.cell_contents for c in func.__closure__] if func.__closure__ else []
# save the dict
- dct = func.func_dict
-
- if printSerialization:
- outvars = ['code: ' + str(code) ]
- outvars.append('globals: ' + str(f_globals))
- outvars.append('defaults: ' + str(defaults))
- outvars.append('closure: ' + str(closure))
- print 'function ', func, 'is extracted to: ', ', '.join(outvars)
+ dct = func.__dict__
- base_globals = self.globals_ref.get(id(func.func_globals), {})
- self.globals_ref[id(func.func_globals)] = base_globals
+ base_globals = self.globals_ref.get(id(func.__globals__), {})
+ self.globals_ref[id(func.__globals__)] = base_globals
return (code, f_globals, defaults, closure, dct, base_globals)
@@ -400,8 +316,9 @@ class CloudPickler(pickle.Pickler):
dispatch[types.BuiltinFunctionType] = save_builtin_function
def save_global(self, obj, name=None, pack=struct.pack):
- write = self.write
- memo = self.memo
+ if obj.__module__ == "__builtin__" or obj.__module__ == "builtins":
+ if obj in _BUILTIN_TYPE_NAMES:
+ return self.save_reduce(_builtin_type, (_BUILTIN_TYPE_NAMES[obj],), obj=obj)
if name is None:
name = obj.__name__
@@ -410,98 +327,57 @@ class CloudPickler(pickle.Pickler):
if modname is None:
modname = pickle.whichmodule(obj, name)
- try:
- __import__(modname)
- themodule = sys.modules[modname]
- except (ImportError, KeyError, AttributeError): #should never occur
- raise pickle.PicklingError(
- "Can't pickle %r: Module %s cannot be found" %
- (obj, modname))
-
if modname == '__main__':
themodule = None
-
- if themodule:
+ else:
+ __import__(modname)
+ themodule = sys.modules[modname]
self.modules.add(themodule)
- sendRef = True
- typ = type(obj)
- #print 'saving', obj, typ
- try:
- try: #Deal with case when getattribute fails with exceptions
- klass = getattr(themodule, name)
- except (AttributeError):
- if modname == '__builtin__': #new.* are misrepeported
- modname = 'new'
- __import__(modname)
- themodule = sys.modules[modname]
- try:
- klass = getattr(themodule, name)
- except AttributeError, a:
- # print themodule, name, obj, type(obj)
- raise pickle.PicklingError("Can't pickle builtin %s" % obj)
- else:
- raise
+ if hasattr(themodule, name) and getattr(themodule, name) is obj:
+ return Pickler.save_global(self, obj, name)
- except (ImportError, KeyError, AttributeError):
- if typ == types.TypeType or typ == types.ClassType:
- sendRef = False
- else: #we can't deal with this
- raise
- else:
- if klass is not obj and (typ == types.TypeType or typ == types.ClassType):
- sendRef = False
- if not sendRef:
- #note: Third party types might crash this - add better checks!
- d = dict(obj.__dict__) #copy dict proxy to a dict
- if not isinstance(d.get('__dict__', None), property): # don't extract dict that are properties
- d.pop('__dict__',None)
- d.pop('__weakref__',None)
+ typ = type(obj)
+ if typ is not obj and isinstance(obj, (type, types.ClassType)):
+ d = dict(obj.__dict__) # copy dict proxy to a dict
+ if not isinstance(d.get('__dict__', None), property):
+ # don't extract dict that are properties
+ d.pop('__dict__', None)
+ d.pop('__weakref__', None)
# hack as __new__ is stored differently in the __dict__
new_override = d.get('__new__', None)
if new_override:
d['__new__'] = obj.__new__
- self.save_reduce(type(obj),(obj.__name__,obj.__bases__,
- d),obj=obj)
- #print 'internal reduce dask %s %s' % (obj, d)
- return
-
- if self.proto >= 2:
- code = _extension_registry.get((modname, name))
- if code:
- assert code > 0
- if code <= 0xff:
- write(pickle.EXT1 + chr(code))
- elif code <= 0xffff:
- write("%c%c%c" % (pickle.EXT2, code&0xff, code>>8))
- else:
- write(pickle.EXT4 + pack("<i", code))
- return
+ self.save_reduce(typ, (obj.__name__, obj.__bases__, d), obj=obj)
+ else:
+ raise pickle.PicklingError("Can't pickle %r" % obj)
- write(pickle.GLOBAL + modname + '\n' + name + '\n')
- self.memoize(obj)
+ dispatch[type] = save_global
dispatch[types.ClassType] = save_global
- dispatch[types.TypeType] = save_global
def save_instancemethod(self, obj):
- #Memoization rarely is ever useful due to python bounding
- self.save_reduce(types.MethodType, (obj.im_func, obj.im_self,obj.im_class), obj=obj)
+ # Memoization rarely is ever useful due to python bounding
+ if PY3:
+ self.save_reduce(types.MethodType, (obj.__func__, obj.__self__), obj=obj)
+ else:
+ self.save_reduce(types.MethodType, (obj.__func__, obj.__self__, obj.__self__.__class__),
+ obj=obj)
dispatch[types.MethodType] = save_instancemethod
- def save_inst_logic(self, obj):
+ def save_inst(self, obj):
"""Inner logic to save instance. Based off pickle.save_inst
Supports __transient__"""
cls = obj.__class__
- memo = self.memo
+ memo = self.memo
write = self.write
- save = self.save
+ save = self.save
if hasattr(obj, '__getinitargs__'):
args = obj.__getinitargs__()
- len(args) # XXX Assert it's a sequence
+ len(args) # XXX Assert it's a sequence
pickle._keep_alive(args, memo)
else:
args = ()
@@ -537,15 +413,8 @@ class CloudPickler(pickle.Pickler):
save(stuff)
write(pickle.BUILD)
-
- def save_inst(self, obj):
- # Hack to detect PIL Image instances without importing Imaging
- # PIL can be loaded with multiple names, so we don't check sys.modules for it
- if hasattr(obj,'im') and hasattr(obj,'palette') and 'Image' in obj.__module__:
- self.save_image(obj)
- else:
- self.save_inst_logic(obj)
- dispatch[types.InstanceType] = save_inst
+ if not PY3:
+ dispatch[types.InstanceType] = save_inst
def save_property(self, obj):
# properties not correctly saved in python
@@ -592,7 +461,7 @@ class CloudPickler(pickle.Pickler):
"""Modified to support __transient__ on new objects
Change only affects protocol level 2 (which is always used by PiCloud"""
# Assert that args is a tuple or None
- if not isinstance(args, types.TupleType):
+ if not isinstance(args, tuple):
raise pickle.PicklingError("args from reduce() should be a tuple")
# Assert that func is callable
@@ -646,35 +515,23 @@ class CloudPickler(pickle.Pickler):
self._batch_setitems(dictitems)
if state is not None:
- #print 'obj %s has state %s' % (obj, state)
save(state)
write(pickle.BUILD)
-
- def save_xrange(self, obj):
- """Save an xrange object in python 2.5
- Python 2.6 supports this natively
- """
- range_params = xrange_params(obj)
- self.save_reduce(_build_xrange,range_params)
-
- #python2.6+ supports xrange pickling. some py2.5 extensions might as well. We just test it
- try:
- xrange(0).__reduce__()
- except TypeError: #can't pickle -- use PiCloud pickler
- dispatch[xrange] = save_xrange
-
def save_partial(self, obj):
"""Partial objects do not serialize correctly in python2.x -- this fixes the bugs"""
self.save_reduce(_genpartial, (obj.func, obj.args, obj.keywords))
- if sys.version_info < (2,7): #2.7 supports partial pickling
+ if sys.version_info < (2,7): # 2.7 supports partial pickling
dispatch[partial] = save_partial
def save_file(self, obj):
"""Save a file"""
- import StringIO as pystringIO #we can't use cStringIO as it lacks the name attribute
+ try:
+ import StringIO as pystringIO #we can't use cStringIO as it lacks the name attribute
+ except ImportError:
+ import io as pystringIO
if not hasattr(obj, 'name') or not hasattr(obj, 'mode'):
raise pickle.PicklingError("Cannot pickle files that do not map to an actual file")
@@ -720,10 +577,14 @@ class CloudPickler(pickle.Pickler):
retval.seek(curloc)
retval.name = name
- self.save(retval) #save stringIO
+ self.save(retval)
self.memoize(obj)
- dispatch[file] = save_file
+ if PY3:
+ dispatch[io.TextIOWrapper] = save_file
+ else:
+ dispatch[file] = save_file
+
"""Special functions for Add-on libraries"""
def inject_numpy(self):
@@ -732,76 +593,20 @@ class CloudPickler(pickle.Pickler):
return
self.dispatch[numpy.ufunc] = self.__class__.save_ufunc
- numpy_tst_mods = ['numpy', 'scipy.special']
def save_ufunc(self, obj):
"""Hack function for saving numpy ufunc objects"""
name = obj.__name__
- for tst_mod_name in self.numpy_tst_mods:
+ numpy_tst_mods = ['numpy', 'scipy.special']
+ for tst_mod_name in numpy_tst_mods:
tst_mod = sys.modules.get(tst_mod_name, None)
- if tst_mod:
- if name in tst_mod.__dict__:
- self.save_reduce(_getobject, (tst_mod_name, name))
- return
- raise pickle.PicklingError('cannot save %s. Cannot resolve what module it is defined in' % str(obj))
-
- def inject_timeseries(self):
- """Handle bugs with pickling scikits timeseries"""
- tseries = sys.modules.get('scikits.timeseries.tseries')
- if not tseries or not hasattr(tseries, 'Timeseries'):
- return
- self.dispatch[tseries.Timeseries] = self.__class__.save_timeseries
-
- def save_timeseries(self, obj):
- import scikits.timeseries.tseries as ts
-
- func, reduce_args, state = obj.__reduce__()
- if func != ts._tsreconstruct:
- raise pickle.PicklingError('timeseries using unexpected reconstruction function %s' % str(func))
- state = (1,
- obj.shape,
- obj.dtype,
- obj.flags.fnc,
- obj._data.tostring(),
- ts.getmaskarray(obj).tostring(),
- obj._fill_value,
- obj._dates.shape,
- obj._dates.__array__().tostring(),
- obj._dates.dtype, #added -- preserve type
- obj.freq,
- obj._optinfo,
- )
- return self.save_reduce(_genTimeSeries, (reduce_args, state))
-
- def inject_email(self):
- """Block email LazyImporters from being saved"""
- email = sys.modules.get('email')
- if not email:
- return
- self.dispatch[email.LazyImporter] = self.__class__.save_unsupported
+ if tst_mod and name in tst_mod.__dict__:
+ return self.save_reduce(_getobject, (tst_mod_name, name))
+ raise pickle.PicklingError('cannot save %s. Cannot resolve what module it is defined in'
+ % str(obj))
def inject_addons(self):
"""Plug in system. Register additional pickling functions if modules already loaded"""
self.inject_numpy()
- self.inject_timeseries()
- self.inject_email()
-
- """Python Imaging Library"""
- def save_image(self, obj):
- if not obj.im and obj.fp and 'r' in obj.fp.mode and obj.fp.name \
- and not obj.fp.closed and (not hasattr(obj, 'isatty') or not obj.isatty()):
- #if image not loaded yet -- lazy load
- self.save_reduce(_lazyloadImage,(obj.fp,), obj=obj)
- else:
- #image is loaded - just transmit it over
- self.save_reduce(_generateImage, (obj.size, obj.mode, obj.tostring()), obj=obj)
-
- """
- def memoize(self, obj):
- pickle.Pickler.memoize(self, obj)
- if printMemoization:
- print 'memoizing ' + str(obj)
- """
-
# Shorthands for legacy support
@@ -809,14 +614,13 @@ class CloudPickler(pickle.Pickler):
def dump(obj, file, protocol=2):
CloudPickler(file, protocol).dump(obj)
+
def dumps(obj, protocol=2):
file = StringIO()
cp = CloudPickler(file,protocol)
cp.dump(obj)
- #print 'cloud dumped', str(obj), str(cp.modules)
-
return file.getvalue()
@@ -825,25 +629,6 @@ def subimport(name):
__import__(name)
return sys.modules[name]
-#hack to load django settings:
-def django_settings_load(name):
- modified_env = False
-
- if 'DJANGO_SETTINGS_MODULE' not in os.environ:
- os.environ['DJANGO_SETTINGS_MODULE'] = name # must set name first due to circular deps
- modified_env = True
- try:
- module = subimport(name)
- except Exception, i:
- print >> sys.stderr, 'Cloud not import django settings %s:' % (name)
- print_exec(sys.stderr)
- if modified_env:
- del os.environ['DJANGO_SETTINGS_MODULE']
- else:
- #add project directory to sys,path:
- if hasattr(module,'__file__'):
- dirname = os.path.split(module.__file__)[0] + '/'
- sys.path.append(dirname)
# restores function attributes
def _restore_attr(obj, attr):
@@ -851,13 +636,16 @@ def _restore_attr(obj, attr):
setattr(obj, key, val)
return obj
+
def _get_module_builtins():
return pickle.__builtins__
+
def print_exec(stream):
ei = sys.exc_info()
traceback.print_exception(ei[0], ei[1], ei[2], None, stream)
+
def _modules_to_main(modList):
"""Force every module in modList to be placed into main"""
if not modList:
@@ -868,22 +656,16 @@ def _modules_to_main(modList):
if type(modname) is str:
try:
mod = __import__(modname)
- except Exception, i: #catch all...
- sys.stderr.write('warning: could not import %s\n. Your function may unexpectedly error due to this import failing; \
-A version mismatch is likely. Specific error was:\n' % modname)
+ except Exception as e:
+ sys.stderr.write('warning: could not import %s\n. '
+ 'Your function may unexpectedly error due to this import failing;'
+ 'A version mismatch is likely. Specific error was:\n' % modname)
print_exec(sys.stderr)
else:
- setattr(main,mod.__name__, mod)
- else:
- #REVERSE COMPATIBILITY FOR CLOUD CLIENT 1.5 (WITH EPD)
- #In old version actual module was sent
- setattr(main,modname.__name__, modname)
+ setattr(main, mod.__name__, mod)
-#object generators:
-def _build_xrange(start, step, len):
- """Built xrange explicitly"""
- return xrange(start, start + step*len, step)
+#object generators:
def _genpartial(func, args, kwds):
if not args:
args = ()
@@ -891,22 +673,26 @@ def _genpartial(func, args, kwds):
kwds = {}
return partial(func, *args, **kwds)
+
def _fill_function(func, globals, defaults, dict):
""" Fills in the rest of function data into the skeleton function object
that were created via _make_skel_func().
"""
- func.func_globals.update(globals)
- func.func_defaults = defaults
- func.func_dict = dict
+ func.__globals__.update(globals)
+ func.__defaults__ = defaults
+ func.__dict__ = dict
return func
+
def _make_cell(value):
- return (lambda: value).func_closure[0]
+ return (lambda: value).__closure__[0]
+
def _reconstruct_closure(values):
return tuple([_make_cell(v) for v in values])
+
def _make_skel_func(code, closures, base_globals = None):
""" Creates a skeleton function object that contains just the provided
code and the correct number of cells in func_closure. All other
@@ -928,40 +714,3 @@ Note: These can never be renamed due to client compatibility issues"""
def _getobject(modname, attribute):
mod = __import__(modname, fromlist=[attribute])
return mod.__dict__[attribute]
-
-def _generateImage(size, mode, str_rep):
- """Generate image from string representation"""
- import Image
- i = Image.new(mode, size)
- i.fromstring(str_rep)
- return i
-
-def _lazyloadImage(fp):
- import Image
- fp.seek(0) #works in almost any case
- return Image.open(fp)
-
-"""Timeseries"""
-def _genTimeSeries(reduce_args, state):
- import scikits.timeseries.tseries as ts
- from numpy import ndarray
- from numpy.ma import MaskedArray
-
-
- time_series = ts._tsreconstruct(*reduce_args)
-
- #from setstate modified
- (ver, shp, typ, isf, raw, msk, flv, dsh, dtm, dtyp, frq, infodict) = state
- #print 'regenerating %s' % dtyp
-
- MaskedArray.__setstate__(time_series, (ver, shp, typ, isf, raw, msk, flv))
- _dates = time_series._dates
- #_dates.__setstate__((ver, dsh, typ, isf, dtm, frq)) #use remote typ
- ndarray.__setstate__(_dates,(dsh,dtyp, isf, dtm))
- _dates.freq = frq
- _dates._cachedinfo.update(dict(full=None, hasdups=None, steps=None,
- toobj=None, toord=None, tostr=None))
- # Update the _optinfo dictionary
- time_series._optinfo.update(infodict)
- return time_series
-