diff options
author | Shixiong Zhu <shixiong@databricks.com> | 2016-03-06 08:57:01 -0800 |
---|---|---|
committer | Davies Liu <davies.liu@gmail.com> | 2016-03-06 08:57:01 -0800 |
commit | ee913e6e2d58dfac20f3f06ff306081bd0e48066 (patch) | |
tree | 262f35891a14f8ae3cca03c4700c341fa3239bf6 /python/pyspark/cloudpickle.py | |
parent | 8ff88094daa4945e7d718baa7b20703fd8087ab0 (diff) | |
download | spark-ee913e6e2d58dfac20f3f06ff306081bd0e48066.tar.gz spark-ee913e6e2d58dfac20f3f06ff306081bd0e48066.tar.bz2 spark-ee913e6e2d58dfac20f3f06ff306081bd0e48066.zip |
[SPARK-13697] [PYSPARK] Fix the missing module name of TransformFunctionSerializer.loads
## What changes were proposed in this pull request?
Set the function's module name to `__main__` if it's missing in `TransformFunctionSerializer.loads`.
## How was this patch tested?
Manually test in the shell.
Before this patch:
```
>>> from pyspark.streaming import StreamingContext
>>> from pyspark.streaming.util import TransformFunction
>>> ssc = StreamingContext(sc, 1)
>>> func = TransformFunction(sc, lambda x: x, sc.serializer)
>>> func.rdd_wrapper(lambda x: x)
TransformFunction(<function <lambda> at 0x106ac8b18>)
>>> bytes = bytearray(ssc._transformerSerializer.serializer.dumps((func.func, func.rdd_wrap_func, func.deserializers)))
>>> func2 = ssc._transformerSerializer.loads(bytes)
>>> print(func2.func.__module__)
None
>>> print(func2.rdd_wrap_func.__module__)
None
>>>
```
After this patch:
```
>>> from pyspark.streaming import StreamingContext
>>> from pyspark.streaming.util import TransformFunction
>>> ssc = StreamingContext(sc, 1)
>>> func = TransformFunction(sc, lambda x: x, sc.serializer)
>>> func.rdd_wrapper(lambda x: x)
TransformFunction(<function <lambda> at 0x108bf1b90>)
>>> bytes = bytearray(ssc._transformerSerializer.serializer.dumps((func.func, func.rdd_wrap_func, func.deserializers)))
>>> func2 = ssc._transformerSerializer.loads(bytes)
>>> print(func2.func.__module__)
__main__
>>> print(func2.rdd_wrap_func.__module__)
__main__
>>>
```
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #11535 from zsxwing/loads-module.
Diffstat (limited to 'python/pyspark/cloudpickle.py')
-rw-r--r-- | python/pyspark/cloudpickle.py | 4 |
1 file changed, 3 insertions, 1 deletion
diff --git a/python/pyspark/cloudpickle.py b/python/pyspark/cloudpickle.py index 95b3abc742..e56e22a9b9 100644 --- a/python/pyspark/cloudpickle.py +++ b/python/pyspark/cloudpickle.py @@ -241,6 +241,7 @@ class CloudPickler(Pickler): save(f_globals) save(defaults) save(dct) + save(func.__module__) write(pickle.TUPLE) write(pickle.REDUCE) # applies _fill_function on the tuple @@ -698,13 +699,14 @@ def _genpartial(func, args, kwds): return partial(func, *args, **kwds) -def _fill_function(func, globals, defaults, dict): +def _fill_function(func, globals, defaults, dict, module): """ Fills in the rest of function data into the skeleton function object that were created via _make_skel_func(). """ func.__globals__.update(globals) func.__defaults__ = defaults func.__dict__ = dict + func.__module__ = module return func |