author     hyukjinkwon <gurwls223@gmail.com>    2017-01-17 09:53:20 -0800
committer  Davies Liu <davies.liu@gmail.com>    2017-01-17 09:53:20 -0800
commit     20e6280626fe243b170a2e7c5e018c67f3dac1db (patch)
tree       57b4c3ff5ad14cc3c32c2d28f2ada9b53d1567ad /python
parent     b79cc7ceb439b3d4e0009963ede3416e3241e562 (diff)
[SPARK-19019] [PYTHON] Fix hijacked `collections.namedtuple` and port cloudpickle changes for PySpark to work with Python 3.6.0
## What changes were proposed in this pull request?

Currently, PySpark does not work with Python 3.6.0. Running `./bin/pyspark` simply throws the error below, and PySpark does not work at all:

```
Traceback (most recent call last):
  File ".../spark/python/pyspark/shell.py", line 30, in <module>
    import pyspark
  File ".../spark/python/pyspark/__init__.py", line 46, in <module>
    from pyspark.context import SparkContext
  File ".../spark/python/pyspark/context.py", line 36, in <module>
    from pyspark.java_gateway import launch_gateway
  File ".../spark/python/pyspark/java_gateway.py", line 31, in <module>
    from py4j.java_gateway import java_import, JavaGateway, GatewayClient
  File "<frozen importlib._bootstrap>", line 961, in _find_and_load
  File "<frozen importlib._bootstrap>", line 950, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 646, in _load_unlocked
  File "<frozen importlib._bootstrap>", line 616, in _load_backward_compatible
  File ".../spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 18, in <module>
  File "/usr/local/Cellar/python3/3.6.0/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pydoc.py", line 62, in <module>
    import pkgutil
  File "/usr/local/Cellar/python3/3.6.0/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pkgutil.py", line 22, in <module>
    ModuleInfo = namedtuple('ModuleInfo', 'module_finder name ispkg')
  File ".../spark/python/pyspark/serializers.py", line 394, in namedtuple
    cls = _old_namedtuple(*args, **kwargs)
TypeError: namedtuple() missing 3 required keyword-only arguments: 'verbose', 'rename', and 'module'
```

The root cause is that, as of Python 3.6.0, some arguments of `namedtuple` are keyword-only (see https://bugs.python.org/issue25628). We currently copy this function via `types.FunctionType`, which does not carry over the default values of keyword-only arguments (i.e. `namedtuple.__kwdefaults__`), so the copied function is left with keyword-only arguments that have no defaults (non-bound arguments). This PR works around the problem by supplying those defaults manually via `kwargs`, since `types.FunctionType` does not appear to support setting them directly. This PR also ports the cloudpickle changes needed for compatibility with Python 3.6.0.

## How was this patch tested?

Manually tested with Python 2.7.6 and Python 3.6.0:

```
./bin/pyspark
```

plus manual creation of `namedtuple` both locally and in an RDD with Python 3.6.0, and Jenkins tests for the other Python versions.
Also,

```
./run-tests --python-executables=python3.6
```

```
Will test against the following Python executables: ['python3.6']
Will test the following Python modules: ['pyspark-core', 'pyspark-ml', 'pyspark-mllib', 'pyspark-sql', 'pyspark-streaming']
Finished test(python3.6): pyspark.sql.tests (192s)
Finished test(python3.6): pyspark.accumulators (3s)
Finished test(python3.6): pyspark.mllib.tests (198s)
Finished test(python3.6): pyspark.broadcast (3s)
Finished test(python3.6): pyspark.conf (2s)
Finished test(python3.6): pyspark.context (14s)
Finished test(python3.6): pyspark.ml.classification (21s)
Finished test(python3.6): pyspark.ml.evaluation (11s)
Finished test(python3.6): pyspark.ml.clustering (20s)
Finished test(python3.6): pyspark.ml.linalg.__init__ (0s)
Finished test(python3.6): pyspark.streaming.tests (240s)
Finished test(python3.6): pyspark.tests (240s)
Finished test(python3.6): pyspark.ml.recommendation (19s)
Finished test(python3.6): pyspark.ml.feature (36s)
Finished test(python3.6): pyspark.ml.regression (37s)
Finished test(python3.6): pyspark.ml.tuning (28s)
Finished test(python3.6): pyspark.mllib.classification (26s)
Finished test(python3.6): pyspark.mllib.evaluation (18s)
Finished test(python3.6): pyspark.mllib.clustering (44s)
Finished test(python3.6): pyspark.mllib.linalg.__init__ (0s)
Finished test(python3.6): pyspark.mllib.feature (26s)
Finished test(python3.6): pyspark.mllib.fpm (23s)
Finished test(python3.6): pyspark.mllib.random (8s)
Finished test(python3.6): pyspark.ml.tests (92s)
Finished test(python3.6): pyspark.mllib.stat.KernelDensity (0s)
Finished test(python3.6): pyspark.mllib.linalg.distributed (25s)
Finished test(python3.6): pyspark.mllib.stat._statistics (15s)
Finished test(python3.6): pyspark.mllib.recommendation (24s)
Finished test(python3.6): pyspark.mllib.regression (26s)
Finished test(python3.6): pyspark.profiler (9s)
Finished test(python3.6): pyspark.mllib.tree (16s)
Finished test(python3.6): pyspark.shuffle (1s)
Finished test(python3.6): pyspark.mllib.util (18s)
Finished test(python3.6): pyspark.serializers (11s)
Finished test(python3.6): pyspark.rdd (20s)
Finished test(python3.6): pyspark.sql.conf (8s)
Finished test(python3.6): pyspark.sql.catalog (17s)
Finished test(python3.6): pyspark.sql.column (18s)
Finished test(python3.6): pyspark.sql.context (18s)
Finished test(python3.6): pyspark.sql.group (27s)
Finished test(python3.6): pyspark.sql.dataframe (33s)
Finished test(python3.6): pyspark.sql.functions (35s)
Finished test(python3.6): pyspark.sql.types (6s)
Finished test(python3.6): pyspark.sql.streaming (13s)
Finished test(python3.6): pyspark.streaming.util (0s)
Finished test(python3.6): pyspark.sql.session (16s)
Finished test(python3.6): pyspark.sql.window (4s)
Finished test(python3.6): pyspark.sql.readwriter (35s)
Tests passed in 433 seconds
```

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #16429 from HyukjinKwon/SPARK-19019.
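The `serializers.py` change in the diff below can be illustrated with a small standalone sketch. This is not the PySpark patch itself (the `_hack_namedtuple` hijacking is omitted and names are simplified); it only shows why copying `collections.namedtuple` via `types.FunctionType` breaks on Python 3.6 and how re-applying `__kwdefaults__` through `kwargs` restores it:

```
import collections
import types

# Copy namedtuple the same way serializers.py does. The copy loses
# __kwdefaults__, so on Python 3.6+ its keyword-only arguments have no
# default values and a plain call raises TypeError.
def _copy_func(f):
    return types.FunctionType(f.__code__, f.__globals__, f.__name__,
                              f.__defaults__, f.__closure__)

_old_namedtuple = _copy_func(collections.namedtuple)
_old_namedtuple_kwdefaults = getattr(collections.namedtuple, "__kwdefaults__", None) or {}

def namedtuple(*args, **kwargs):
    # Re-apply the lost keyword-only defaults before calling the copy.
    for k, v in _old_namedtuple_kwdefaults.items():
        kwargs[k] = kwargs.get(k, v)
    return _old_namedtuple(*args, **kwargs)

Point = namedtuple("Point", "x y")
print(Point(1, 2))  # Point(x=1, y=2)
```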
Diffstat (limited to 'python')
-rw-r--r--  python/pyspark/cloudpickle.py  98
-rw-r--r--  python/pyspark/serializers.py  20
2 files changed, 87 insertions(+), 31 deletions(-)
diff --git a/python/pyspark/cloudpickle.py b/python/pyspark/cloudpickle.py
index da2b2f3757..959fb8b357 100644
--- a/python/pyspark/cloudpickle.py
+++ b/python/pyspark/cloudpickle.py
@@ -43,6 +43,7 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
from __future__ import print_function
import operator
+import opcode
import os
import io
import pickle
@@ -53,6 +54,8 @@ from functools import partial
import itertools
import dis
import traceback
+import weakref
+
if sys.version < '3':
from pickle import Pickler
@@ -68,10 +71,10 @@ else:
PY3 = True
#relevant opcodes
-STORE_GLOBAL = dis.opname.index('STORE_GLOBAL')
-DELETE_GLOBAL = dis.opname.index('DELETE_GLOBAL')
-LOAD_GLOBAL = dis.opname.index('LOAD_GLOBAL')
-GLOBAL_OPS = [STORE_GLOBAL, DELETE_GLOBAL, LOAD_GLOBAL]
+STORE_GLOBAL = opcode.opmap['STORE_GLOBAL']
+DELETE_GLOBAL = opcode.opmap['DELETE_GLOBAL']
+LOAD_GLOBAL = opcode.opmap['LOAD_GLOBAL']
+GLOBAL_OPS = (STORE_GLOBAL, DELETE_GLOBAL, LOAD_GLOBAL)
HAVE_ARGUMENT = dis.HAVE_ARGUMENT
EXTENDED_ARG = dis.EXTENDED_ARG
@@ -90,6 +93,43 @@ def _builtin_type(name):
return getattr(types, name)
+if sys.version_info < (3, 4):
+ def _walk_global_ops(code):
+ """
+ Yield (opcode, argument number) tuples for all
+ global-referencing instructions in *code*.
+ """
+ code = getattr(code, 'co_code', b'')
+ if not PY3:
+ code = map(ord, code)
+
+ n = len(code)
+ i = 0
+ extended_arg = 0
+ while i < n:
+ op = code[i]
+ i += 1
+ if op >= HAVE_ARGUMENT:
+ oparg = code[i] + code[i + 1] * 256 + extended_arg
+ extended_arg = 0
+ i += 2
+ if op == EXTENDED_ARG:
+ extended_arg = oparg * 65536
+ if op in GLOBAL_OPS:
+ yield op, oparg
+
+else:
+ def _walk_global_ops(code):
+ """
+ Yield (opcode, argument number) tuples for all
+ global-referencing instructions in *code*.
+ """
+ for instr in dis.get_instructions(code):
+ op = instr.opcode
+ if op in GLOBAL_OPS:
+ yield op, instr.arg
+
+
class CloudPickler(Pickler):
dispatch = Pickler.dispatch.copy()
@@ -260,38 +300,34 @@ class CloudPickler(Pickler):
write(pickle.TUPLE)
write(pickle.REDUCE) # applies _fill_function on the tuple
- @staticmethod
- def extract_code_globals(co):
+ _extract_code_globals_cache = (
+ weakref.WeakKeyDictionary()
+ if sys.version_info >= (2, 7) and not hasattr(sys, "pypy_version_info")
+ else {})
+
+ @classmethod
+ def extract_code_globals(cls, co):
"""
Find all globals names read or written to by codeblock co
"""
- code = co.co_code
- if not PY3:
- code = [ord(c) for c in code]
- names = co.co_names
- out_names = set()
-
- n = len(code)
- i = 0
- extended_arg = 0
- while i < n:
- op = code[i]
+ out_names = cls._extract_code_globals_cache.get(co)
+ if out_names is None:
+ try:
+ names = co.co_names
+ except AttributeError:
+ # PyPy "builtin-code" object
+ out_names = set()
+ else:
+ out_names = set(names[oparg]
+ for op, oparg in _walk_global_ops(co))
- i += 1
- if op >= HAVE_ARGUMENT:
- oparg = code[i] + code[i+1] * 256 + extended_arg
- extended_arg = 0
- i += 2
- if op == EXTENDED_ARG:
- extended_arg = oparg*65536
- if op in GLOBAL_OPS:
- out_names.add(names[oparg])
+ # see if nested function have any global refs
+ if co.co_consts:
+ for const in co.co_consts:
+ if type(const) is types.CodeType:
+ out_names |= cls.extract_code_globals(const)
- # see if nested function have any global refs
- if co.co_consts:
- for const in co.co_consts:
- if type(const) is types.CodeType:
- out_names |= CloudPickler.extract_code_globals(const)
+ cls._extract_code_globals_cache[co] = out_names
return out_names
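As context for the cloudpickle change above: on Python 3.4+ the new code relies on `dis.get_instructions` instead of walking the raw bytecode by hand. The sketch below is an illustration of that idea only, not the cloudpickle implementation; the function name and example are made up for demonstration:

```
import dis
import types

GLOBAL_OP_NAMES = {"STORE_GLOBAL", "DELETE_GLOBAL", "LOAD_GLOBAL"}

def globals_referenced(code):
    """Collect the names of globals read or written by a code object."""
    names = set()
    for instr in dis.get_instructions(code):
        if instr.opname in GLOBAL_OP_NAMES:
            names.add(instr.argval)
    # Recurse into nested code objects (inner functions, lambdas, ...),
    # mirroring the co_consts loop in extract_code_globals above.
    for const in code.co_consts:
        if isinstance(const, types.CodeType):
            names |= globals_referenced(const)
    return names

def example():
    return len(str(example))

print(sorted(globals_referenced(example.__code__)))  # ['example', 'len', 'str']
```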
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index c4f2f08cb4..ea5e00e9ee 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -382,18 +382,38 @@ def _hijack_namedtuple():
return
global _old_namedtuple # or it will put in closure
+ global _old_namedtuple_kwdefaults # or it will put in closure too
def _copy_func(f):
return types.FunctionType(f.__code__, f.__globals__, f.__name__,
f.__defaults__, f.__closure__)
+ def _kwdefaults(f):
+ # __kwdefaults__ contains the default values of keyword-only arguments which are
+ # introduced from Python 3. The possible cases for __kwdefaults__ in namedtuple
+ # are as below:
+ #
+ # - Does not exist in Python 2.
+ # - Returns None in <= Python 3.5.x.
+ # - Returns a dictionary containing the default values to the keys from Python 3.6.x
+ # (See https://bugs.python.org/issue25628).
+ kargs = getattr(f, "__kwdefaults__", None)
+ if kargs is None:
+ return {}
+ else:
+ return kargs
+
_old_namedtuple = _copy_func(collections.namedtuple)
+ _old_namedtuple_kwdefaults = _kwdefaults(collections.namedtuple)
def namedtuple(*args, **kwargs):
+ for k, v in _old_namedtuple_kwdefaults.items():
+ kwargs[k] = kwargs.get(k, v)
cls = _old_namedtuple(*args, **kwargs)
return _hack_namedtuple(cls)
# replace namedtuple with new one
+ collections.namedtuple.__globals__["_old_namedtuple_kwdefaults"] = _old_namedtuple_kwdefaults
collections.namedtuple.__globals__["_old_namedtuple"] = _old_namedtuple
collections.namedtuple.__globals__["_hack_namedtuple"] = _hack_namedtuple
collections.namedtuple.__code__ = namedtuple.__code__