Diffstat (limited to 'python/pyspark/cloudpickle.py')
-rw-r--r--  python/pyspark/cloudpickle.py | 577
1 file changed, 163 insertions(+), 414 deletions(-)
diff --git a/python/pyspark/cloudpickle.py b/python/pyspark/cloudpickle.py
index bb0783555a..9ef93071d2 100644
--- a/python/pyspark/cloudpickle.py
+++ b/python/pyspark/cloudpickle.py
@@ -40,164 +40,126 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
-
+from __future__ import print_function
import operator
import os
+import io
import pickle
import struct
import sys
import types
from functools import partial
import itertools
-from copy_reg import _extension_registry, _inverted_registry, _extension_cache
-import new
import dis
import traceback
-import platform
-
-PyImp = platform.python_implementation()
-
-import logging
-cloudLog = logging.getLogger("Cloud.Transport")
+if sys.version < '3':
+ from pickle import Pickler
+ try:
+ from cStringIO import StringIO
+ except ImportError:
+ from StringIO import StringIO
+ PY3 = False
+else:
+ types.ClassType = type
+ from pickle import _Pickler as Pickler
+ from io import BytesIO as StringIO
+ PY3 = True
#relevant opcodes
-STORE_GLOBAL = chr(dis.opname.index('STORE_GLOBAL'))
-DELETE_GLOBAL = chr(dis.opname.index('DELETE_GLOBAL'))
-LOAD_GLOBAL = chr(dis.opname.index('LOAD_GLOBAL'))
+STORE_GLOBAL = dis.opname.index('STORE_GLOBAL')
+DELETE_GLOBAL = dis.opname.index('DELETE_GLOBAL')
+LOAD_GLOBAL = dis.opname.index('LOAD_GLOBAL')
GLOBAL_OPS = [STORE_GLOBAL, DELETE_GLOBAL, LOAD_GLOBAL]
+HAVE_ARGUMENT = dis.HAVE_ARGUMENT
+EXTENDED_ARG = dis.EXTENDED_ARG
-HAVE_ARGUMENT = chr(dis.HAVE_ARGUMENT)
-EXTENDED_ARG = chr(dis.EXTENDED_ARG)
-
-if PyImp == "PyPy":
- # register builtin type in `new`
- new.method = types.MethodType
-
-try:
- from cStringIO import StringIO
-except ImportError:
- from StringIO import StringIO
-# These helper functions were copied from PiCloud's util module.
def islambda(func):
- return getattr(func,'func_name') == '<lambda>'
+ return getattr(func,'__name__') == '<lambda>'
-def xrange_params(xrangeobj):
- """Returns a 3 element tuple describing the xrange start, step, and len
- respectively
- Note: Only guarentees that elements of xrange are the same. parameters may
- be different.
- e.g. xrange(1,1) is interpretted as xrange(0,0); both behave the same
- though w/ iteration
- """
-
- xrange_len = len(xrangeobj)
- if not xrange_len: #empty
- return (0,1,0)
- start = xrangeobj[0]
- if xrange_len == 1: #one element
- return start, 1, 1
- return (start, xrangeobj[1] - xrangeobj[0], xrange_len)
-
-#debug variables intended for developer use:
-printSerialization = False
-printMemoization = False
+_BUILTIN_TYPE_NAMES = {}
+for k, v in types.__dict__.items():
+ if type(v) is type:
+ _BUILTIN_TYPE_NAMES[v] = k
-useForcedImports = True #Should I use forced imports for tracking?
+def _builtin_type(name):
+ return getattr(types, name)
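
The _BUILTIN_TYPE_NAMES/_builtin_type pair added here lets builtin type objects be pickled by name: the type objects themselves are not picklable, but a string looked up on the types module at load time is. A minimal round-trip sketch, outside the patch:

import types

# Same lookup table the patch builds: builtin type object -> attribute name.
_BUILTIN_TYPE_NAMES = {v: k for k, v in types.__dict__.items() if type(v) is type}

def _builtin_type(name):
    # At unpickling time, resolve the name back to the type object.
    return getattr(types, name)

name = _BUILTIN_TYPE_NAMES[types.FunctionType]
assert _builtin_type(name) is types.FunctionType  # holds even for aliases like LambdaType
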
-class CloudPickler(pickle.Pickler):
+class CloudPickler(Pickler):
- dispatch = pickle.Pickler.dispatch.copy()
- savedForceImports = False
- savedDjangoEnv = False #hack tro transport django environment
+ dispatch = Pickler.dispatch.copy()
- def __init__(self, file, protocol=None, min_size_to_save= 0):
- pickle.Pickler.__init__(self,file,protocol)
- self.modules = set() #set of modules needed to depickle
- self.globals_ref = {} # map ids to dictionary. used to ensure that functions can share global env
+ def __init__(self, file, protocol=None):
+ Pickler.__init__(self, file, protocol)
+ # set of modules to unpickle
+ self.modules = set()
+ # map ids to dictionary. used to ensure that functions can share global env
+ self.globals_ref = {}
def dump(self, obj):
- # note: not thread safe
- # minimal side-effects, so not fixing
- recurse_limit = 3000
- base_recurse = sys.getrecursionlimit()
- if base_recurse < recurse_limit:
- sys.setrecursionlimit(recurse_limit)
self.inject_addons()
try:
- return pickle.Pickler.dump(self, obj)
- except RuntimeError, e:
+ return Pickler.dump(self, obj)
+ except RuntimeError as e:
if 'recursion' in e.args[0]:
- msg = """Could not pickle object as excessively deep recursion required.
- Try _fast_serialization=2 or contact PiCloud support"""
+ msg = """Could not pickle object as excessively deep recursion required."""
raise pickle.PicklingError(msg)
- finally:
- new_recurse = sys.getrecursionlimit()
- if new_recurse == recurse_limit:
- sys.setrecursionlimit(base_recurse)
+
+ def save_memoryview(self, obj):
+ """Fallback to save_string"""
+ Pickler.save_string(self, str(obj))
def save_buffer(self, obj):
"""Fallback to save_string"""
- pickle.Pickler.save_string(self,str(obj))
- dispatch[buffer] = save_buffer
+ Pickler.save_string(self,str(obj))
+ if PY3:
+ dispatch[memoryview] = save_memoryview
+ else:
+ dispatch[buffer] = save_buffer
- #block broken objects
- def save_unsupported(self, obj, pack=None):
+ def save_unsupported(self, obj):
raise pickle.PicklingError("Cannot pickle objects of type %s" % type(obj))
dispatch[types.GeneratorType] = save_unsupported
- #python2.6+ supports slice pickling. some py2.5 extensions might as well. We just test it
- try:
- slice(0,1).__reduce__()
- except TypeError: #can't pickle -
- dispatch[slice] = save_unsupported
-
- #itertools objects do not pickle!
+ # itertools objects do not pickle!
for v in itertools.__dict__.values():
if type(v) is type:
dispatch[v] = save_unsupported
-
- def save_dict(self, obj):
- """hack fix
- If the dict is a global, deal with it in a special way
- """
- #print 'saving', obj
- if obj is __builtins__:
- self.save_reduce(_get_module_builtins, (), obj=obj)
- else:
- pickle.Pickler.save_dict(self, obj)
- dispatch[pickle.DictionaryType] = save_dict
-
-
- def save_module(self, obj, pack=struct.pack):
+ def save_module(self, obj):
"""
Save a module as an import
"""
- #print 'try save import', obj.__name__
self.modules.add(obj)
- self.save_reduce(subimport,(obj.__name__,), obj=obj)
- dispatch[types.ModuleType] = save_module #new type
+ self.save_reduce(subimport, (obj.__name__,), obj=obj)
+ dispatch[types.ModuleType] = save_module
- def save_codeobject(self, obj, pack=struct.pack):
+ def save_codeobject(self, obj):
"""
Save a code object
"""
- #print 'try to save codeobj: ', obj
- args = (
- obj.co_argcount, obj.co_nlocals, obj.co_stacksize, obj.co_flags, obj.co_code,
- obj.co_consts, obj.co_names, obj.co_varnames, obj.co_filename, obj.co_name,
- obj.co_firstlineno, obj.co_lnotab, obj.co_freevars, obj.co_cellvars
- )
+ if PY3:
+ args = (
+ obj.co_argcount, obj.co_kwonlyargcount, obj.co_nlocals, obj.co_stacksize,
+ obj.co_flags, obj.co_code, obj.co_consts, obj.co_names, obj.co_varnames,
+ obj.co_filename, obj.co_name, obj.co_firstlineno, obj.co_lnotab, obj.co_freevars,
+ obj.co_cellvars
+ )
+ else:
+ args = (
+ obj.co_argcount, obj.co_nlocals, obj.co_stacksize, obj.co_flags, obj.co_code,
+ obj.co_consts, obj.co_names, obj.co_varnames, obj.co_filename, obj.co_name,
+ obj.co_firstlineno, obj.co_lnotab, obj.co_freevars, obj.co_cellvars
+ )
self.save_reduce(types.CodeType, args, obj=obj)
- dispatch[types.CodeType] = save_codeobject #new type
+ dispatch[types.CodeType] = save_codeobject
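
types.CodeType is version-sensitive: Python 3 inserted co_kwonlyargcount after co_argcount, which is why the patch needs two argument tuples (and later interpreters add still more fields). A small illustration, inspecting the serialized fields by name rather than position:

def sample(a, b=2, *, c=3):
    return a + b + c

co = sample.__code__
print(co.co_argcount)        # 2 -> a, b
print(co.co_kwonlyargcount)  # 1 -> c (Python 3 only)
print(co.co_nlocals, co.co_name, co.co_filename)
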
- def save_function(self, obj, name=None, pack=struct.pack):
+ def save_function(self, obj, name=None):
""" Registered with the dispatch to handle all function types.
Determines what kind of function obj is (e.g. lambda, defined at
@@ -205,12 +167,14 @@ class CloudPickler(pickle.Pickler):
"""
write = self.write
- name = obj.__name__
+ if name is None:
+ name = obj.__name__
modname = pickle.whichmodule(obj, name)
- #print 'which gives %s %s %s' % (modname, obj, name)
+ # print('which gives %s %s %s' % (modname, obj, name))
try:
themodule = sys.modules[modname]
- except KeyError: # eval'd items such as namedtuple give invalid items for their function __module__
+ except KeyError:
+ # eval'd items such as namedtuple give invalid items for their function __module__
modname = '__main__'
if modname == '__main__':
@@ -221,37 +185,18 @@ class CloudPickler(pickle.Pickler):
if getattr(themodule, name, None) is obj:
return self.save_global(obj, name)
- if not self.savedDjangoEnv:
- #hack for django - if we detect the settings module, we transport it
- django_settings = os.environ.get('DJANGO_SETTINGS_MODULE', '')
- if django_settings:
- django_mod = sys.modules.get(django_settings)
- if django_mod:
- cloudLog.debug('Transporting django settings %s during save of %s', django_mod, name)
- self.savedDjangoEnv = True
- self.modules.add(django_mod)
- write(pickle.MARK)
- self.save_reduce(django_settings_load, (django_mod.__name__,), obj=django_mod)
- write(pickle.POP_MARK)
-
-
# if func is lambda, def'ed at prompt, is in main, or is nested, then
# we'll pickle the actual function object rather than simply saving a
# reference (as is done in default pickler), via save_function_tuple.
- if islambda(obj) or obj.func_code.co_filename == '<stdin>' or themodule is None:
- #Force server to import modules that have been imported in main
- modList = None
- if themodule is None and not self.savedForceImports:
- mainmod = sys.modules['__main__']
- if useForcedImports and hasattr(mainmod,'___pyc_forcedImports__'):
- modList = list(mainmod.___pyc_forcedImports__)
- self.savedForceImports = True
- self.save_function_tuple(obj, modList)
+ if islambda(obj) or obj.__code__.co_filename == '<stdin>' or themodule is None:
+ #print("save global", islambda(obj), obj.__code__.co_filename, modname, themodule)
+ self.save_function_tuple(obj)
return
- else: # func is nested
+ else:
+ # func is nested
klass = getattr(themodule, name, None)
if klass is None or klass is not obj:
- self.save_function_tuple(obj, [themodule])
+ self.save_function_tuple(obj)
return
if obj.__dict__:
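
The effect of this branch: only functions that cannot be located by import (lambdas, functions defined at the interactive prompt or in __main__, and nested functions) are serialized by value via save_function_tuple; anything importable falls through to the ordinary by-reference save_global path. A quick illustration of the split, using the stock pickler:

import pickle

def module_level():        # resolvable by (module, name): saved as a reference
    return 1

square = lambda x: x * x   # name is '<lambda>': the stock pickler cannot save it

pickle.dumps(module_level)
try:
    pickle.dumps(square)
except pickle.PicklingError:
    pass  # exactly the case save_function_tuple exists to handle
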
@@ -266,7 +211,7 @@ class CloudPickler(pickle.Pickler):
self.memoize(obj)
dispatch[types.FunctionType] = save_function
- def save_function_tuple(self, func, forced_imports):
+ def save_function_tuple(self, func):
""" Pickles an actual func object.
A func comprises: code, globals, defaults, closure, and dict. We
@@ -281,19 +226,6 @@ class CloudPickler(pickle.Pickler):
save = self.save
write = self.write
- # save the modules (if any)
- if forced_imports:
- write(pickle.MARK)
- save(_modules_to_main)
- #print 'forced imports are', forced_imports
-
- forced_names = map(lambda m: m.__name__, forced_imports)
- save((forced_names,))
-
- #save((forced_imports,))
- write(pickle.REDUCE)
- write(pickle.POP_MARK)
-
code, f_globals, defaults, closure, dct, base_globals = self.extract_func_data(func)
save(_fill_function) # skeleton function updater
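
save_function_tuple works in two phases: create a bare skeleton from the code object and closure cells (_make_skel_func), then populate globals, defaults, and dict afterwards (_fill_function). Separating creation from population is what allows recursive closures to reference the function before it is fully built. A condensed, self-contained sketch of the idea (not the exact helpers):

import types

def f(x):
    return x + offset  # 'offset' is a global supplied later

base_globals = {'__builtins__': __builtins__}
skel = types.FunctionType(f.__code__, base_globals, f.__name__)  # phase 1: skeleton

skel.__globals__.update({'offset': 10})  # phase 2: fill in state
skel.__defaults__ = f.__defaults__
skel.__dict__.update(f.__dict__)
assert skel(1) == 11
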
@@ -318,6 +250,8 @@ class CloudPickler(pickle.Pickler):
Find all globals names read or written to by codeblock co
"""
code = co.co_code
+ if not PY3:
+ code = [ord(c) for c in code]
names = co.co_names
out_names = set()
@@ -327,18 +261,18 @@ class CloudPickler(pickle.Pickler):
while i < n:
op = code[i]
- i = i+1
+ i += 1
if op >= HAVE_ARGUMENT:
- oparg = ord(code[i]) + ord(code[i+1])*256 + extended_arg
+ oparg = code[i] + code[i+1] * 256 + extended_arg
extended_arg = 0
- i = i+2
+ i += 2
if op == EXTENDED_ARG:
- extended_arg = oparg*65536L
+ extended_arg = oparg*65536
if op in GLOBAL_OPS:
out_names.add(names[oparg])
- #print 'extracted', out_names, ' from ', names
- if co.co_consts: # see if nested function have any global refs
+ # see if nested functions have any global refs
+ if co.co_consts:
for const in co.co_consts:
if type(const) is types.CodeType:
out_names |= CloudPickler.extract_code_globals(const)
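
The loop above walks the raw bytecode byte-by-byte, which matches the variable-width instruction encoding used through Python 3.5. For illustration only, on current interpreters the same extraction can be written against the dis module instead of raw opcode arithmetic:

import dis
import types

def outer():
    global counter
    counter = limit + 1    # STORE_GLOBAL counter, LOAD_GLOBAL limit
    def inner():
        return threshold   # global ref inside a nested code object
    return inner

def extract_code_globals(co):
    names = {ins.argval for ins in dis.get_instructions(co)
             if ins.opname in ('STORE_GLOBAL', 'DELETE_GLOBAL', 'LOAD_GLOBAL')}
    for const in co.co_consts:  # recurse into nested code objects, as above
        if isinstance(const, types.CodeType):
            names |= extract_code_globals(const)
    return names

assert extract_code_globals(outer.__code__) == {'counter', 'limit', 'threshold'}
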
@@ -350,46 +284,28 @@ class CloudPickler(pickle.Pickler):
Turn the function into a tuple of data necessary to recreate it:
code, globals, defaults, closure, dict
"""
- code = func.func_code
+ code = func.__code__
# extract all global ref's
- func_global_refs = CloudPickler.extract_code_globals(code)
+ func_global_refs = self.extract_code_globals(code)
# process all variables referenced by global environment
f_globals = {}
for var in func_global_refs:
- #Some names, such as class functions are not global - we don't need them
- if func.func_globals.has_key(var):
- f_globals[var] = func.func_globals[var]
+ if var in func.__globals__:
+ f_globals[var] = func.__globals__[var]
# defaults requires no processing
- defaults = func.func_defaults
-
- def get_contents(cell):
- try:
- return cell.cell_contents
- except ValueError, e: #cell is empty error on not yet assigned
- raise pickle.PicklingError('Function to be pickled has free variables that are referenced before assignment in enclosing scope')
-
+ defaults = func.__defaults__
# process closure
- if func.func_closure:
- closure = map(get_contents, func.func_closure)
- else:
- closure = []
+ closure = [c.cell_contents for c in func.__closure__] if func.__closure__ else []
# save the dict
- dct = func.func_dict
-
- if printSerialization:
- outvars = ['code: ' + str(code) ]
- outvars.append('globals: ' + str(f_globals))
- outvars.append('defaults: ' + str(defaults))
- outvars.append('closure: ' + str(closure))
- print 'function ', func, 'is extracted to: ', ', '.join(outvars)
+ dct = func.__dict__
- base_globals = self.globals_ref.get(id(func.func_globals), {})
- self.globals_ref[id(func.func_globals)] = base_globals
+ base_globals = self.globals_ref.get(id(func.__globals__), {})
+ self.globals_ref[id(func.__globals__)] = base_globals
return (code, f_globals, defaults, closure, dct, base_globals)
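
The rewritten extract_func_data reads the portable dunder attributes (__code__, __globals__, __defaults__, __closure__, __dict__) in place of the Python-2-only func_* names. The closure handling is the only subtle part: __closure__ is either None or a tuple of cell objects, and cell_contents unwraps each one:

def make_adder(n):
    def add(x):
        return x + n
    return add

add3 = make_adder(3)
assert [c.cell_contents for c in add3.__closure__] == [3]
assert make_adder.__closure__ is None  # no free variables, no closure
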
@@ -400,8 +316,9 @@ class CloudPickler(pickle.Pickler):
dispatch[types.BuiltinFunctionType] = save_builtin_function
def save_global(self, obj, name=None, pack=struct.pack):
- write = self.write
- memo = self.memo
+ if obj.__module__ == "__builtin__" or obj.__module__ == "builtins":
+ if obj in _BUILTIN_TYPE_NAMES:
+ return self.save_reduce(_builtin_type, (_BUILTIN_TYPE_NAMES[obj],), obj=obj)
if name is None:
name = obj.__name__
@@ -410,98 +327,57 @@ class CloudPickler(pickle.Pickler):
if modname is None:
modname = pickle.whichmodule(obj, name)
- try:
- __import__(modname)
- themodule = sys.modules[modname]
- except (ImportError, KeyError, AttributeError): #should never occur
- raise pickle.PicklingError(
- "Can't pickle %r: Module %s cannot be found" %
- (obj, modname))
-
if modname == '__main__':
themodule = None
-
- if themodule:
+ else:
+ __import__(modname)
+ themodule = sys.modules[modname]
self.modules.add(themodule)
- sendRef = True
- typ = type(obj)
- #print 'saving', obj, typ
- try:
- try: #Deal with case when getattribute fails with exceptions
- klass = getattr(themodule, name)
- except (AttributeError):
- if modname == '__builtin__': #new.* are misrepeported
- modname = 'new'
- __import__(modname)
- themodule = sys.modules[modname]
- try:
- klass = getattr(themodule, name)
- except AttributeError, a:
- # print themodule, name, obj, type(obj)
- raise pickle.PicklingError("Can't pickle builtin %s" % obj)
- else:
- raise
+ if hasattr(themodule, name) and getattr(themodule, name) is obj:
+ return Pickler.save_global(self, obj, name)
- except (ImportError, KeyError, AttributeError):
- if typ == types.TypeType or typ == types.ClassType:
- sendRef = False
- else: #we can't deal with this
- raise
- else:
- if klass is not obj and (typ == types.TypeType or typ == types.ClassType):
- sendRef = False
- if not sendRef:
- #note: Third party types might crash this - add better checks!
- d = dict(obj.__dict__) #copy dict proxy to a dict
- if not isinstance(d.get('__dict__', None), property): # don't extract dict that are properties
- d.pop('__dict__',None)
- d.pop('__weakref__',None)
+ typ = type(obj)
+ if typ is not obj and isinstance(obj, (type, types.ClassType)):
+ d = dict(obj.__dict__) # copy dict proxy to a dict
+ if not isinstance(d.get('__dict__', None), property):
+ # don't extract dict that are properties
+ d.pop('__dict__', None)
+ d.pop('__weakref__', None)
# hack as __new__ is stored differently in the __dict__
new_override = d.get('__new__', None)
if new_override:
d['__new__'] = obj.__new__
- self.save_reduce(type(obj),(obj.__name__,obj.__bases__,
- d),obj=obj)
- #print 'internal reduce dask %s %s' % (obj, d)
- return
-
- if self.proto >= 2:
- code = _extension_registry.get((modname, name))
- if code:
- assert code > 0
- if code <= 0xff:
- write(pickle.EXT1 + chr(code))
- elif code <= 0xffff:
- write("%c%c%c" % (pickle.EXT2, code&0xff, code>>8))
- else:
- write(pickle.EXT4 + pack("<i", code))
- return
+ self.save_reduce(typ, (obj.__name__, obj.__bases__, d), obj=obj)
+ else:
+ raise pickle.PicklingError("Can't pickle %r" % obj)
- write(pickle.GLOBAL + modname + '\n' + name + '\n')
- self.memoize(obj)
+ dispatch[type] = save_global
dispatch[types.ClassType] = save_global
- dispatch[types.TypeType] = save_global
def save_instancemethod(self, obj):
- #Memoization rarely is ever useful due to python bounding
- self.save_reduce(types.MethodType, (obj.im_func, obj.im_self,obj.im_class), obj=obj)
+ # Memoization rarely is ever useful due to python bounding
+ if PY3:
+ self.save_reduce(types.MethodType, (obj.__func__, obj.__self__), obj=obj)
+ else:
+ self.save_reduce(types.MethodType, (obj.__func__, obj.__self__, obj.__self__.__class__),
+ obj=obj)
dispatch[types.MethodType] = save_instancemethod
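
On Python 3 a bound method finds its class through __self__, so types.MethodType takes just the function and the instance; the Python 2 branch must still pass the class explicitly. A sketch of the reconstruction the Python 3 reduce tuple performs:

import types

class Greeter:
    def hello(self):
        return 'hi from %s' % type(self).__name__

g = Greeter()
bound = g.hello
rebuilt = types.MethodType(bound.__func__, bound.__self__)  # what the unpickler does
assert rebuilt() == bound() == 'hi from Greeter'
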
- def save_inst_logic(self, obj):
+ def save_inst(self, obj):
"""Inner logic to save instance. Based off pickle.save_inst
Supports __transient__"""
cls = obj.__class__
- memo = self.memo
+ memo = self.memo
write = self.write
- save = self.save
+ save = self.save
if hasattr(obj, '__getinitargs__'):
args = obj.__getinitargs__()
- len(args) # XXX Assert it's a sequence
+ len(args) # XXX Assert it's a sequence
pickle._keep_alive(args, memo)
else:
args = ()
@@ -537,15 +413,8 @@ class CloudPickler(pickle.Pickler):
save(stuff)
write(pickle.BUILD)
-
- def save_inst(self, obj):
- # Hack to detect PIL Image instances without importing Imaging
- # PIL can be loaded with multiple names, so we don't check sys.modules for it
- if hasattr(obj,'im') and hasattr(obj,'palette') and 'Image' in obj.__module__:
- self.save_image(obj)
- else:
- self.save_inst_logic(obj)
- dispatch[types.InstanceType] = save_inst
+ if not PY3:
+ dispatch[types.InstanceType] = save_inst
def save_property(self, obj):
# properties not correctly saved in python
@@ -592,7 +461,7 @@ class CloudPickler(pickle.Pickler):
"""Modified to support __transient__ on new objects
Change only affects protocol level 2 (which is always used by PiCloud)"""
# Assert that args is a tuple or None
- if not isinstance(args, types.TupleType):
+ if not isinstance(args, tuple):
raise pickle.PicklingError("args from reduce() should be a tuple")
# Assert that func is callable
@@ -646,35 +515,23 @@ class CloudPickler(pickle.Pickler):
self._batch_setitems(dictitems)
if state is not None:
- #print 'obj %s has state %s' % (obj, state)
save(state)
write(pickle.BUILD)
-
- def save_xrange(self, obj):
- """Save an xrange object in python 2.5
- Python 2.6 supports this natively
- """
- range_params = xrange_params(obj)
- self.save_reduce(_build_xrange,range_params)
-
- #python2.6+ supports xrange pickling. some py2.5 extensions might as well. We just test it
- try:
- xrange(0).__reduce__()
- except TypeError: #can't pickle -- use PiCloud pickler
- dispatch[xrange] = save_xrange
-
def save_partial(self, obj):
"""Partial objects do not serialize correctly in python2.x -- this fixes the bugs"""
self.save_reduce(_genpartial, (obj.func, obj.args, obj.keywords))
- if sys.version_info < (2,7): #2.7 supports partial pickling
+ if sys.version_info < (2,7): # 2.7 supports partial pickling
dispatch[partial] = save_partial
def save_file(self, obj):
"""Save a file"""
- import StringIO as pystringIO #we can't use cStringIO as it lacks the name attribute
+ try:
+ import StringIO as pystringIO #we can't use cStringIO as it lacks the name attribute
+ except ImportError:
+ import io as pystringIO
if not hasattr(obj, 'name') or not hasattr(obj, 'mode'):
raise pickle.PicklingError("Cannot pickle files that do not map to an actual file")
@@ -720,10 +577,14 @@ class CloudPickler(pickle.Pickler):
retval.seek(curloc)
retval.name = name
- self.save(retval) #save stringIO
+ self.save(retval)
self.memoize(obj)
- dispatch[file] = save_file
+ if PY3:
+ dispatch[io.TextIOWrapper] = save_file
+ else:
+ dispatch[file] = save_file
+
"""Special functions for Add-on libraries"""
def inject_numpy(self):
@@ -732,76 +593,20 @@ class CloudPickler(pickle.Pickler):
return
self.dispatch[numpy.ufunc] = self.__class__.save_ufunc
- numpy_tst_mods = ['numpy', 'scipy.special']
def save_ufunc(self, obj):
"""Hack function for saving numpy ufunc objects"""
name = obj.__name__
- for tst_mod_name in self.numpy_tst_mods:
+ numpy_tst_mods = ['numpy', 'scipy.special']
+ for tst_mod_name in numpy_tst_mods:
tst_mod = sys.modules.get(tst_mod_name, None)
- if tst_mod:
- if name in tst_mod.__dict__:
- self.save_reduce(_getobject, (tst_mod_name, name))
- return
- raise pickle.PicklingError('cannot save %s. Cannot resolve what module it is defined in' % str(obj))
-
- def inject_timeseries(self):
- """Handle bugs with pickling scikits timeseries"""
- tseries = sys.modules.get('scikits.timeseries.tseries')
- if not tseries or not hasattr(tseries, 'Timeseries'):
- return
- self.dispatch[tseries.Timeseries] = self.__class__.save_timeseries
-
- def save_timeseries(self, obj):
- import scikits.timeseries.tseries as ts
-
- func, reduce_args, state = obj.__reduce__()
- if func != ts._tsreconstruct:
- raise pickle.PicklingError('timeseries using unexpected reconstruction function %s' % str(func))
- state = (1,
- obj.shape,
- obj.dtype,
- obj.flags.fnc,
- obj._data.tostring(),
- ts.getmaskarray(obj).tostring(),
- obj._fill_value,
- obj._dates.shape,
- obj._dates.__array__().tostring(),
- obj._dates.dtype, #added -- preserve type
- obj.freq,
- obj._optinfo,
- )
- return self.save_reduce(_genTimeSeries, (reduce_args, state))
-
- def inject_email(self):
- """Block email LazyImporters from being saved"""
- email = sys.modules.get('email')
- if not email:
- return
- self.dispatch[email.LazyImporter] = self.__class__.save_unsupported
+ if tst_mod and name in tst_mod.__dict__:
+ return self.save_reduce(_getobject, (tst_mod_name, name))
+ raise pickle.PicklingError('cannot save %s. Cannot resolve what module it is defined in'
+ % str(obj))
def inject_addons(self):
"""Plug in system. Register additional pickling functions if modules already loaded"""
self.inject_numpy()
- self.inject_timeseries()
- self.inject_email()
-
- """Python Imaging Library"""
- def save_image(self, obj):
- if not obj.im and obj.fp and 'r' in obj.fp.mode and obj.fp.name \
- and not obj.fp.closed and (not hasattr(obj, 'isatty') or not obj.isatty()):
- #if image not loaded yet -- lazy load
- self.save_reduce(_lazyloadImage,(obj.fp,), obj=obj)
- else:
- #image is loaded - just transmit it over
- self.save_reduce(_generateImage, (obj.size, obj.mode, obj.tostring()), obj=obj)
-
- """
- def memoize(self, obj):
- pickle.Pickler.memoize(self, obj)
- if printMemoization:
- print 'memoizing ' + str(obj)
- """
-
# Shorthands for legacy support
@@ -809,14 +614,13 @@ class CloudPickler(pickle.Pickler):
def dump(obj, file, protocol=2):
CloudPickler(file, protocol).dump(obj)
+
def dumps(obj, protocol=2):
file = StringIO()
cp = CloudPickler(file,protocol)
cp.dump(obj)
- #print 'cloud dumped', str(obj), str(cp.modules)
-
return file.getvalue()
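
Since CloudPickler writes a standard pickle stream, the receiving side needs no special loader: plain pickle.loads is enough. A typical round trip, assuming this module is importable as pyspark.cloudpickle:

import pickle
from pyspark import cloudpickle

double = lambda x: x * 2             # rejected by the stock pickler
payload = cloudpickle.dumps(double)  # serialized by value, protocol 2
restored = pickle.loads(payload)     # ordinary pickle deserializes it
assert restored(21) == 42
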
@@ -825,25 +629,6 @@ def subimport(name):
__import__(name)
return sys.modules[name]
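
subimport exists because __import__('a.b') returns the top-level package a, while the unpickler needs the submodule object itself; indexing sys.modules after the import yields a.b. For example:

import sys

def subimport(name):
    __import__(name)
    return sys.modules[name]

assert __import__('xml.sax').__name__ == 'xml'     # top-level package only
assert subimport('xml.sax').__name__ == 'xml.sax'  # the submodule itself
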
-#hack to load django settings:
-def django_settings_load(name):
- modified_env = False
-
- if 'DJANGO_SETTINGS_MODULE' not in os.environ:
- os.environ['DJANGO_SETTINGS_MODULE'] = name # must set name first due to circular deps
- modified_env = True
- try:
- module = subimport(name)
- except Exception, i:
- print >> sys.stderr, 'Cloud not import django settings %s:' % (name)
- print_exec(sys.stderr)
- if modified_env:
- del os.environ['DJANGO_SETTINGS_MODULE']
- else:
- #add project directory to sys,path:
- if hasattr(module,'__file__'):
- dirname = os.path.split(module.__file__)[0] + '/'
- sys.path.append(dirname)
# restores function attributes
def _restore_attr(obj, attr):
@@ -851,13 +636,16 @@ def _restore_attr(obj, attr):
setattr(obj, key, val)
return obj
+
def _get_module_builtins():
return pickle.__builtins__
+
def print_exec(stream):
ei = sys.exc_info()
traceback.print_exception(ei[0], ei[1], ei[2], None, stream)
+
def _modules_to_main(modList):
"""Force every module in modList to be placed into main"""
if not modList:
@@ -868,22 +656,16 @@ def _modules_to_main(modList):
if type(modname) is str:
try:
mod = __import__(modname)
- except Exception, i: #catch all...
- sys.stderr.write('warning: could not import %s\n. Your function may unexpectedly error due to this import failing; \
-A version mismatch is likely. Specific error was:\n' % modname)
+ except Exception as e:
+ sys.stderr.write('warning: could not import %s.\n'
+ 'Your function may unexpectedly error due to this import failing; '
+ 'a version mismatch is likely. Specific error was:\n' % modname)
print_exec(sys.stderr)
else:
- setattr(main,mod.__name__, mod)
- else:
- #REVERSE COMPATIBILITY FOR CLOUD CLIENT 1.5 (WITH EPD)
- #In old version actual module was sent
- setattr(main,modname.__name__, modname)
+ setattr(main, mod.__name__, mod)
-#object generators:
-def _build_xrange(start, step, len):
- """Built xrange explicitly"""
- return xrange(start, start + step*len, step)
+#object generators:
def _genpartial(func, args, kwds):
if not args:
args = ()
@@ -891,22 +673,26 @@ def _genpartial(func, args, kwds):
kwds = {}
return partial(func, *args, **kwds)
+
def _fill_function(func, globals, defaults, dict):
""" Fills in the rest of function data into the skeleton function object
that were created via _make_skel_func().
"""
- func.func_globals.update(globals)
- func.func_defaults = defaults
- func.func_dict = dict
+ func.__globals__.update(globals)
+ func.__defaults__ = defaults
+ func.__dict__ = dict
return func
+
def _make_cell(value):
- return (lambda: value).func_closure[0]
+ return (lambda: value).__closure__[0]
+
def _reconstruct_closure(values):
return tuple([_make_cell(v) for v in values])
+
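
_make_cell leans on a CPython detail: a lambda that closes over a local variable carries a real cell object in its __closure__, which these interpreter versions offer no other pure-Python way to construct. A tuple of such cells is exactly what the closure argument of types.FunctionType expects when the skeleton function is rebuilt:

def _make_cell(value):
    return (lambda: value).__closure__[0]

cell = _make_cell(42)
assert cell.cell_contents == 42
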
def _make_skel_func(code, closures, base_globals = None):
""" Creates a skeleton function object that contains just the provided
code and the correct number of cells in func_closure. All other
@@ -928,40 +714,3 @@ Note: These can never be renamed due to client compatibility issues"""
def _getobject(modname, attribute):
mod = __import__(modname, fromlist=[attribute])
return mod.__dict__[attribute]
-
-def _generateImage(size, mode, str_rep):
- """Generate image from string representation"""
- import Image
- i = Image.new(mode, size)
- i.fromstring(str_rep)
- return i
-
-def _lazyloadImage(fp):
- import Image
- fp.seek(0) #works in almost any case
- return Image.open(fp)
-
-"""Timeseries"""
-def _genTimeSeries(reduce_args, state):
- import scikits.timeseries.tseries as ts
- from numpy import ndarray
- from numpy.ma import MaskedArray
-
-
- time_series = ts._tsreconstruct(*reduce_args)
-
- #from setstate modified
- (ver, shp, typ, isf, raw, msk, flv, dsh, dtm, dtyp, frq, infodict) = state
- #print 'regenerating %s' % dtyp
-
- MaskedArray.__setstate__(time_series, (ver, shp, typ, isf, raw, msk, flv))
- _dates = time_series._dates
- #_dates.__setstate__((ver, dsh, typ, isf, dtm, frq)) #use remote typ
- ndarray.__setstate__(_dates,(dsh,dtyp, isf, dtm))
- _dates.freq = frq
- _dates._cachedinfo.update(dict(full=None, hasdups=None, steps=None,
- toobj=None, toord=None, tostr=None))
- # Update the _optinfo dictionary
- time_series._optinfo.update(infodict)
- return time_series
-