From 04e44b37cc04f62fbf9e08c7076349e0a4d12ea8 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Thu, 16 Apr 2015 16:20:57 -0700 Subject: [SPARK-4897] [PySpark] Python 3 support This PR update PySpark to support Python 3 (tested with 3.4). Known issue: unpickle array from Pyrolite is broken in Python 3, those tests are skipped. TODO: ec2/spark-ec2.py is not fully tested with python3. Author: Davies Liu Author: twneale Author: Josh Rosen Closes #5173 from davies/python3 and squashes the following commits: d7d6323 [Davies Liu] fix tests 6c52a98 [Davies Liu] fix mllib test 99e334f [Davies Liu] update timeout b716610 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 cafd5ec [Davies Liu] adddress comments from @mengxr bf225d7 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 179fc8d [Davies Liu] tuning flaky tests 8c8b957 [Davies Liu] fix ResourceWarning in Python 3 5c57c95 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 4006829 [Davies Liu] fix test 2fc0066 [Davies Liu] add python3 path 71535e9 [Davies Liu] fix xrange and divide 5a55ab4 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 125f12c [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 ed498c8 [Davies Liu] fix compatibility with python 3 820e649 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 e8ce8c9 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 ad7c374 [Davies Liu] fix mllib test and warning ef1fc2f [Davies Liu] fix tests 4eee14a [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 20112ff [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 59bb492 [Davies Liu] fix tests 1da268c [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 ca0fdd3 [Davies Liu] fix code style 9563a15 [Davies Liu] add imap back for python 2 0b1ec04 [Davies Liu] make python examples work with Python 3 d2fd566 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 a716d34 [Davies Liu] test with python 3.4 f1700e8 [Davies Liu] fix test in python3 671b1db [Davies Liu] fix test in python3 692ff47 [Davies Liu] fix flaky test 7b9699f [Davies Liu] invalidate import cache for Python 3.3+ 9c58497 [Davies Liu] fix kill worker 309bfbf [Davies Liu] keep compatibility 5707476 [Davies Liu] cleanup, fix hash of string in 3.3+ 8662d5b [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 f53e1f0 [Davies Liu] fix tests 70b6b73 [Davies Liu] compile ec2/spark_ec2.py in python 3 a39167e [Davies Liu] support customize class in __main__ 814c77b [Davies Liu] run unittests with python 3 7f4476e [Davies Liu] mllib tests passed d737924 [Davies Liu] pass ml tests 375ea17 [Davies Liu] SQL tests pass 6cc42a9 [Davies Liu] rename 431a8de [Davies Liu] streaming tests pass 78901a7 [Davies Liu] fix hash of serializer in Python 3 24b2f2e [Davies Liu] pass all RDD tests 35f48fe [Davies Liu] run future again 1eebac2 [Davies Liu] fix conflict in ec2/spark_ec2.py 6e3c21d [Davies Liu] make cloudpickle work with Python3 2fb2db3 [Josh Rosen] Guard more changes behind sys.version; still doesn't run 1aa5e8f [twneale] Turned out `pickle.DictionaryType is dict` == True, so swapped it out 7354371 [twneale] buffer --> memoryview I'm not super sure if this a valid change, but the 2.7 docs recommend using memoryview over buffer where possible, so hoping it'll work. b69ccdf [twneale] Uses the pure python pickle._Pickler instead of c-extension _pickle.Pickler. It appears pyspark 2.7 uses the pure python pickler as well, so this shouldn't degrade pickling performance (?). f40d925 [twneale] xrange --> range e104215 [twneale] Replaces 2.7 types.InstsanceType with 3.4 `object`....could be horribly wrong depending on how types.InstanceType is used elsewhere in the package--see http://bugs.python.org/issue8206 79de9d0 [twneale] Replaces python2.7 `file` with 3.4 _io.TextIOWrapper 2adb42d [Josh Rosen] Fix up some import differences between Python 2 and 3 854be27 [Josh Rosen] Run `futurize` on Python code: 7c5b4ce [Josh Rosen] Remove Python 3 check in shell.py. --- python/pyspark/accumulators.py | 9 +- python/pyspark/broadcast.py | 37 +- python/pyspark/cloudpickle.py | 577 +++------ python/pyspark/conf.py | 9 +- python/pyspark/context.py | 42 +- python/pyspark/daemon.py | 36 +- python/pyspark/heapq3.py | 24 +- python/pyspark/java_gateway.py | 2 +- python/pyspark/join.py | 1 + python/pyspark/ml/classification.py | 4 +- python/pyspark/ml/feature.py | 22 +- python/pyspark/ml/param/__init__.py | 8 +- python/pyspark/ml/param/_shared_params_code_gen.py | 10 +- python/pyspark/mllib/__init__.py | 11 +- python/pyspark/mllib/classification.py | 7 +- python/pyspark/mllib/clustering.py | 18 +- python/pyspark/mllib/common.py | 19 +- python/pyspark/mllib/feature.py | 18 +- python/pyspark/mllib/fpm.py | 2 + python/pyspark/mllib/linalg.py | 48 +- python/pyspark/mllib/rand.py | 33 +- python/pyspark/mllib/recommendation.py | 7 +- python/pyspark/mllib/stat/_statistics.py | 25 +- python/pyspark/mllib/tests.py | 20 +- python/pyspark/mllib/tree.py | 15 +- python/pyspark/mllib/util.py | 26 +- python/pyspark/profiler.py | 10 +- python/pyspark/rdd.py | 189 +-- python/pyspark/rddsampler.py | 4 +- python/pyspark/serializers.py | 101 +- python/pyspark/shell.py | 16 +- python/pyspark/shuffle.py | 126 +- python/pyspark/sql/__init__.py | 15 +- python/pyspark/sql/_types.py | 1261 ++++++++++++++++++++ python/pyspark/sql/context.py | 32 +- python/pyspark/sql/dataframe.py | 63 +- python/pyspark/sql/functions.py | 6 +- python/pyspark/sql/tests.py | 11 +- python/pyspark/sql/types.py | 1252 ------------------- python/pyspark/statcounter.py | 4 +- python/pyspark/streaming/context.py | 5 +- python/pyspark/streaming/dstream.py | 51 +- python/pyspark/streaming/kafka.py | 8 +- python/pyspark/streaming/tests.py | 39 +- python/pyspark/streaming/util.py | 6 +- python/pyspark/tests.py | 327 ++--- python/pyspark/worker.py | 16 +- 47 files changed, 2250 insertions(+), 2322 deletions(-) create mode 100644 python/pyspark/sql/_types.py delete mode 100644 python/pyspark/sql/types.py (limited to 'python/pyspark') diff --git a/python/pyspark/accumulators.py b/python/pyspark/accumulators.py index ccbca67656..7271809e43 100644 --- a/python/pyspark/accumulators.py +++ b/python/pyspark/accumulators.py @@ -54,7 +54,7 @@ ... def zero(self, value): ... return [0.0] * len(value) ... def addInPlace(self, val1, val2): -... for i in xrange(len(val1)): +... for i in range(len(val1)): ... val1[i] += val2[i] ... return val1 >>> va = sc.accumulator([1.0, 2.0, 3.0], VectorAccumulatorParam()) @@ -86,9 +86,13 @@ Traceback (most recent call last): Exception:... """ +import sys import select import struct -import SocketServer +if sys.version < '3': + import SocketServer +else: + import socketserver as SocketServer import threading from pyspark.cloudpickle import CloudPickler from pyspark.serializers import read_int, PickleSerializer @@ -247,6 +251,7 @@ class AccumulatorServer(SocketServer.TCPServer): def shutdown(self): self.server_shutdown = True SocketServer.TCPServer.shutdown(self) + self.server_close() def _start_update_server(): diff --git a/python/pyspark/broadcast.py b/python/pyspark/broadcast.py index 6b8a8b256a..3de4615428 100644 --- a/python/pyspark/broadcast.py +++ b/python/pyspark/broadcast.py @@ -16,10 +16,15 @@ # import os -import cPickle +import sys import gc from tempfile import NamedTemporaryFile +if sys.version < '3': + import cPickle as pickle +else: + import pickle + unicode = str __all__ = ['Broadcast'] @@ -70,33 +75,19 @@ class Broadcast(object): self._path = path def dump(self, value, f): - if isinstance(value, basestring): - if isinstance(value, unicode): - f.write('U') - value = value.encode('utf8') - else: - f.write('S') - f.write(value) - else: - f.write('P') - cPickle.dump(value, f, 2) + pickle.dump(value, f, 2) f.close() return f.name def load(self, path): with open(path, 'rb', 1 << 20) as f: - flag = f.read(1) - data = f.read() - if flag == 'P': - # cPickle.loads() may create lots of objects, disable GC - # temporary for better performance - gc.disable() - try: - return cPickle.loads(data) - finally: - gc.enable() - else: - return data.decode('utf8') if flag == 'U' else data + # pickle.load() may create lots of objects, disable GC + # temporary for better performance + gc.disable() + try: + return pickle.load(f) + finally: + gc.enable() @property def value(self): diff --git a/python/pyspark/cloudpickle.py b/python/pyspark/cloudpickle.py index bb0783555a..9ef93071d2 100644 --- a/python/pyspark/cloudpickle.py +++ b/python/pyspark/cloudpickle.py @@ -40,164 +40,126 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. """ - +from __future__ import print_function import operator import os +import io import pickle import struct import sys import types from functools import partial import itertools -from copy_reg import _extension_registry, _inverted_registry, _extension_cache -import new import dis import traceback -import platform - -PyImp = platform.python_implementation() - -import logging -cloudLog = logging.getLogger("Cloud.Transport") +if sys.version < '3': + from pickle import Pickler + try: + from cStringIO import StringIO + except ImportError: + from StringIO import StringIO + PY3 = False +else: + types.ClassType = type + from pickle import _Pickler as Pickler + from io import BytesIO as StringIO + PY3 = True #relevant opcodes -STORE_GLOBAL = chr(dis.opname.index('STORE_GLOBAL')) -DELETE_GLOBAL = chr(dis.opname.index('DELETE_GLOBAL')) -LOAD_GLOBAL = chr(dis.opname.index('LOAD_GLOBAL')) +STORE_GLOBAL = dis.opname.index('STORE_GLOBAL') +DELETE_GLOBAL = dis.opname.index('DELETE_GLOBAL') +LOAD_GLOBAL = dis.opname.index('LOAD_GLOBAL') GLOBAL_OPS = [STORE_GLOBAL, DELETE_GLOBAL, LOAD_GLOBAL] +HAVE_ARGUMENT = dis.HAVE_ARGUMENT +EXTENDED_ARG = dis.EXTENDED_ARG -HAVE_ARGUMENT = chr(dis.HAVE_ARGUMENT) -EXTENDED_ARG = chr(dis.EXTENDED_ARG) - -if PyImp == "PyPy": - # register builtin type in `new` - new.method = types.MethodType - -try: - from cStringIO import StringIO -except ImportError: - from StringIO import StringIO -# These helper functions were copied from PiCloud's util module. def islambda(func): - return getattr(func,'func_name') == '' + return getattr(func,'__name__') == '' -def xrange_params(xrangeobj): - """Returns a 3 element tuple describing the xrange start, step, and len - respectively - Note: Only guarentees that elements of xrange are the same. parameters may - be different. - e.g. xrange(1,1) is interpretted as xrange(0,0); both behave the same - though w/ iteration - """ - - xrange_len = len(xrangeobj) - if not xrange_len: #empty - return (0,1,0) - start = xrangeobj[0] - if xrange_len == 1: #one element - return start, 1, 1 - return (start, xrangeobj[1] - xrangeobj[0], xrange_len) - -#debug variables intended for developer use: -printSerialization = False -printMemoization = False +_BUILTIN_TYPE_NAMES = {} +for k, v in types.__dict__.items(): + if type(v) is type: + _BUILTIN_TYPE_NAMES[v] = k -useForcedImports = True #Should I use forced imports for tracking? +def _builtin_type(name): + return getattr(types, name) -class CloudPickler(pickle.Pickler): +class CloudPickler(Pickler): - dispatch = pickle.Pickler.dispatch.copy() - savedForceImports = False - savedDjangoEnv = False #hack tro transport django environment + dispatch = Pickler.dispatch.copy() - def __init__(self, file, protocol=None, min_size_to_save= 0): - pickle.Pickler.__init__(self,file,protocol) - self.modules = set() #set of modules needed to depickle - self.globals_ref = {} # map ids to dictionary. used to ensure that functions can share global env + def __init__(self, file, protocol=None): + Pickler.__init__(self, file, protocol) + # set of modules to unpickle + self.modules = set() + # map ids to dictionary. used to ensure that functions can share global env + self.globals_ref = {} def dump(self, obj): - # note: not thread safe - # minimal side-effects, so not fixing - recurse_limit = 3000 - base_recurse = sys.getrecursionlimit() - if base_recurse < recurse_limit: - sys.setrecursionlimit(recurse_limit) self.inject_addons() try: - return pickle.Pickler.dump(self, obj) - except RuntimeError, e: + return Pickler.dump(self, obj) + except RuntimeError as e: if 'recursion' in e.args[0]: - msg = """Could not pickle object as excessively deep recursion required. - Try _fast_serialization=2 or contact PiCloud support""" + msg = """Could not pickle object as excessively deep recursion required.""" raise pickle.PicklingError(msg) - finally: - new_recurse = sys.getrecursionlimit() - if new_recurse == recurse_limit: - sys.setrecursionlimit(base_recurse) + + def save_memoryview(self, obj): + """Fallback to save_string""" + Pickler.save_string(self, str(obj)) def save_buffer(self, obj): """Fallback to save_string""" - pickle.Pickler.save_string(self,str(obj)) - dispatch[buffer] = save_buffer + Pickler.save_string(self,str(obj)) + if PY3: + dispatch[memoryview] = save_memoryview + else: + dispatch[buffer] = save_buffer - #block broken objects - def save_unsupported(self, obj, pack=None): + def save_unsupported(self, obj): raise pickle.PicklingError("Cannot pickle objects of type %s" % type(obj)) dispatch[types.GeneratorType] = save_unsupported - #python2.6+ supports slice pickling. some py2.5 extensions might as well. We just test it - try: - slice(0,1).__reduce__() - except TypeError: #can't pickle - - dispatch[slice] = save_unsupported - - #itertools objects do not pickle! + # itertools objects do not pickle! for v in itertools.__dict__.values(): if type(v) is type: dispatch[v] = save_unsupported - - def save_dict(self, obj): - """hack fix - If the dict is a global, deal with it in a special way - """ - #print 'saving', obj - if obj is __builtins__: - self.save_reduce(_get_module_builtins, (), obj=obj) - else: - pickle.Pickler.save_dict(self, obj) - dispatch[pickle.DictionaryType] = save_dict - - - def save_module(self, obj, pack=struct.pack): + def save_module(self, obj): """ Save a module as an import """ - #print 'try save import', obj.__name__ self.modules.add(obj) - self.save_reduce(subimport,(obj.__name__,), obj=obj) - dispatch[types.ModuleType] = save_module #new type + self.save_reduce(subimport, (obj.__name__,), obj=obj) + dispatch[types.ModuleType] = save_module - def save_codeobject(self, obj, pack=struct.pack): + def save_codeobject(self, obj): """ Save a code object """ - #print 'try to save codeobj: ', obj - args = ( - obj.co_argcount, obj.co_nlocals, obj.co_stacksize, obj.co_flags, obj.co_code, - obj.co_consts, obj.co_names, obj.co_varnames, obj.co_filename, obj.co_name, - obj.co_firstlineno, obj.co_lnotab, obj.co_freevars, obj.co_cellvars - ) + if PY3: + args = ( + obj.co_argcount, obj.co_kwonlyargcount, obj.co_nlocals, obj.co_stacksize, + obj.co_flags, obj.co_code, obj.co_consts, obj.co_names, obj.co_varnames, + obj.co_filename, obj.co_name, obj.co_firstlineno, obj.co_lnotab, obj.co_freevars, + obj.co_cellvars + ) + else: + args = ( + obj.co_argcount, obj.co_nlocals, obj.co_stacksize, obj.co_flags, obj.co_code, + obj.co_consts, obj.co_names, obj.co_varnames, obj.co_filename, obj.co_name, + obj.co_firstlineno, obj.co_lnotab, obj.co_freevars, obj.co_cellvars + ) self.save_reduce(types.CodeType, args, obj=obj) - dispatch[types.CodeType] = save_codeobject #new type + dispatch[types.CodeType] = save_codeobject - def save_function(self, obj, name=None, pack=struct.pack): + def save_function(self, obj, name=None): """ Registered with the dispatch to handle all function types. Determines what kind of function obj is (e.g. lambda, defined at @@ -205,12 +167,14 @@ class CloudPickler(pickle.Pickler): """ write = self.write - name = obj.__name__ + if name is None: + name = obj.__name__ modname = pickle.whichmodule(obj, name) - #print 'which gives %s %s %s' % (modname, obj, name) + # print('which gives %s %s %s' % (modname, obj, name)) try: themodule = sys.modules[modname] - except KeyError: # eval'd items such as namedtuple give invalid items for their function __module__ + except KeyError: + # eval'd items such as namedtuple give invalid items for their function __module__ modname = '__main__' if modname == '__main__': @@ -221,37 +185,18 @@ class CloudPickler(pickle.Pickler): if getattr(themodule, name, None) is obj: return self.save_global(obj, name) - if not self.savedDjangoEnv: - #hack for django - if we detect the settings module, we transport it - django_settings = os.environ.get('DJANGO_SETTINGS_MODULE', '') - if django_settings: - django_mod = sys.modules.get(django_settings) - if django_mod: - cloudLog.debug('Transporting django settings %s during save of %s', django_mod, name) - self.savedDjangoEnv = True - self.modules.add(django_mod) - write(pickle.MARK) - self.save_reduce(django_settings_load, (django_mod.__name__,), obj=django_mod) - write(pickle.POP_MARK) - - # if func is lambda, def'ed at prompt, is in main, or is nested, then # we'll pickle the actual function object rather than simply saving a # reference (as is done in default pickler), via save_function_tuple. - if islambda(obj) or obj.func_code.co_filename == '' or themodule is None: - #Force server to import modules that have been imported in main - modList = None - if themodule is None and not self.savedForceImports: - mainmod = sys.modules['__main__'] - if useForcedImports and hasattr(mainmod,'___pyc_forcedImports__'): - modList = list(mainmod.___pyc_forcedImports__) - self.savedForceImports = True - self.save_function_tuple(obj, modList) + if islambda(obj) or obj.__code__.co_filename == '' or themodule is None: + #print("save global", islambda(obj), obj.__code__.co_filename, modname, themodule) + self.save_function_tuple(obj) return - else: # func is nested + else: + # func is nested klass = getattr(themodule, name, None) if klass is None or klass is not obj: - self.save_function_tuple(obj, [themodule]) + self.save_function_tuple(obj) return if obj.__dict__: @@ -266,7 +211,7 @@ class CloudPickler(pickle.Pickler): self.memoize(obj) dispatch[types.FunctionType] = save_function - def save_function_tuple(self, func, forced_imports): + def save_function_tuple(self, func): """ Pickles an actual func object. A func comprises: code, globals, defaults, closure, and dict. We @@ -281,19 +226,6 @@ class CloudPickler(pickle.Pickler): save = self.save write = self.write - # save the modules (if any) - if forced_imports: - write(pickle.MARK) - save(_modules_to_main) - #print 'forced imports are', forced_imports - - forced_names = map(lambda m: m.__name__, forced_imports) - save((forced_names,)) - - #save((forced_imports,)) - write(pickle.REDUCE) - write(pickle.POP_MARK) - code, f_globals, defaults, closure, dct, base_globals = self.extract_func_data(func) save(_fill_function) # skeleton function updater @@ -318,6 +250,8 @@ class CloudPickler(pickle.Pickler): Find all globals names read or written to by codeblock co """ code = co.co_code + if not PY3: + code = [ord(c) for c in code] names = co.co_names out_names = set() @@ -327,18 +261,18 @@ class CloudPickler(pickle.Pickler): while i < n: op = code[i] - i = i+1 + i += 1 if op >= HAVE_ARGUMENT: - oparg = ord(code[i]) + ord(code[i+1])*256 + extended_arg + oparg = code[i] + code[i+1] * 256 + extended_arg extended_arg = 0 - i = i+2 + i += 2 if op == EXTENDED_ARG: - extended_arg = oparg*65536L + extended_arg = oparg*65536 if op in GLOBAL_OPS: out_names.add(names[oparg]) - #print 'extracted', out_names, ' from ', names - if co.co_consts: # see if nested function have any global refs + # see if nested function have any global refs + if co.co_consts: for const in co.co_consts: if type(const) is types.CodeType: out_names |= CloudPickler.extract_code_globals(const) @@ -350,46 +284,28 @@ class CloudPickler(pickle.Pickler): Turn the function into a tuple of data necessary to recreate it: code, globals, defaults, closure, dict """ - code = func.func_code + code = func.__code__ # extract all global ref's - func_global_refs = CloudPickler.extract_code_globals(code) + func_global_refs = self.extract_code_globals(code) # process all variables referenced by global environment f_globals = {} for var in func_global_refs: - #Some names, such as class functions are not global - we don't need them - if func.func_globals.has_key(var): - f_globals[var] = func.func_globals[var] + if var in func.__globals__: + f_globals[var] = func.__globals__[var] # defaults requires no processing - defaults = func.func_defaults - - def get_contents(cell): - try: - return cell.cell_contents - except ValueError, e: #cell is empty error on not yet assigned - raise pickle.PicklingError('Function to be pickled has free variables that are referenced before assignment in enclosing scope') - + defaults = func.__defaults__ # process closure - if func.func_closure: - closure = map(get_contents, func.func_closure) - else: - closure = [] + closure = [c.cell_contents for c in func.__closure__] if func.__closure__ else [] # save the dict - dct = func.func_dict - - if printSerialization: - outvars = ['code: ' + str(code) ] - outvars.append('globals: ' + str(f_globals)) - outvars.append('defaults: ' + str(defaults)) - outvars.append('closure: ' + str(closure)) - print 'function ', func, 'is extracted to: ', ', '.join(outvars) + dct = func.__dict__ - base_globals = self.globals_ref.get(id(func.func_globals), {}) - self.globals_ref[id(func.func_globals)] = base_globals + base_globals = self.globals_ref.get(id(func.__globals__), {}) + self.globals_ref[id(func.__globals__)] = base_globals return (code, f_globals, defaults, closure, dct, base_globals) @@ -400,8 +316,9 @@ class CloudPickler(pickle.Pickler): dispatch[types.BuiltinFunctionType] = save_builtin_function def save_global(self, obj, name=None, pack=struct.pack): - write = self.write - memo = self.memo + if obj.__module__ == "__builtin__" or obj.__module__ == "builtins": + if obj in _BUILTIN_TYPE_NAMES: + return self.save_reduce(_builtin_type, (_BUILTIN_TYPE_NAMES[obj],), obj=obj) if name is None: name = obj.__name__ @@ -410,98 +327,57 @@ class CloudPickler(pickle.Pickler): if modname is None: modname = pickle.whichmodule(obj, name) - try: - __import__(modname) - themodule = sys.modules[modname] - except (ImportError, KeyError, AttributeError): #should never occur - raise pickle.PicklingError( - "Can't pickle %r: Module %s cannot be found" % - (obj, modname)) - if modname == '__main__': themodule = None - - if themodule: + else: + __import__(modname) + themodule = sys.modules[modname] self.modules.add(themodule) - sendRef = True - typ = type(obj) - #print 'saving', obj, typ - try: - try: #Deal with case when getattribute fails with exceptions - klass = getattr(themodule, name) - except (AttributeError): - if modname == '__builtin__': #new.* are misrepeported - modname = 'new' - __import__(modname) - themodule = sys.modules[modname] - try: - klass = getattr(themodule, name) - except AttributeError, a: - # print themodule, name, obj, type(obj) - raise pickle.PicklingError("Can't pickle builtin %s" % obj) - else: - raise + if hasattr(themodule, name) and getattr(themodule, name) is obj: + return Pickler.save_global(self, obj, name) - except (ImportError, KeyError, AttributeError): - if typ == types.TypeType or typ == types.ClassType: - sendRef = False - else: #we can't deal with this - raise - else: - if klass is not obj and (typ == types.TypeType or typ == types.ClassType): - sendRef = False - if not sendRef: - #note: Third party types might crash this - add better checks! - d = dict(obj.__dict__) #copy dict proxy to a dict - if not isinstance(d.get('__dict__', None), property): # don't extract dict that are properties - d.pop('__dict__',None) - d.pop('__weakref__',None) + typ = type(obj) + if typ is not obj and isinstance(obj, (type, types.ClassType)): + d = dict(obj.__dict__) # copy dict proxy to a dict + if not isinstance(d.get('__dict__', None), property): + # don't extract dict that are properties + d.pop('__dict__', None) + d.pop('__weakref__', None) # hack as __new__ is stored differently in the __dict__ new_override = d.get('__new__', None) if new_override: d['__new__'] = obj.__new__ - self.save_reduce(type(obj),(obj.__name__,obj.__bases__, - d),obj=obj) - #print 'internal reduce dask %s %s' % (obj, d) - return - - if self.proto >= 2: - code = _extension_registry.get((modname, name)) - if code: - assert code > 0 - if code <= 0xff: - write(pickle.EXT1 + chr(code)) - elif code <= 0xffff: - write("%c%c%c" % (pickle.EXT2, code&0xff, code>>8)) - else: - write(pickle.EXT4 + pack("> sys.stderr, 'Cloud not import django settings %s:' % (name) - print_exec(sys.stderr) - if modified_env: - del os.environ['DJANGO_SETTINGS_MODULE'] - else: - #add project directory to sys,path: - if hasattr(module,'__file__'): - dirname = os.path.split(module.__file__)[0] + '/' - sys.path.append(dirname) # restores function attributes def _restore_attr(obj, attr): @@ -851,13 +636,16 @@ def _restore_attr(obj, attr): setattr(obj, key, val) return obj + def _get_module_builtins(): return pickle.__builtins__ + def print_exec(stream): ei = sys.exc_info() traceback.print_exception(ei[0], ei[1], ei[2], None, stream) + def _modules_to_main(modList): """Force every module in modList to be placed into main""" if not modList: @@ -868,22 +656,16 @@ def _modules_to_main(modList): if type(modname) is str: try: mod = __import__(modname) - except Exception, i: #catch all... - sys.stderr.write('warning: could not import %s\n. Your function may unexpectedly error due to this import failing; \ -A version mismatch is likely. Specific error was:\n' % modname) + except Exception as e: + sys.stderr.write('warning: could not import %s\n. ' + 'Your function may unexpectedly error due to this import failing;' + 'A version mismatch is likely. Specific error was:\n' % modname) print_exec(sys.stderr) else: - setattr(main,mod.__name__, mod) - else: - #REVERSE COMPATIBILITY FOR CLOUD CLIENT 1.5 (WITH EPD) - #In old version actual module was sent - setattr(main,modname.__name__, modname) + setattr(main, mod.__name__, mod) -#object generators: -def _build_xrange(start, step, len): - """Built xrange explicitly""" - return xrange(start, start + step*len, step) +#object generators: def _genpartial(func, args, kwds): if not args: args = () @@ -891,22 +673,26 @@ def _genpartial(func, args, kwds): kwds = {} return partial(func, *args, **kwds) + def _fill_function(func, globals, defaults, dict): """ Fills in the rest of function data into the skeleton function object that were created via _make_skel_func(). """ - func.func_globals.update(globals) - func.func_defaults = defaults - func.func_dict = dict + func.__globals__.update(globals) + func.__defaults__ = defaults + func.__dict__ = dict return func + def _make_cell(value): - return (lambda: value).func_closure[0] + return (lambda: value).__closure__[0] + def _reconstruct_closure(values): return tuple([_make_cell(v) for v in values]) + def _make_skel_func(code, closures, base_globals = None): """ Creates a skeleton function object that contains just the provided code and the correct number of cells in func_closure. All other @@ -928,40 +714,3 @@ Note: These can never be renamed due to client compatibility issues""" def _getobject(modname, attribute): mod = __import__(modname, fromlist=[attribute]) return mod.__dict__[attribute] - -def _generateImage(size, mode, str_rep): - """Generate image from string representation""" - import Image - i = Image.new(mode, size) - i.fromstring(str_rep) - return i - -def _lazyloadImage(fp): - import Image - fp.seek(0) #works in almost any case - return Image.open(fp) - -"""Timeseries""" -def _genTimeSeries(reduce_args, state): - import scikits.timeseries.tseries as ts - from numpy import ndarray - from numpy.ma import MaskedArray - - - time_series = ts._tsreconstruct(*reduce_args) - - #from setstate modified - (ver, shp, typ, isf, raw, msk, flv, dsh, dtm, dtyp, frq, infodict) = state - #print 'regenerating %s' % dtyp - - MaskedArray.__setstate__(time_series, (ver, shp, typ, isf, raw, msk, flv)) - _dates = time_series._dates - #_dates.__setstate__((ver, dsh, typ, isf, dtm, frq)) #use remote typ - ndarray.__setstate__(_dates,(dsh,dtyp, isf, dtm)) - _dates.freq = frq - _dates._cachedinfo.update(dict(full=None, hasdups=None, steps=None, - toobj=None, toord=None, tostr=None)) - # Update the _optinfo dictionary - time_series._optinfo.update(infodict) - return time_series - diff --git a/python/pyspark/conf.py b/python/pyspark/conf.py index dc7cd0bce5..924da3eecf 100644 --- a/python/pyspark/conf.py +++ b/python/pyspark/conf.py @@ -44,7 +44,7 @@ u'/path' >>> conf.get("spark.executorEnv.VAR1") u'value1' ->>> print conf.toDebugString() +>>> print(conf.toDebugString()) spark.executorEnv.VAR1=value1 spark.executorEnv.VAR3=value3 spark.executorEnv.VAR4=value4 @@ -56,6 +56,13 @@ spark.home=/path __all__ = ['SparkConf'] +import sys +import re + +if sys.version > '3': + unicode = str + __doc__ = re.sub(r"(\W|^)[uU](['])", r'\1\2', __doc__) + class SparkConf(object): diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 78dccc4047..1dc2fec0ae 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -15,6 +15,8 @@ # limitations under the License. # +from __future__ import print_function + import os import shutil import sys @@ -32,11 +34,14 @@ from pyspark.java_gateway import launch_gateway from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \ PairDeserializer, AutoBatchedSerializer, NoOpSerializer from pyspark.storagelevel import StorageLevel -from pyspark.rdd import RDD, _load_from_socket +from pyspark.rdd import RDD, _load_from_socket, ignore_unicode_prefix from pyspark.traceback_utils import CallSite, first_spark_call from pyspark.status import StatusTracker from pyspark.profiler import ProfilerCollector, BasicProfiler +if sys.version > '3': + xrange = range + __all__ = ['SparkContext'] @@ -133,7 +138,7 @@ class SparkContext(object): if sparkHome: self._conf.setSparkHome(sparkHome) if environment: - for key, value in environment.iteritems(): + for key, value in environment.items(): self._conf.setExecutorEnv(key, value) for key, value in DEFAULT_CONFIGS.items(): self._conf.setIfMissing(key, value) @@ -153,6 +158,10 @@ class SparkContext(object): if k.startswith("spark.executorEnv."): varName = k[len("spark.executorEnv."):] self.environment[varName] = v + if sys.version >= '3.3' and 'PYTHONHASHSEED' not in os.environ: + # disable randomness of hash of string in worker, if this is not + # launched by spark-submit + self.environment["PYTHONHASHSEED"] = "0" # Create the Java SparkContext through Py4J self._jsc = jsc or self._initialize_context(self._conf._jconf) @@ -323,7 +332,7 @@ class SparkContext(object): start0 = c[0] def getStart(split): - return start0 + (split * size / numSlices) * step + return start0 + int((split * size / numSlices)) * step def f(split, iterator): return xrange(getStart(split), getStart(split + 1), step) @@ -357,6 +366,7 @@ class SparkContext(object): minPartitions = minPartitions or self.defaultMinPartitions return RDD(self._jsc.objectFile(name, minPartitions), self) + @ignore_unicode_prefix def textFile(self, name, minPartitions=None, use_unicode=True): """ Read a text file from HDFS, a local file system (available on all @@ -369,7 +379,7 @@ class SparkContext(object): >>> path = os.path.join(tempdir, "sample-text.txt") >>> with open(path, "w") as testFile: - ... testFile.write("Hello world!") + ... _ = testFile.write("Hello world!") >>> textFile = sc.textFile(path) >>> textFile.collect() [u'Hello world!'] @@ -378,6 +388,7 @@ class SparkContext(object): return RDD(self._jsc.textFile(name, minPartitions), self, UTF8Deserializer(use_unicode)) + @ignore_unicode_prefix def wholeTextFiles(self, path, minPartitions=None, use_unicode=True): """ Read a directory of text files from HDFS, a local file system @@ -411,9 +422,9 @@ class SparkContext(object): >>> dirPath = os.path.join(tempdir, "files") >>> os.mkdir(dirPath) >>> with open(os.path.join(dirPath, "1.txt"), "w") as file1: - ... file1.write("1") + ... _ = file1.write("1") >>> with open(os.path.join(dirPath, "2.txt"), "w") as file2: - ... file2.write("2") + ... _ = file2.write("2") >>> textFiles = sc.wholeTextFiles(dirPath) >>> sorted(textFiles.collect()) [(u'.../1.txt', u'1'), (u'.../2.txt', u'2')] @@ -456,7 +467,7 @@ class SparkContext(object): jm = self._jvm.java.util.HashMap() if not d: d = {} - for k, v in d.iteritems(): + for k, v in d.items(): jm[k] = v return jm @@ -608,6 +619,7 @@ class SparkContext(object): jrdd = self._jsc.checkpointFile(name) return RDD(jrdd, self, input_deserializer) + @ignore_unicode_prefix def union(self, rdds): """ Build the union of a list of RDDs. @@ -618,7 +630,7 @@ class SparkContext(object): >>> path = os.path.join(tempdir, "union-text.txt") >>> with open(path, "w") as testFile: - ... testFile.write("Hello") + ... _ = testFile.write("Hello") >>> textFile = sc.textFile(path) >>> textFile.collect() [u'Hello'] @@ -677,7 +689,7 @@ class SparkContext(object): >>> from pyspark import SparkFiles >>> path = os.path.join(tempdir, "test.txt") >>> with open(path, "w") as testFile: - ... testFile.write("100") + ... _ = testFile.write("100") >>> sc.addFile(path) >>> def func(iterator): ... with open(SparkFiles.get("test.txt")) as testFile: @@ -705,11 +717,13 @@ class SparkContext(object): """ self.addFile(path) (dirname, filename) = os.path.split(path) # dirname may be directory or HDFS/S3 prefix - if filename[-4:].lower() in self.PACKAGE_EXTENSIONS: self._python_includes.append(filename) # for tests in local mode sys.path.insert(1, os.path.join(SparkFiles.getRootDirectory(), filename)) + if sys.version > '3': + import importlib + importlib.invalidate_caches() def setCheckpointDir(self, dirName): """ @@ -744,7 +758,7 @@ class SparkContext(object): The application can use L{SparkContext.cancelJobGroup} to cancel all running jobs in this group. - >>> import thread, threading + >>> import threading >>> from time import sleep >>> result = "Not Set" >>> lock = threading.Lock() @@ -763,10 +777,10 @@ class SparkContext(object): ... sleep(5) ... sc.cancelJobGroup("job_to_cancel") >>> supress = lock.acquire() - >>> supress = thread.start_new_thread(start_job, (10,)) - >>> supress = thread.start_new_thread(stop_job, tuple()) + >>> supress = threading.Thread(target=start_job, args=(10,)).start() + >>> supress = threading.Thread(target=stop_job).start() >>> supress = lock.acquire() - >>> print result + >>> print(result) Cancelled If interruptOnCancel is set to true for the job group, then job cancellation will result diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py index 93885985fe..7f06d4288c 100644 --- a/python/pyspark/daemon.py +++ b/python/pyspark/daemon.py @@ -24,9 +24,10 @@ import sys import traceback import time import gc -from errno import EINTR, ECHILD, EAGAIN +from errno import EINTR, EAGAIN from socket import AF_INET, SOCK_STREAM, SOMAXCONN from signal import SIGHUP, SIGTERM, SIGCHLD, SIG_DFL, SIG_IGN, SIGINT + from pyspark.worker import main as worker_main from pyspark.serializers import read_int, write_int @@ -53,8 +54,8 @@ def worker(sock): # Read the socket using fdopen instead of socket.makefile() because the latter # seems to be very slow; note that we need to dup() the file descriptor because # otherwise writes also cause a seek that makes us miss data on the read side. - infile = os.fdopen(os.dup(sock.fileno()), "a+", 65536) - outfile = os.fdopen(os.dup(sock.fileno()), "a+", 65536) + infile = os.fdopen(os.dup(sock.fileno()), "rb", 65536) + outfile = os.fdopen(os.dup(sock.fileno()), "wb", 65536) exit_code = 0 try: worker_main(infile, outfile) @@ -68,17 +69,6 @@ def worker(sock): return exit_code -# Cleanup zombie children -def cleanup_dead_children(): - try: - while True: - pid, _ = os.waitpid(0, os.WNOHANG) - if not pid: - break - except: - pass - - def manager(): # Create a new process group to corral our children os.setpgid(0, 0) @@ -88,8 +78,12 @@ def manager(): listen_sock.bind(('127.0.0.1', 0)) listen_sock.listen(max(1024, SOMAXCONN)) listen_host, listen_port = listen_sock.getsockname() - write_int(listen_port, sys.stdout) - sys.stdout.flush() + + # re-open stdin/stdout in 'wb' mode + stdin_bin = os.fdopen(sys.stdin.fileno(), 'rb', 4) + stdout_bin = os.fdopen(sys.stdout.fileno(), 'wb', 4) + write_int(listen_port, stdout_bin) + stdout_bin.flush() def shutdown(code): signal.signal(SIGTERM, SIG_DFL) @@ -101,6 +95,7 @@ def manager(): shutdown(1) signal.signal(SIGTERM, handle_sigterm) # Gracefully exit on SIGTERM signal.signal(SIGHUP, SIG_IGN) # Don't die on SIGHUP + signal.signal(SIGCHLD, SIG_IGN) reuse = os.environ.get("SPARK_REUSE_WORKER") @@ -115,12 +110,9 @@ def manager(): else: raise - # cleanup in signal handler will cause deadlock - cleanup_dead_children() - if 0 in ready_fds: try: - worker_pid = read_int(sys.stdin) + worker_pid = read_int(stdin_bin) except EOFError: # Spark told us to exit by closing stdin shutdown(0) @@ -145,7 +137,7 @@ def manager(): time.sleep(1) pid = os.fork() # error here will shutdown daemon else: - outfile = sock.makefile('w') + outfile = sock.makefile(mode='wb') write_int(e.errno, outfile) # Signal that the fork failed outfile.flush() outfile.close() @@ -157,7 +149,7 @@ def manager(): listen_sock.close() try: # Acknowledge that the fork was successful - outfile = sock.makefile("w") + outfile = sock.makefile(mode="wb") write_int(os.getpid(), outfile) outfile.flush() outfile.close() diff --git a/python/pyspark/heapq3.py b/python/pyspark/heapq3.py index bc441f138f..4ef2afe035 100644 --- a/python/pyspark/heapq3.py +++ b/python/pyspark/heapq3.py @@ -627,51 +627,49 @@ def merge(iterables, key=None, reverse=False): if key is None: for order, it in enumerate(map(iter, iterables)): try: - next = it.next - h_append([next(), order * direction, next]) + h_append([next(it), order * direction, it]) except StopIteration: pass _heapify(h) while len(h) > 1: try: while True: - value, order, next = s = h[0] + value, order, it = s = h[0] yield value - s[0] = next() # raises StopIteration when exhausted + s[0] = next(it) # raises StopIteration when exhausted _heapreplace(h, s) # restore heap condition except StopIteration: _heappop(h) # remove empty iterator if h: # fast case when only a single iterator remains - value, order, next = h[0] + value, order, it = h[0] yield value - for value in next.__self__: + for value in it: yield value return for order, it in enumerate(map(iter, iterables)): try: - next = it.next - value = next() - h_append([key(value), order * direction, value, next]) + value = next(it) + h_append([key(value), order * direction, value, it]) except StopIteration: pass _heapify(h) while len(h) > 1: try: while True: - key_value, order, value, next = s = h[0] + key_value, order, value, it = s = h[0] yield value - value = next() + value = next(it) s[0] = key(value) s[2] = value _heapreplace(h, s) except StopIteration: _heappop(h) if h: - key_value, order, value, next = h[0] + key_value, order, value, it = h[0] yield value - for value in next.__self__: + for value in it: yield value diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index 2a5e84a7df..45bc38f7e6 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -69,7 +69,7 @@ def launch_gateway(): if callback_socket in readable: gateway_connection = callback_socket.accept()[0] # Determine which ephemeral port the server started on: - gateway_port = read_int(gateway_connection.makefile()) + gateway_port = read_int(gateway_connection.makefile(mode="rb")) gateway_connection.close() callback_socket.close() if gateway_port is None: diff --git a/python/pyspark/join.py b/python/pyspark/join.py index c3491defb2..94df399016 100644 --- a/python/pyspark/join.py +++ b/python/pyspark/join.py @@ -32,6 +32,7 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. """ from pyspark.resultiterable import ResultIterable +from functools import reduce def _do_python_join(rdd, other, numPartitions, dispatch): diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py index d7bc09fd77..45754bc9d4 100644 --- a/python/pyspark/ml/classification.py +++ b/python/pyspark/ml/classification.py @@ -39,10 +39,10 @@ class LogisticRegression(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPredicti >>> lr = LogisticRegression(maxIter=5, regParam=0.01) >>> model = lr.fit(df) >>> test0 = sc.parallelize([Row(features=Vectors.dense(-1.0))]).toDF() - >>> print model.transform(test0).head().prediction + >>> model.transform(test0).head().prediction 0.0 >>> test1 = sc.parallelize([Row(features=Vectors.sparse(1, [0], [1.0]))]).toDF() - >>> print model.transform(test1).head().prediction + >>> model.transform(test1).head().prediction 1.0 >>> lr.setParams("vector") Traceback (most recent call last): diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py index 263fe2a5bc..4e4614b859 100644 --- a/python/pyspark/ml/feature.py +++ b/python/pyspark/ml/feature.py @@ -15,6 +15,7 @@ # limitations under the License. # +from pyspark.rdd import ignore_unicode_prefix from pyspark.ml.param.shared import HasInputCol, HasOutputCol, HasNumFeatures from pyspark.ml.util import keyword_only from pyspark.ml.wrapper import JavaTransformer @@ -24,6 +25,7 @@ __all__ = ['Tokenizer', 'HashingTF'] @inherit_doc +@ignore_unicode_prefix class Tokenizer(JavaTransformer, HasInputCol, HasOutputCol): """ A tokenizer that converts the input string to lowercase and then @@ -32,15 +34,15 @@ class Tokenizer(JavaTransformer, HasInputCol, HasOutputCol): >>> from pyspark.sql import Row >>> df = sc.parallelize([Row(text="a b c")]).toDF() >>> tokenizer = Tokenizer(inputCol="text", outputCol="words") - >>> print tokenizer.transform(df).head() + >>> tokenizer.transform(df).head() Row(text=u'a b c', words=[u'a', u'b', u'c']) >>> # Change a parameter. - >>> print tokenizer.setParams(outputCol="tokens").transform(df).head() + >>> tokenizer.setParams(outputCol="tokens").transform(df).head() Row(text=u'a b c', tokens=[u'a', u'b', u'c']) >>> # Temporarily modify a parameter. - >>> print tokenizer.transform(df, {tokenizer.outputCol: "words"}).head() + >>> tokenizer.transform(df, {tokenizer.outputCol: "words"}).head() Row(text=u'a b c', words=[u'a', u'b', u'c']) - >>> print tokenizer.transform(df).head() + >>> tokenizer.transform(df).head() Row(text=u'a b c', tokens=[u'a', u'b', u'c']) >>> # Must use keyword arguments to specify params. >>> tokenizer.setParams("text") @@ -79,13 +81,13 @@ class HashingTF(JavaTransformer, HasInputCol, HasOutputCol, HasNumFeatures): >>> from pyspark.sql import Row >>> df = sc.parallelize([Row(words=["a", "b", "c"])]).toDF() >>> hashingTF = HashingTF(numFeatures=10, inputCol="words", outputCol="features") - >>> print hashingTF.transform(df).head().features - (10,[7,8,9],[1.0,1.0,1.0]) - >>> print hashingTF.setParams(outputCol="freqs").transform(df).head().freqs - (10,[7,8,9],[1.0,1.0,1.0]) + >>> hashingTF.transform(df).head().features + SparseVector(10, {7: 1.0, 8: 1.0, 9: 1.0}) + >>> hashingTF.setParams(outputCol="freqs").transform(df).head().freqs + SparseVector(10, {7: 1.0, 8: 1.0, 9: 1.0}) >>> params = {hashingTF.numFeatures: 5, hashingTF.outputCol: "vector"} - >>> print hashingTF.transform(df, params).head().vector - (5,[2,3,4],[1.0,1.0,1.0]) + >>> hashingTF.transform(df, params).head().vector + SparseVector(5, {2: 1.0, 3: 1.0, 4: 1.0}) """ _java_class = "org.apache.spark.ml.feature.HashingTF" diff --git a/python/pyspark/ml/param/__init__.py b/python/pyspark/ml/param/__init__.py index 5c62620562..9fccb65675 100644 --- a/python/pyspark/ml/param/__init__.py +++ b/python/pyspark/ml/param/__init__.py @@ -63,8 +63,8 @@ class Params(Identifiable): uses :py:func:`dir` to get all attributes of type :py:class:`Param`. """ - return filter(lambda attr: isinstance(attr, Param), - [getattr(self, x) for x in dir(self) if x != "params"]) + return list(filter(lambda attr: isinstance(attr, Param), + [getattr(self, x) for x in dir(self) if x != "params"])) def _explain(self, param): """ @@ -185,7 +185,7 @@ class Params(Identifiable): """ Sets user-supplied params. """ - for param, value in kwargs.iteritems(): + for param, value in kwargs.items(): self.paramMap[getattr(self, param)] = value return self @@ -193,6 +193,6 @@ class Params(Identifiable): """ Sets default params. """ - for param, value in kwargs.iteritems(): + for param, value in kwargs.items(): self.defaultParamMap[getattr(self, param)] = value return self diff --git a/python/pyspark/ml/param/_shared_params_code_gen.py b/python/pyspark/ml/param/_shared_params_code_gen.py index 55f4224976..6a3192465d 100644 --- a/python/pyspark/ml/param/_shared_params_code_gen.py +++ b/python/pyspark/ml/param/_shared_params_code_gen.py @@ -15,6 +15,8 @@ # limitations under the License. # +from __future__ import print_function + header = """# # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with @@ -82,9 +84,9 @@ def _gen_param_code(name, doc, defaultValueStr): .replace("$defaultValueStr", str(defaultValueStr)) if __name__ == "__main__": - print header - print "\n# DO NOT MODIFY THIS FILE! It was generated by _shared_params_code_gen.py.\n" - print "from pyspark.ml.param import Param, Params\n\n" + print(header) + print("\n# DO NOT MODIFY THIS FILE! It was generated by _shared_params_code_gen.py.\n") + print("from pyspark.ml.param import Param, Params\n\n") shared = [ ("maxIter", "max number of iterations", None), ("regParam", "regularization constant", None), @@ -97,4 +99,4 @@ if __name__ == "__main__": code = [] for name, doc, defaultValueStr in shared: code.append(_gen_param_code(name, doc, defaultValueStr)) - print "\n\n\n".join(code) + print("\n\n\n".join(code)) diff --git a/python/pyspark/mllib/__init__.py b/python/pyspark/mllib/__init__.py index f2ef573fe9..07507b2ad0 100644 --- a/python/pyspark/mllib/__init__.py +++ b/python/pyspark/mllib/__init__.py @@ -18,6 +18,7 @@ """ Python bindings for MLlib. """ +from __future__ import absolute_import # MLlib currently needs NumPy 1.4+, so complain if lower @@ -29,7 +30,9 @@ __all__ = ['classification', 'clustering', 'feature', 'fpm', 'linalg', 'random', 'recommendation', 'regression', 'stat', 'tree', 'util'] import sys -import rand as random -random.__name__ = 'random' -random.RandomRDDs.__module__ = __name__ + '.random' -sys.modules[__name__ + '.random'] = random +from . import rand as random +modname = __name__ + '.random' +random.__name__ = modname +random.RandomRDDs.__module__ = modname +sys.modules[modname] = random +del modname, sys diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py index 2466e8ac43..eda0b60f8b 100644 --- a/python/pyspark/mllib/classification.py +++ b/python/pyspark/mllib/classification.py @@ -510,9 +510,10 @@ class NaiveBayesModel(Saveable, Loader): def load(cls, sc, path): java_model = sc._jvm.org.apache.spark.mllib.classification.NaiveBayesModel.load( sc._jsc.sc(), path) - py_labels = _java2py(sc, java_model.labels()) - py_pi = _java2py(sc, java_model.pi()) - py_theta = _java2py(sc, java_model.theta()) + # Can not unpickle array.array from Pyrolite in Python3 with "bytes" + py_labels = _java2py(sc, java_model.labels(), "latin1") + py_pi = _java2py(sc, java_model.pi(), "latin1") + py_theta = _java2py(sc, java_model.theta(), "latin1") return NaiveBayesModel(py_labels, py_pi, numpy.array(py_theta)) diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py index 464f49aeee..abbb7cf60e 100644 --- a/python/pyspark/mllib/clustering.py +++ b/python/pyspark/mllib/clustering.py @@ -15,6 +15,12 @@ # limitations under the License. # +import sys +import array as pyarray + +if sys.version > '3': + xrange = range + from numpy import array from pyspark import RDD @@ -55,8 +61,8 @@ class KMeansModel(Saveable, Loader): True >>> model.predict(sparse_data[2]) == model.predict(sparse_data[3]) True - >>> type(model.clusterCenters) - + >>> isinstance(model.clusterCenters, list) + True >>> import os, tempfile >>> path = tempfile.mkdtemp() >>> model.save(sc, path) @@ -90,7 +96,7 @@ class KMeansModel(Saveable, Loader): return best def save(self, sc, path): - java_centers = _py2java(sc, map(_convert_to_vector, self.centers)) + java_centers = _py2java(sc, [_convert_to_vector(c) for c in self.centers]) java_model = sc._jvm.org.apache.spark.mllib.clustering.KMeansModel(java_centers) java_model.save(sc._jsc.sc(), path) @@ -133,7 +139,7 @@ class GaussianMixtureModel(object): ... 5.7048, 4.6567, 5.5026, ... 4.5605, 5.2043, 6.2734]).reshape(5, 3)) >>> model = GaussianMixture.train(clusterdata_2, 2, convergenceTol=0.0001, - ... maxIterations=150, seed=10) + ... maxIterations=150, seed=10) >>> labels = model.predict(clusterdata_2).collect() >>> labels[0]==labels[1]==labels[2] True @@ -168,8 +174,8 @@ class GaussianMixtureModel(object): if isinstance(x, RDD): means, sigmas = zip(*[(g.mu, g.sigma) for g in self.gaussians]) membership_matrix = callMLlibFunc("predictSoftGMM", x.map(_convert_to_vector), - self.weights, means, sigmas) - return membership_matrix + _convert_to_vector(self.weights), means, sigmas) + return membership_matrix.map(lambda x: pyarray.array('d', x)) class GaussianMixture(object): diff --git a/python/pyspark/mllib/common.py b/python/pyspark/mllib/common.py index a539d2f284..ba60589788 100644 --- a/python/pyspark/mllib/common.py +++ b/python/pyspark/mllib/common.py @@ -15,6 +15,11 @@ # limitations under the License. # +import sys +if sys.version >= '3': + long = int + unicode = str + import py4j.protocol from py4j.protocol import Py4JJavaError from py4j.java_gateway import JavaObject @@ -36,7 +41,7 @@ _float_str_mapping = { def _new_smart_decode(obj): if isinstance(obj, float): - s = unicode(obj) + s = str(obj) return _float_str_mapping.get(s, s) return _old_smart_decode(obj) @@ -74,15 +79,15 @@ def _py2java(sc, obj): obj = ListConverter().convert([_py2java(sc, x) for x in obj], sc._gateway._gateway_client) elif isinstance(obj, JavaObject): pass - elif isinstance(obj, (int, long, float, bool, basestring)): + elif isinstance(obj, (int, long, float, bool, bytes, unicode)): pass else: - bytes = bytearray(PickleSerializer().dumps(obj)) - obj = sc._jvm.SerDe.loads(bytes) + data = bytearray(PickleSerializer().dumps(obj)) + obj = sc._jvm.SerDe.loads(data) return obj -def _java2py(sc, r): +def _java2py(sc, r, encoding="bytes"): if isinstance(r, JavaObject): clsName = r.getClass().getSimpleName() # convert RDD into JavaRDD @@ -102,8 +107,8 @@ def _java2py(sc, r): except Py4JJavaError: pass # not pickable - if isinstance(r, bytearray): - r = PickleSerializer().loads(str(r)) + if isinstance(r, (bytearray, bytes)): + r = PickleSerializer().loads(bytes(r), encoding=encoding) return r diff --git a/python/pyspark/mllib/feature.py b/python/pyspark/mllib/feature.py index 8be819acee..1140539a24 100644 --- a/python/pyspark/mllib/feature.py +++ b/python/pyspark/mllib/feature.py @@ -23,12 +23,17 @@ from __future__ import absolute_import import sys import warnings import random +import binascii +if sys.version >= '3': + basestring = str + unicode = str from py4j.protocol import Py4JJavaError -from pyspark import RDD, SparkContext +from pyspark import SparkContext +from pyspark.rdd import RDD, ignore_unicode_prefix from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper -from pyspark.mllib.linalg import Vectors, Vector, _convert_to_vector +from pyspark.mllib.linalg import Vectors, _convert_to_vector __all__ = ['Normalizer', 'StandardScalerModel', 'StandardScaler', 'HashingTF', 'IDFModel', 'IDF', 'Word2Vec', 'Word2VecModel'] @@ -206,7 +211,7 @@ class HashingTF(object): >>> htf = HashingTF(100) >>> doc = "a a b b c d".split(" ") >>> htf.transform(doc) - SparseVector(100, {1: 1.0, 14: 1.0, 31: 2.0, 44: 2.0}) + SparseVector(100, {...}) """ def __init__(self, numFeatures=1 << 20): """ @@ -360,6 +365,7 @@ class Word2VecModel(JavaVectorTransformer): return self.call("getVectors") +@ignore_unicode_prefix class Word2Vec(object): """ Word2Vec creates vector representation of words in a text corpus. @@ -382,7 +388,7 @@ class Word2Vec(object): >>> sentence = "a b " * 100 + "a c " * 10 >>> localDoc = [sentence, sentence] >>> doc = sc.parallelize(localDoc).map(lambda line: line.split(" ")) - >>> model = Word2Vec().setVectorSize(10).setSeed(42L).fit(doc) + >>> model = Word2Vec().setVectorSize(10).setSeed(42).fit(doc) >>> syms = model.findSynonyms("a", 2) >>> [s[0] for s in syms] @@ -400,7 +406,7 @@ class Word2Vec(object): self.learningRate = 0.025 self.numPartitions = 1 self.numIterations = 1 - self.seed = random.randint(0, sys.maxint) + self.seed = random.randint(0, sys.maxsize) self.minCount = 5 def setVectorSize(self, vectorSize): @@ -459,7 +465,7 @@ class Word2Vec(object): raise TypeError("data should be an RDD of list of string") jmodel = callMLlibFunc("trainWord2Vec", data, int(self.vectorSize), float(self.learningRate), int(self.numPartitions), - int(self.numIterations), long(self.seed), + int(self.numIterations), int(self.seed), int(self.minCount)) return Word2VecModel(jmodel) diff --git a/python/pyspark/mllib/fpm.py b/python/pyspark/mllib/fpm.py index 3aa6d79d70..628ccc01cf 100644 --- a/python/pyspark/mllib/fpm.py +++ b/python/pyspark/mllib/fpm.py @@ -16,12 +16,14 @@ # from pyspark import SparkContext +from pyspark.rdd import ignore_unicode_prefix from pyspark.mllib.common import JavaModelWrapper, callMLlibFunc, inherit_doc __all__ = ['FPGrowth', 'FPGrowthModel'] @inherit_doc +@ignore_unicode_prefix class FPGrowthModel(JavaModelWrapper): """ diff --git a/python/pyspark/mllib/linalg.py b/python/pyspark/mllib/linalg.py index a80320c52d..38b3aa3ad4 100644 --- a/python/pyspark/mllib/linalg.py +++ b/python/pyspark/mllib/linalg.py @@ -25,7 +25,13 @@ SciPy is available in their environment. import sys import array -import copy_reg + +if sys.version >= '3': + basestring = str + xrange = range + import copyreg as copy_reg +else: + import copy_reg import numpy as np @@ -57,7 +63,7 @@ except: def _convert_to_vector(l): if isinstance(l, Vector): return l - elif type(l) in (array.array, np.array, np.ndarray, list, tuple): + elif type(l) in (array.array, np.array, np.ndarray, list, tuple, xrange): return DenseVector(l) elif _have_scipy and scipy.sparse.issparse(l): assert l.shape[1] == 1, "Expected column vector" @@ -88,7 +94,7 @@ def _vector_size(v): """ if isinstance(v, Vector): return len(v) - elif type(v) in (array.array, list, tuple): + elif type(v) in (array.array, list, tuple, xrange): return len(v) elif type(v) == np.ndarray: if v.ndim == 1 or (v.ndim == 2 and v.shape[1] == 1): @@ -193,7 +199,7 @@ class DenseVector(Vector): DenseVector([1.0, 0.0]) """ def __init__(self, ar): - if isinstance(ar, basestring): + if isinstance(ar, bytes): ar = np.frombuffer(ar, dtype=np.float64) elif not isinstance(ar, np.ndarray): ar = np.array(ar, dtype=np.float64) @@ -321,11 +327,13 @@ class DenseVector(Vector): __sub__ = _delegate("__sub__") __mul__ = _delegate("__mul__") __div__ = _delegate("__div__") + __truediv__ = _delegate("__truediv__") __mod__ = _delegate("__mod__") __radd__ = _delegate("__radd__") __rsub__ = _delegate("__rsub__") __rmul__ = _delegate("__rmul__") __rdiv__ = _delegate("__rdiv__") + __rtruediv__ = _delegate("__rtruediv__") __rmod__ = _delegate("__rmod__") @@ -344,12 +352,12 @@ class SparseVector(Vector): :param args: Non-zero entries, as a dictionary, list of tupes, or two sorted lists containing indices and values. - >>> print SparseVector(4, {1: 1.0, 3: 5.5}) - (4,[1,3],[1.0,5.5]) - >>> print SparseVector(4, [(1, 1.0), (3, 5.5)]) - (4,[1,3],[1.0,5.5]) - >>> print SparseVector(4, [1, 3], [1.0, 5.5]) - (4,[1,3],[1.0,5.5]) + >>> SparseVector(4, {1: 1.0, 3: 5.5}) + SparseVector(4, {1: 1.0, 3: 5.5}) + >>> SparseVector(4, [(1, 1.0), (3, 5.5)]) + SparseVector(4, {1: 1.0, 3: 5.5}) + >>> SparseVector(4, [1, 3], [1.0, 5.5]) + SparseVector(4, {1: 1.0, 3: 5.5}) """ self.size = int(size) assert 1 <= len(args) <= 2, "must pass either 2 or 3 arguments" @@ -361,8 +369,8 @@ class SparseVector(Vector): self.indices = np.array([p[0] for p in pairs], dtype=np.int32) self.values = np.array([p[1] for p in pairs], dtype=np.float64) else: - if isinstance(args[0], basestring): - assert isinstance(args[1], str), "values should be string too" + if isinstance(args[0], bytes): + assert isinstance(args[1], bytes), "values should be string too" if args[0]: self.indices = np.frombuffer(args[0], np.int32) self.values = np.frombuffer(args[1], np.float64) @@ -591,12 +599,12 @@ class Vectors(object): :param args: Non-zero entries, as a dictionary, list of tupes, or two sorted lists containing indices and values. - >>> print Vectors.sparse(4, {1: 1.0, 3: 5.5}) - (4,[1,3],[1.0,5.5]) - >>> print Vectors.sparse(4, [(1, 1.0), (3, 5.5)]) - (4,[1,3],[1.0,5.5]) - >>> print Vectors.sparse(4, [1, 3], [1.0, 5.5]) - (4,[1,3],[1.0,5.5]) + >>> Vectors.sparse(4, {1: 1.0, 3: 5.5}) + SparseVector(4, {1: 1.0, 3: 5.5}) + >>> Vectors.sparse(4, [(1, 1.0), (3, 5.5)]) + SparseVector(4, {1: 1.0, 3: 5.5}) + >>> Vectors.sparse(4, [1, 3], [1.0, 5.5]) + SparseVector(4, {1: 1.0, 3: 5.5}) """ return SparseVector(size, *args) @@ -645,7 +653,7 @@ class Matrix(object): """ Convert Matrix attributes which are array-like or buffer to array. """ - if isinstance(array_like, basestring): + if isinstance(array_like, bytes): return np.frombuffer(array_like, dtype=dtype) return np.asarray(array_like, dtype=dtype) @@ -677,7 +685,7 @@ class DenseMatrix(Matrix): def toSparse(self): """Convert to SparseMatrix""" indices = np.nonzero(self.values)[0] - colCounts = np.bincount(indices / self.numRows) + colCounts = np.bincount(indices // self.numRows) colPtrs = np.cumsum(np.hstack( (0, colCounts, np.zeros(self.numCols - colCounts.size)))) values = self.values[indices] diff --git a/python/pyspark/mllib/rand.py b/python/pyspark/mllib/rand.py index 20ee9d78bf..06fbc0eb6a 100644 --- a/python/pyspark/mllib/rand.py +++ b/python/pyspark/mllib/rand.py @@ -88,10 +88,10 @@ class RandomRDDs(object): :param seed: Random seed (default: a random long integer). :return: RDD of float comprised of i.i.d. samples ~ N(0.0, 1.0). - >>> x = RandomRDDs.normalRDD(sc, 1000, seed=1L) + >>> x = RandomRDDs.normalRDD(sc, 1000, seed=1) >>> stats = x.stats() >>> stats.count() - 1000L + 1000 >>> abs(stats.mean() - 0.0) < 0.1 True >>> abs(stats.stdev() - 1.0) < 0.1 @@ -118,10 +118,10 @@ class RandomRDDs(object): >>> std = 1.0 >>> expMean = exp(mean + 0.5 * std * std) >>> expStd = sqrt((exp(std * std) - 1.0) * exp(2.0 * mean + std * std)) - >>> x = RandomRDDs.logNormalRDD(sc, mean, std, 1000, seed=2L) + >>> x = RandomRDDs.logNormalRDD(sc, mean, std, 1000, seed=2) >>> stats = x.stats() >>> stats.count() - 1000L + 1000 >>> abs(stats.mean() - expMean) < 0.5 True >>> from math import sqrt @@ -145,10 +145,10 @@ class RandomRDDs(object): :return: RDD of float comprised of i.i.d. samples ~ Pois(mean). >>> mean = 100.0 - >>> x = RandomRDDs.poissonRDD(sc, mean, 1000, seed=2L) + >>> x = RandomRDDs.poissonRDD(sc, mean, 1000, seed=2) >>> stats = x.stats() >>> stats.count() - 1000L + 1000 >>> abs(stats.mean() - mean) < 0.5 True >>> from math import sqrt @@ -171,10 +171,10 @@ class RandomRDDs(object): :return: RDD of float comprised of i.i.d. samples ~ Exp(mean). >>> mean = 2.0 - >>> x = RandomRDDs.exponentialRDD(sc, mean, 1000, seed=2L) + >>> x = RandomRDDs.exponentialRDD(sc, mean, 1000, seed=2) >>> stats = x.stats() >>> stats.count() - 1000L + 1000 >>> abs(stats.mean() - mean) < 0.5 True >>> from math import sqrt @@ -202,10 +202,10 @@ class RandomRDDs(object): >>> scale = 2.0 >>> expMean = shape * scale >>> expStd = sqrt(shape * scale * scale) - >>> x = RandomRDDs.gammaRDD(sc, shape, scale, 1000, seed=2L) + >>> x = RandomRDDs.gammaRDD(sc, shape, scale, 1000, seed=2) >>> stats = x.stats() >>> stats.count() - 1000L + 1000 >>> abs(stats.mean() - expMean) < 0.5 True >>> abs(stats.stdev() - expStd) < 0.5 @@ -254,7 +254,7 @@ class RandomRDDs(object): :return: RDD of Vector with vectors containing i.i.d. samples ~ `N(0.0, 1.0)`. >>> import numpy as np - >>> mat = np.matrix(RandomRDDs.normalVectorRDD(sc, 100, 100, seed=1L).collect()) + >>> mat = np.matrix(RandomRDDs.normalVectorRDD(sc, 100, 100, seed=1).collect()) >>> mat.shape (100, 100) >>> abs(mat.mean() - 0.0) < 0.1 @@ -286,8 +286,8 @@ class RandomRDDs(object): >>> std = 1.0 >>> expMean = exp(mean + 0.5 * std * std) >>> expStd = sqrt((exp(std * std) - 1.0) * exp(2.0 * mean + std * std)) - >>> mat = np.matrix(RandomRDDs.logNormalVectorRDD(sc, mean, std, \ - 100, 100, seed=1L).collect()) + >>> m = RandomRDDs.logNormalVectorRDD(sc, mean, std, 100, 100, seed=1).collect() + >>> mat = np.matrix(m) >>> mat.shape (100, 100) >>> abs(mat.mean() - expMean) < 0.1 @@ -315,7 +315,7 @@ class RandomRDDs(object): >>> import numpy as np >>> mean = 100.0 - >>> rdd = RandomRDDs.poissonVectorRDD(sc, mean, 100, 100, seed=1L) + >>> rdd = RandomRDDs.poissonVectorRDD(sc, mean, 100, 100, seed=1) >>> mat = np.mat(rdd.collect()) >>> mat.shape (100, 100) @@ -345,7 +345,7 @@ class RandomRDDs(object): >>> import numpy as np >>> mean = 0.5 - >>> rdd = RandomRDDs.exponentialVectorRDD(sc, mean, 100, 100, seed=1L) + >>> rdd = RandomRDDs.exponentialVectorRDD(sc, mean, 100, 100, seed=1) >>> mat = np.mat(rdd.collect()) >>> mat.shape (100, 100) @@ -380,8 +380,7 @@ class RandomRDDs(object): >>> scale = 2.0 >>> expMean = shape * scale >>> expStd = sqrt(shape * scale * scale) - >>> mat = np.matrix(RandomRDDs.gammaVectorRDD(sc, shape, scale, \ - 100, 100, seed=1L).collect()) + >>> mat = np.matrix(RandomRDDs.gammaVectorRDD(sc, shape, scale, 100, 100, seed=1).collect()) >>> mat.shape (100, 100) >>> abs(mat.mean() - expMean) < 0.1 diff --git a/python/pyspark/mllib/recommendation.py b/python/pyspark/mllib/recommendation.py index c5c4c13dae..80e0a356bb 100644 --- a/python/pyspark/mllib/recommendation.py +++ b/python/pyspark/mllib/recommendation.py @@ -15,6 +15,7 @@ # limitations under the License. # +import array from collections import namedtuple from pyspark import SparkContext @@ -104,14 +105,14 @@ class MatrixFactorizationModel(JavaModelWrapper, JavaSaveable, JavaLoader): assert isinstance(user_product, RDD), "user_product should be RDD of (user, product)" first = user_product.first() assert len(first) == 2, "user_product should be RDD of (user, product)" - user_product = user_product.map(lambda (u, p): (int(u), int(p))) + user_product = user_product.map(lambda u_p: (int(u_p[0]), int(u_p[1]))) return self.call("predict", user_product) def userFeatures(self): - return self.call("getUserFeatures") + return self.call("getUserFeatures").mapValues(lambda v: array.array('d', v)) def productFeatures(self): - return self.call("getProductFeatures") + return self.call("getProductFeatures").mapValues(lambda v: array.array('d', v)) @classmethod def load(cls, sc, path): diff --git a/python/pyspark/mllib/stat/_statistics.py b/python/pyspark/mllib/stat/_statistics.py index 1d83e9d483..b475be4b4d 100644 --- a/python/pyspark/mllib/stat/_statistics.py +++ b/python/pyspark/mllib/stat/_statistics.py @@ -15,7 +15,7 @@ # limitations under the License. # -from pyspark import RDD +from pyspark.rdd import RDD, ignore_unicode_prefix from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper from pyspark.mllib.linalg import Matrix, _convert_to_vector from pyspark.mllib.regression import LabeledPoint @@ -38,7 +38,7 @@ class MultivariateStatisticalSummary(JavaModelWrapper): return self.call("variance").toArray() def count(self): - return self.call("count") + return int(self.call("count")) def numNonzeros(self): return self.call("numNonzeros").toArray() @@ -78,7 +78,7 @@ class Statistics(object): >>> cStats.variance() array([ 4., 13., 0., 25.]) >>> cStats.count() - 3L + 3 >>> cStats.numNonzeros() array([ 3., 2., 0., 3.]) >>> cStats.max() @@ -124,20 +124,20 @@ class Statistics(object): >>> rdd = sc.parallelize([Vectors.dense([1, 0, 0, -2]), Vectors.dense([4, 5, 0, 3]), ... Vectors.dense([6, 7, 0, 8]), Vectors.dense([9, 0, 0, 1])]) >>> pearsonCorr = Statistics.corr(rdd) - >>> print str(pearsonCorr).replace('nan', 'NaN') + >>> print(str(pearsonCorr).replace('nan', 'NaN')) [[ 1. 0.05564149 NaN 0.40047142] [ 0.05564149 1. NaN 0.91359586] [ NaN NaN 1. NaN] [ 0.40047142 0.91359586 NaN 1. ]] >>> spearmanCorr = Statistics.corr(rdd, method="spearman") - >>> print str(spearmanCorr).replace('nan', 'NaN') + >>> print(str(spearmanCorr).replace('nan', 'NaN')) [[ 1. 0.10540926 NaN 0.4 ] [ 0.10540926 1. NaN 0.9486833 ] [ NaN NaN 1. NaN] [ 0.4 0.9486833 NaN 1. ]] >>> try: ... Statistics.corr(rdd, "spearman") - ... print "Method name as second argument without 'method=' shouldn't be allowed." + ... print("Method name as second argument without 'method=' shouldn't be allowed.") ... except TypeError: ... pass """ @@ -153,6 +153,7 @@ class Statistics(object): return callMLlibFunc("corr", x.map(float), y.map(float), method) @staticmethod + @ignore_unicode_prefix def chiSqTest(observed, expected=None): """ .. note:: Experimental @@ -188,11 +189,11 @@ class Statistics(object): >>> from pyspark.mllib.linalg import Vectors, Matrices >>> observed = Vectors.dense([4, 6, 5]) >>> pearson = Statistics.chiSqTest(observed) - >>> print pearson.statistic + >>> print(pearson.statistic) 0.4 >>> pearson.degreesOfFreedom 2 - >>> print round(pearson.pValue, 4) + >>> print(round(pearson.pValue, 4)) 0.8187 >>> pearson.method u'pearson' @@ -202,12 +203,12 @@ class Statistics(object): >>> observed = Vectors.dense([21, 38, 43, 80]) >>> expected = Vectors.dense([3, 5, 7, 20]) >>> pearson = Statistics.chiSqTest(observed, expected) - >>> print round(pearson.pValue, 4) + >>> print(round(pearson.pValue, 4)) 0.0027 >>> data = [40.0, 24.0, 29.0, 56.0, 32.0, 42.0, 31.0, 10.0, 0.0, 30.0, 15.0, 12.0] >>> chi = Statistics.chiSqTest(Matrices.dense(3, 4, data)) - >>> print round(chi.statistic, 4) + >>> print(round(chi.statistic, 4)) 21.9958 >>> data = [LabeledPoint(0.0, Vectors.dense([0.5, 10.0])), @@ -218,9 +219,9 @@ class Statistics(object): ... LabeledPoint(1.0, Vectors.dense([3.5, 40.0])),] >>> rdd = sc.parallelize(data, 4) >>> chi = Statistics.chiSqTest(rdd) - >>> print chi[0].statistic + >>> print(chi[0].statistic) 0.75 - >>> print chi[1].statistic + >>> print(chi[1].statistic) 1.5 """ if isinstance(observed, RDD): diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py index 8eaddcf8b9..c6ed5acd17 100644 --- a/python/pyspark/mllib/tests.py +++ b/python/pyspark/mllib/tests.py @@ -72,11 +72,11 @@ class VectorTests(PySparkTestCase): def _test_serialize(self, v): self.assertEqual(v, ser.loads(ser.dumps(v))) jvec = self.sc._jvm.SerDe.loads(bytearray(ser.dumps(v))) - nv = ser.loads(str(self.sc._jvm.SerDe.dumps(jvec))) + nv = ser.loads(bytes(self.sc._jvm.SerDe.dumps(jvec))) self.assertEqual(v, nv) vs = [v] * 100 jvecs = self.sc._jvm.SerDe.loads(bytearray(ser.dumps(vs))) - nvs = ser.loads(str(self.sc._jvm.SerDe.dumps(jvecs))) + nvs = ser.loads(bytes(self.sc._jvm.SerDe.dumps(jvecs))) self.assertEqual(vs, nvs) def test_serialize(self): @@ -412,11 +412,11 @@ class StatTests(PySparkTestCase): self.assertEqual(10, len(summary.normL1())) self.assertEqual(10, len(summary.normL2())) - data2 = self.sc.parallelize(xrange(10)).map(lambda x: Vectors.dense(x)) + data2 = self.sc.parallelize(range(10)).map(lambda x: Vectors.dense(x)) summary2 = Statistics.colStats(data2) self.assertEqual(array([45.0]), summary2.normL1()) import math - expectedNormL2 = math.sqrt(sum(map(lambda x: x*x, xrange(10)))) + expectedNormL2 = math.sqrt(sum(map(lambda x: x*x, range(10)))) self.assertTrue(math.fabs(summary2.normL2()[0] - expectedNormL2) < 1e-14) @@ -438,11 +438,11 @@ class VectorUDTTests(PySparkTestCase): def test_infer_schema(self): sqlCtx = SQLContext(self.sc) rdd = self.sc.parallelize([LabeledPoint(1.0, self.dv1), LabeledPoint(0.0, self.sv1)]) - srdd = sqlCtx.inferSchema(rdd) - schema = srdd.schema + df = rdd.toDF() + schema = df.schema field = [f for f in schema.fields if f.name == "features"][0] self.assertEqual(field.dataType, self.udt) - vectors = srdd.map(lambda p: p.features).collect() + vectors = df.map(lambda p: p.features).collect() self.assertEqual(len(vectors), 2) for v in vectors: if isinstance(v, SparseVector): @@ -695,7 +695,7 @@ class ChiSqTestTests(PySparkTestCase): class SerDeTest(PySparkTestCase): def test_to_java_object_rdd(self): # SPARK-6660 - data = RandomRDDs.uniformRDD(self.sc, 10, 5, seed=0L) + data = RandomRDDs.uniformRDD(self.sc, 10, 5, seed=0) self.assertEqual(_to_java_object_rdd(data).count(), 10) @@ -771,7 +771,7 @@ class StandardScalerTests(PySparkTestCase): if __name__ == "__main__": if not _have_scipy: - print "NOTE: Skipping SciPy tests as it does not seem to be installed" + print("NOTE: Skipping SciPy tests as it does not seem to be installed") unittest.main() if not _have_scipy: - print "NOTE: SciPy tests were skipped as it does not seem to be installed" + print("NOTE: SciPy tests were skipped as it does not seem to be installed") diff --git a/python/pyspark/mllib/tree.py b/python/pyspark/mllib/tree.py index a7a4d2aaf8..0fe6e4fabe 100644 --- a/python/pyspark/mllib/tree.py +++ b/python/pyspark/mllib/tree.py @@ -163,14 +163,16 @@ class DecisionTree(object): ... LabeledPoint(1.0, [3.0]) ... ] >>> model = DecisionTree.trainClassifier(sc.parallelize(data), 2, {}) - >>> print model, # it already has newline + >>> print(model) DecisionTreeModel classifier of depth 1 with 3 nodes - >>> print model.toDebugString(), # it already has newline + + >>> print(model.toDebugString()) DecisionTreeModel classifier of depth 1 with 3 nodes If (feature 0 <= 0.0) Predict: 0.0 Else (feature 0 > 0.0) Predict: 1.0 + >>> model.predict(array([1.0])) 1.0 >>> model.predict(array([0.0])) @@ -318,9 +320,10 @@ class RandomForest(object): 3 >>> model.totalNumNodes() 7 - >>> print model, + >>> print(model) TreeEnsembleModel classifier with 3 trees - >>> print model.toDebugString(), + + >>> print(model.toDebugString()) TreeEnsembleModel classifier with 3 trees Tree 0: @@ -335,6 +338,7 @@ class RandomForest(object): Predict: 0.0 Else (feature 0 > 1.0) Predict: 1.0 + >>> model.predict([2.0]) 1.0 >>> model.predict([0.0]) @@ -483,8 +487,9 @@ class GradientBoostedTrees(object): 100 >>> model.totalNumNodes() 300 - >>> print model, # it already has newline + >>> print(model) # it already has newline TreeEnsembleModel classifier with 100 trees + >>> model.predict([2.0]) 1.0 >>> model.predict([0.0]) diff --git a/python/pyspark/mllib/util.py b/python/pyspark/mllib/util.py index c5c3468eb9..16a90db146 100644 --- a/python/pyspark/mllib/util.py +++ b/python/pyspark/mllib/util.py @@ -15,10 +15,14 @@ # limitations under the License. # +import sys import numpy as np import warnings -from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper, inherit_doc +if sys.version > '3': + xrange = range + +from pyspark.mllib.common import callMLlibFunc, inherit_doc from pyspark.mllib.linalg import Vectors, SparseVector, _convert_to_vector @@ -94,22 +98,16 @@ class MLUtils(object): >>> from pyspark.mllib.util import MLUtils >>> from pyspark.mllib.regression import LabeledPoint >>> tempFile = NamedTemporaryFile(delete=True) - >>> tempFile.write("+1 1:1.0 3:2.0 5:3.0\\n-1\\n-1 2:4.0 4:5.0 6:6.0") + >>> _ = tempFile.write(b"+1 1:1.0 3:2.0 5:3.0\\n-1\\n-1 2:4.0 4:5.0 6:6.0") >>> tempFile.flush() >>> examples = MLUtils.loadLibSVMFile(sc, tempFile.name).collect() >>> tempFile.close() - >>> type(examples[0]) == LabeledPoint - True - >>> print examples[0] - (1.0,(6,[0,2,4],[1.0,2.0,3.0])) - >>> type(examples[1]) == LabeledPoint - True - >>> print examples[1] - (-1.0,(6,[],[])) - >>> type(examples[2]) == LabeledPoint - True - >>> print examples[2] - (-1.0,(6,[1,3,5],[4.0,5.0,6.0])) + >>> examples[0] + LabeledPoint(1.0, (6,[0,2,4],[1.0,2.0,3.0])) + >>> examples[1] + LabeledPoint(-1.0, (6,[],[])) + >>> examples[2] + LabeledPoint(-1.0, (6,[1,3,5],[4.0,5.0,6.0])) """ from pyspark.mllib.regression import LabeledPoint if multiclass is not None: diff --git a/python/pyspark/profiler.py b/python/pyspark/profiler.py index 4408996db0..d18daaabfc 100644 --- a/python/pyspark/profiler.py +++ b/python/pyspark/profiler.py @@ -84,11 +84,11 @@ class Profiler(object): >>> from pyspark import BasicProfiler >>> class MyCustomProfiler(BasicProfiler): ... def show(self, id): - ... print "My custom profiles for RDD:%s" % id + ... print("My custom profiles for RDD:%s" % id) ... >>> conf = SparkConf().set("spark.python.profile", "true") >>> sc = SparkContext('local', 'test', conf=conf, profiler_cls=MyCustomProfiler) - >>> sc.parallelize(list(range(1000))).map(lambda x: 2 * x).take(10) + >>> sc.parallelize(range(1000)).map(lambda x: 2 * x).take(10) [0, 2, 4, 6, 8, 10, 12, 14, 16, 18] >>> sc.show_profiles() My custom profiles for RDD:1 @@ -111,9 +111,9 @@ class Profiler(object): """ Print the profile stats to stdout, id is the RDD id """ stats = self.stats() if stats: - print "=" * 60 - print "Profile of RDD" % id - print "=" * 60 + print("=" * 60) + print("Profile of RDD" % id) + print("=" * 60) stats.sort_stats("time", "cumulative").print_stats() def dump(self, id, path): diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 93e658eded..d9cdbb666f 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -16,21 +16,29 @@ # import copy -from collections import defaultdict -from itertools import chain, ifilter, imap -import operator import sys +import os +import re +import operator import shlex -from subprocess import Popen, PIPE -from tempfile import NamedTemporaryFile -from threading import Thread import warnings import heapq import bisect import random import socket +from subprocess import Popen, PIPE +from tempfile import NamedTemporaryFile +from threading import Thread +from collections import defaultdict +from itertools import chain +from functools import reduce from math import sqrt, log, isinf, isnan, pow, ceil +if sys.version > '3': + basestring = unicode = str +else: + from itertools import imap as map, ifilter as filter + from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \ BatchedSerializer, CloudPickleSerializer, PairDeserializer, \ PickleSerializer, pack_long, AutoBatchedSerializer @@ -50,20 +58,21 @@ from py4j.java_collections import ListConverter, MapConverter __all__ = ["RDD"] -# TODO: for Python 3.3+, PYTHONHASHSEED should be reset to disable randomized -# hash for string def portable_hash(x): """ - This function returns consistant hash code for builtin types, especially + This function returns consistent hash code for builtin types, especially for None and tuple with None. - The algrithm is similar to that one used by CPython 2.7 + The algorithm is similar to that one used by CPython 2.7 >>> portable_hash(None) 0 >>> portable_hash((None, 1)) & 0xffffffff 219750521 """ + if sys.version >= '3.3' and 'PYTHONHASHSEED' not in os.environ: + raise Exception("Randomness of hash of string should be disabled via PYTHONHASHSEED") + if x is None: return 0 if isinstance(x, tuple): @@ -71,7 +80,7 @@ def portable_hash(x): for i in x: h ^= portable_hash(i) h *= 1000003 - h &= sys.maxint + h &= sys.maxsize h ^= len(x) if h == -1: h = -2 @@ -123,6 +132,19 @@ def _load_from_socket(port, serializer): sock.close() +def ignore_unicode_prefix(f): + """ + Ignore the 'u' prefix of string in doc tests, to make it works + in both python 2 and 3 + """ + if sys.version >= '3': + # the representation of unicode string in Python 3 does not have prefix 'u', + # so remove the prefix 'u' for doc tests + literal_re = re.compile(r"(\W|^)[uU](['])", re.UNICODE) + f.__doc__ = literal_re.sub(r'\1\2', f.__doc__) + return f + + class Partitioner(object): def __init__(self, numPartitions, partitionFunc): self.numPartitions = numPartitions @@ -251,7 +273,7 @@ class RDD(object): [('a', 1), ('b', 1), ('c', 1)] """ def func(_, iterator): - return imap(f, iterator) + return map(f, iterator) return self.mapPartitionsWithIndex(func, preservesPartitioning) def flatMap(self, f, preservesPartitioning=False): @@ -266,7 +288,7 @@ class RDD(object): [(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)] """ def func(s, iterator): - return chain.from_iterable(imap(f, iterator)) + return chain.from_iterable(map(f, iterator)) return self.mapPartitionsWithIndex(func, preservesPartitioning) def mapPartitions(self, f, preservesPartitioning=False): @@ -329,7 +351,7 @@ class RDD(object): [2, 4] """ def func(iterator): - return ifilter(f, iterator) + return filter(f, iterator) return self.mapPartitions(func, True) def distinct(self, numPartitions=None): @@ -341,7 +363,7 @@ class RDD(object): """ return self.map(lambda x: (x, None)) \ .reduceByKey(lambda x, _: x, numPartitions) \ - .map(lambda (x, _): x) + .map(lambda x: x[0]) def sample(self, withReplacement, fraction, seed=None): """ @@ -354,8 +376,8 @@ class RDD(object): :param seed: seed for the random number generator >>> rdd = sc.parallelize(range(100), 4) - >>> rdd.sample(False, 0.1, 81).count() - 10 + >>> 6 <= rdd.sample(False, 0.1, 81).count() <= 14 + True """ assert fraction >= 0.0, "Negative fraction value: %s" % fraction return self.mapPartitionsWithIndex(RDDSampler(withReplacement, fraction, seed).func, True) @@ -368,12 +390,14 @@ class RDD(object): :param seed: random seed :return: split RDDs in a list - >>> rdd = sc.parallelize(range(5), 1) + >>> rdd = sc.parallelize(range(500), 1) >>> rdd1, rdd2 = rdd.randomSplit([2, 3], 17) - >>> rdd1.collect() - [1, 3] - >>> rdd2.collect() - [0, 2, 4] + >>> len(rdd1.collect() + rdd2.collect()) + 500 + >>> 150 < rdd1.count() < 250 + True + >>> 250 < rdd2.count() < 350 + True """ s = float(sum(weights)) cweights = [0.0] @@ -416,7 +440,7 @@ class RDD(object): rand.shuffle(samples) return samples - maxSampleSize = sys.maxint - int(numStDev * sqrt(sys.maxint)) + maxSampleSize = sys.maxsize - int(numStDev * sqrt(sys.maxsize)) if num > maxSampleSize: raise ValueError( "Sample size cannot be greater than %d." % maxSampleSize) @@ -430,7 +454,7 @@ class RDD(object): # See: scala/spark/RDD.scala while len(samples) < num: # TODO: add log warning for when more than one iteration was run - seed = rand.randint(0, sys.maxint) + seed = rand.randint(0, sys.maxsize) samples = self.sample(withReplacement, fraction, seed).collect() rand.shuffle(samples) @@ -507,7 +531,7 @@ class RDD(object): """ return self.map(lambda v: (v, None)) \ .cogroup(other.map(lambda v: (v, None))) \ - .filter(lambda (k, vs): all(vs)) \ + .filter(lambda k_vs: all(k_vs[1])) \ .keys() def _reserialize(self, serializer=None): @@ -549,7 +573,7 @@ class RDD(object): def sortPartition(iterator): sort = ExternalSorter(memory * 0.9, serializer).sorted if spill else sorted - return iter(sort(iterator, key=lambda (k, v): keyfunc(k), reverse=(not ascending))) + return iter(sort(iterator, key=lambda k_v: keyfunc(k_v[0]), reverse=(not ascending))) return self.partitionBy(numPartitions, partitionFunc).mapPartitions(sortPartition, True) @@ -579,7 +603,7 @@ class RDD(object): def sortPartition(iterator): sort = ExternalSorter(memory * 0.9, serializer).sorted if spill else sorted - return iter(sort(iterator, key=lambda (k, v): keyfunc(k), reverse=(not ascending))) + return iter(sort(iterator, key=lambda kv: keyfunc(kv[0]), reverse=(not ascending))) if numPartitions == 1: if self.getNumPartitions() > 1: @@ -594,12 +618,12 @@ class RDD(object): return self # empty RDD maxSampleSize = numPartitions * 20.0 # constant from Spark's RangePartitioner fraction = min(maxSampleSize / max(rddSize, 1), 1.0) - samples = self.sample(False, fraction, 1).map(lambda (k, v): k).collect() + samples = self.sample(False, fraction, 1).map(lambda kv: kv[0]).collect() samples = sorted(samples, key=keyfunc) # we have numPartitions many parts but one of the them has # an implicit boundary - bounds = [samples[len(samples) * (i + 1) / numPartitions] + bounds = [samples[int(len(samples) * (i + 1) / numPartitions)] for i in range(0, numPartitions - 1)] def rangePartitioner(k): @@ -662,12 +686,13 @@ class RDD(object): """ return self.map(lambda x: (f(x), x)).groupByKey(numPartitions) + @ignore_unicode_prefix def pipe(self, command, env={}): """ Return an RDD created by piping elements to a forked external process. >>> sc.parallelize(['1', '2', '', '3']).pipe('cat').collect() - ['1', '2', '', '3'] + [u'1', u'2', u'', u'3'] """ def func(iterator): pipe = Popen( @@ -675,17 +700,18 @@ class RDD(object): def pipe_objs(out): for obj in iterator: - out.write(str(obj).rstrip('\n') + '\n') + s = str(obj).rstrip('\n') + '\n' + out.write(s.encode('utf-8')) out.close() Thread(target=pipe_objs, args=[pipe.stdin]).start() - return (x.rstrip('\n') for x in iter(pipe.stdout.readline, '')) + return (x.rstrip(b'\n').decode('utf-8') for x in iter(pipe.stdout.readline, b'')) return self.mapPartitions(func) def foreach(self, f): """ Applies a function to all elements of this RDD. - >>> def f(x): print x + >>> def f(x): print(x) >>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f) """ def processPartition(iterator): @@ -700,7 +726,7 @@ class RDD(object): >>> def f(iterator): ... for x in iterator: - ... print x + ... print(x) >>> sc.parallelize([1, 2, 3, 4, 5]).foreachPartition(f) """ def func(it): @@ -874,7 +900,7 @@ class RDD(object): # aggregation. while numPartitions > scale + numPartitions / scale: numPartitions /= scale - curNumPartitions = numPartitions + curNumPartitions = int(numPartitions) def mapPartition(i, iterator): for obj in iterator: @@ -984,7 +1010,7 @@ class RDD(object): (('a', 'b', 'c'), [2, 2]) """ - if isinstance(buckets, (int, long)): + if isinstance(buckets, int): if buckets < 1: raise ValueError("number of buckets must be >= 1") @@ -1020,6 +1046,7 @@ class RDD(object): raise ValueError("Can not generate buckets with infinite value") # keep them as integer if possible + inc = int(inc) if inc * buckets != maxv - minv: inc = (maxv - minv) * 1.0 / buckets @@ -1137,7 +1164,7 @@ class RDD(object): yield counts def mergeMaps(m1, m2): - for k, v in m2.iteritems(): + for k, v in m2.items(): m1[k] += v return m1 return self.mapPartitions(countPartition).reduce(mergeMaps) @@ -1378,8 +1405,8 @@ class RDD(object): >>> tmpFile = NamedTemporaryFile(delete=True) >>> tmpFile.close() >>> sc.parallelize([1, 2, 'spark', 'rdd']).saveAsPickleFile(tmpFile.name, 3) - >>> sorted(sc.pickleFile(tmpFile.name, 5).collect()) - [1, 2, 'rdd', 'spark'] + >>> sorted(sc.pickleFile(tmpFile.name, 5).map(str).collect()) + ['1', '2', 'rdd', 'spark'] """ if batchSize == 0: ser = AutoBatchedSerializer(PickleSerializer()) @@ -1387,6 +1414,7 @@ class RDD(object): ser = BatchedSerializer(PickleSerializer(), batchSize) self._reserialize(ser)._jrdd.saveAsObjectFile(path) + @ignore_unicode_prefix def saveAsTextFile(self, path, compressionCodecClass=None): """ Save this RDD as a text file, using string representations of elements. @@ -1418,12 +1446,13 @@ class RDD(object): >>> codec = "org.apache.hadoop.io.compress.GzipCodec" >>> sc.parallelize(['foo', 'bar']).saveAsTextFile(tempFile3.name, codec) >>> from fileinput import input, hook_compressed - >>> ''.join(sorted(input(glob(tempFile3.name + "/part*.gz"), openhook=hook_compressed))) - 'bar\\nfoo\\n' + >>> result = sorted(input(glob(tempFile3.name + "/part*.gz"), openhook=hook_compressed)) + >>> b''.join(result).decode('utf-8') + u'bar\\nfoo\\n' """ def func(split, iterator): for x in iterator: - if not isinstance(x, basestring): + if not isinstance(x, (unicode, bytes)): x = unicode(x) if isinstance(x, unicode): x = x.encode("utf-8") @@ -1458,7 +1487,7 @@ class RDD(object): >>> m.collect() [1, 3] """ - return self.map(lambda (k, v): k) + return self.map(lambda x: x[0]) def values(self): """ @@ -1468,7 +1497,7 @@ class RDD(object): >>> m.collect() [2, 4] """ - return self.map(lambda (k, v): v) + return self.map(lambda x: x[1]) def reduceByKey(self, func, numPartitions=None): """ @@ -1507,7 +1536,7 @@ class RDD(object): yield m def mergeMaps(m1, m2): - for k, v in m2.iteritems(): + for k, v in m2.items(): m1[k] = func(m1[k], v) if k in m1 else v return m1 return self.mapPartitions(reducePartition).reduce(mergeMaps) @@ -1604,8 +1633,8 @@ class RDD(object): >>> pairs = sc.parallelize([1, 2, 3, 4, 2, 4, 1]).map(lambda x: (x, x)) >>> sets = pairs.partitionBy(2).glom().collect() - >>> set(sets[0]).intersection(set(sets[1])) - set([]) + >>> len(set(sets[0]).intersection(set(sets[1]))) + 0 """ if numPartitions is None: numPartitions = self._defaultReducePartitions() @@ -1637,22 +1666,22 @@ class RDD(object): if (c % 1000 == 0 and get_used_memory() > limit or c > batch): n, size = len(buckets), 0 - for split in buckets.keys(): + for split in list(buckets.keys()): yield pack_long(split) d = outputSerializer.dumps(buckets[split]) del buckets[split] yield d size += len(d) - avg = (size / n) >> 20 + avg = int(size / n) >> 20 # let 1M < avg < 10M if avg < 1: batch *= 1.5 elif avg > 10: - batch = max(batch / 1.5, 1) + batch = max(int(batch / 1.5), 1) c = 0 - for split, items in buckets.iteritems(): + for split, items in buckets.items(): yield pack_long(split) yield outputSerializer.dumps(items) @@ -1707,7 +1736,7 @@ class RDD(object): merger = ExternalMerger(agg, memory * 0.9, serializer) \ if spill else InMemoryMerger(agg) merger.mergeValues(iterator) - return merger.iteritems() + return merger.items() locally_combined = self.mapPartitions(combineLocally, preservesPartitioning=True) shuffled = locally_combined.partitionBy(numPartitions) @@ -1716,7 +1745,7 @@ class RDD(object): merger = ExternalMerger(agg, memory, serializer) \ if spill else InMemoryMerger(agg) merger.mergeCombiners(iterator) - return merger.iteritems() + return merger.items() return shuffled.mapPartitions(_mergeCombiners, preservesPartitioning=True) @@ -1745,7 +1774,7 @@ class RDD(object): >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) >>> from operator import add - >>> rdd.foldByKey(0, add).collect() + >>> sorted(rdd.foldByKey(0, add).collect()) [('a', 2), ('b', 1)] """ def createZero(): @@ -1769,10 +1798,10 @@ class RDD(object): sum or average) over each key, using reduceByKey or aggregateByKey will provide much better performance. - >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) - >>> sorted(x.groupByKey().mapValues(len).collect()) + >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) + >>> sorted(rdd.groupByKey().mapValues(len).collect()) [('a', 2), ('b', 1)] - >>> sorted(x.groupByKey().mapValues(list).collect()) + >>> sorted(rdd.groupByKey().mapValues(list).collect()) [('a', [1, 1]), ('b', [1])] """ def createCombiner(x): @@ -1795,7 +1824,7 @@ class RDD(object): merger = ExternalMerger(agg, memory * 0.9, serializer) \ if spill else InMemoryMerger(agg) merger.mergeValues(iterator) - return merger.iteritems() + return merger.items() locally_combined = self.mapPartitions(combine, preservesPartitioning=True) shuffled = locally_combined.partitionBy(numPartitions) @@ -1804,7 +1833,7 @@ class RDD(object): merger = ExternalGroupBy(agg, memory, serializer)\ if spill else InMemoryMerger(agg) merger.mergeCombiners(it) - return merger.iteritems() + return merger.items() return shuffled.mapPartitions(groupByKey, True).mapValues(ResultIterable) @@ -1819,7 +1848,7 @@ class RDD(object): >>> x.flatMapValues(f).collect() [('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')] """ - flat_map_fn = lambda (k, v): ((k, x) for x in f(v)) + flat_map_fn = lambda kv: ((kv[0], x) for x in f(kv[1])) return self.flatMap(flat_map_fn, preservesPartitioning=True) def mapValues(self, f): @@ -1833,7 +1862,7 @@ class RDD(object): >>> x.mapValues(f).collect() [('a', 3), ('b', 1)] """ - map_values_fn = lambda (k, v): (k, f(v)) + map_values_fn = lambda kv: (kv[0], f(kv[1])) return self.map(map_values_fn, preservesPartitioning=True) def groupWith(self, other, *others): @@ -1844,8 +1873,7 @@ class RDD(object): >>> x = sc.parallelize([("a", 1), ("b", 4)]) >>> y = sc.parallelize([("a", 2)]) >>> z = sc.parallelize([("b", 42)]) - >>> map((lambda (x,y): (x, (list(y[0]), list(y[1]), list(y[2]), list(y[3])))), \ - sorted(list(w.groupWith(x, y, z).collect()))) + >>> [(x, tuple(map(list, y))) for x, y in sorted(list(w.groupWith(x, y, z).collect()))] [('a', ([5], [1], [2], [])), ('b', ([6], [4], [], [42]))] """ @@ -1860,7 +1888,7 @@ class RDD(object): >>> x = sc.parallelize([("a", 1), ("b", 4)]) >>> y = sc.parallelize([("a", 2)]) - >>> map((lambda (x,y): (x, (list(y[0]), list(y[1])))), sorted(list(x.cogroup(y).collect()))) + >>> [(x, tuple(map(list, y))) for x, y in sorted(list(x.cogroup(y).collect()))] [('a', ([1], [2])), ('b', ([4], []))] """ return python_cogroup((self, other), numPartitions) @@ -1896,8 +1924,9 @@ class RDD(object): >>> sorted(x.subtractByKey(y).collect()) [('b', 4), ('b', 5)] """ - def filter_func((key, vals)): - return vals[0] and not vals[1] + def filter_func(pair): + key, (val1, val2) = pair + return val1 and not val2 return self.cogroup(other, numPartitions).filter(filter_func).flatMapValues(lambda x: x[0]) def subtract(self, other, numPartitions=None): @@ -1919,8 +1948,8 @@ class RDD(object): >>> x = sc.parallelize(range(0,3)).keyBy(lambda x: x*x) >>> y = sc.parallelize(zip(range(0,5), range(0,5))) - >>> map((lambda (x,y): (x, (list(y[0]), (list(y[1]))))), sorted(x.cogroup(y).collect())) - [(0, ([0], [0])), (1, ([1], [1])), (2, ([], [2])), (3, ([], [3])), (4, ([2], [4]))] + >>> [(x, list(map(list, y))) for x, y in sorted(x.cogroup(y).collect())] + [(0, [[0], [0]]), (1, [[1], [1]]), (2, [[], [2]]), (3, [[], [3]]), (4, [[2], [4]])] """ return self.map(lambda x: (f(x), x)) @@ -2049,17 +2078,18 @@ class RDD(object): """ Return the name of this RDD. """ - name_ = self._jrdd.name() - if name_: - return name_.encode('utf-8') + n = self._jrdd.name() + if n: + return n + @ignore_unicode_prefix def setName(self, name): """ Assign a name to this RDD. - >>> rdd1 = sc.parallelize([1,2]) + >>> rdd1 = sc.parallelize([1, 2]) >>> rdd1.setName('RDD1').name() - 'RDD1' + u'RDD1' """ self._jrdd.setName(name) return self @@ -2121,7 +2151,7 @@ class RDD(object): >>> sorted.lookup(1024) [] """ - values = self.filter(lambda (k, v): k == key).values() + values = self.filter(lambda kv: kv[0] == key).values() if self.partitioner is not None: return self.ctx.runJob(values, lambda x: x, [self.partitioner(key)], False) @@ -2159,7 +2189,7 @@ class RDD(object): or meet the confidence. >>> rdd = sc.parallelize(range(1000), 10) - >>> r = sum(xrange(1000)) + >>> r = sum(range(1000)) >>> (rdd.sumApprox(1000) - r) / r < 0.05 True """ @@ -2176,7 +2206,7 @@ class RDD(object): or meet the confidence. >>> rdd = sc.parallelize(range(1000), 10) - >>> r = sum(xrange(1000)) / 1000.0 + >>> r = sum(range(1000)) / 1000.0 >>> (rdd.meanApprox(1000) - r) / r < 0.05 True """ @@ -2201,10 +2231,10 @@ class RDD(object): It must be greater than 0.000017. >>> n = sc.parallelize(range(1000)).map(str).countApproxDistinct() - >>> 950 < n < 1050 + >>> 900 < n < 1100 True >>> n = sc.parallelize([i % 20 for i in range(1000)]).countApproxDistinct() - >>> 18 < n < 22 + >>> 16 < n < 24 True """ if relativeSD < 0.000017: @@ -2223,8 +2253,7 @@ class RDD(object): >>> [x for x in rdd.toLocalIterator()] [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] """ - partitions = xrange(self.getNumPartitions()) - for partition in partitions: + for partition in range(self.getNumPartitions()): rows = self.context.runJob(self, lambda x: x, [partition]) for row in rows: yield row diff --git a/python/pyspark/rddsampler.py b/python/pyspark/rddsampler.py index 459e142780..fe8f873248 100644 --- a/python/pyspark/rddsampler.py +++ b/python/pyspark/rddsampler.py @@ -23,7 +23,7 @@ import math class RDDSamplerBase(object): def __init__(self, withReplacement, seed=None): - self._seed = seed if seed is not None else random.randint(0, sys.maxint) + self._seed = seed if seed is not None else random.randint(0, sys.maxsize) self._withReplacement = withReplacement self._random = None @@ -31,7 +31,7 @@ class RDDSamplerBase(object): self._random = random.Random(self._seed ^ split) # mixing because the initial seeds are close to each other - for _ in xrange(10): + for _ in range(10): self._random.randint(0, 1) def getUniformSample(self): diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py index 4afa82f4b2..d8cdcda3a3 100644 --- a/python/pyspark/serializers.py +++ b/python/pyspark/serializers.py @@ -49,16 +49,24 @@ which contains two batches of two objects: >>> sc.stop() """ -import cPickle -from itertools import chain, izip, product +import sys +from itertools import chain, product import marshal import struct -import sys import types import collections import zlib import itertools +if sys.version < '3': + import cPickle as pickle + protocol = 2 + from itertools import izip as zip +else: + import pickle + protocol = 3 + xrange = range + from pyspark import cloudpickle @@ -97,7 +105,7 @@ class Serializer(object): # subclasses should override __eq__ as appropriate. def __eq__(self, other): - return isinstance(other, self.__class__) + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ def __ne__(self, other): return not self.__eq__(other) @@ -212,10 +220,6 @@ class BatchedSerializer(Serializer): def _load_stream_without_unbatching(self, stream): return self.serializer.load_stream(stream) - def __eq__(self, other): - return (isinstance(other, BatchedSerializer) and - other.serializer == self.serializer and other.batchSize == self.batchSize) - def __repr__(self): return "BatchedSerializer(%s, %d)" % (str(self.serializer), self.batchSize) @@ -233,14 +237,14 @@ class FlattenedValuesSerializer(BatchedSerializer): def _batched(self, iterator): n = self.batchSize for key, values in iterator: - for i in xrange(0, len(values), n): + for i in range(0, len(values), n): yield key, values[i:i + n] def load_stream(self, stream): return self.serializer.load_stream(stream) def __repr__(self): - return "FlattenedValuesSerializer(%d)" % self.batchSize + return "FlattenedValuesSerializer(%s, %d)" % (self.serializer, self.batchSize) class AutoBatchedSerializer(BatchedSerializer): @@ -270,12 +274,8 @@ class AutoBatchedSerializer(BatchedSerializer): elif size > best * 10 and batch > 1: batch /= 2 - def __eq__(self, other): - return (isinstance(other, AutoBatchedSerializer) and - other.serializer == self.serializer and other.bestSize == self.bestSize) - def __repr__(self): - return "AutoBatchedSerializer(%s)" % str(self.serializer) + return "AutoBatchedSerializer(%s)" % self.serializer class CartesianDeserializer(FramedSerializer): @@ -285,6 +285,7 @@ class CartesianDeserializer(FramedSerializer): """ def __init__(self, key_ser, val_ser): + FramedSerializer.__init__(self) self.key_ser = key_ser self.val_ser = val_ser @@ -293,7 +294,7 @@ class CartesianDeserializer(FramedSerializer): val_stream = self.val_ser._load_stream_without_unbatching(stream) key_is_batched = isinstance(self.key_ser, BatchedSerializer) val_is_batched = isinstance(self.val_ser, BatchedSerializer) - for (keys, vals) in izip(key_stream, val_stream): + for (keys, vals) in zip(key_stream, val_stream): keys = keys if key_is_batched else [keys] vals = vals if val_is_batched else [vals] yield (keys, vals) @@ -303,10 +304,6 @@ class CartesianDeserializer(FramedSerializer): for pair in product(keys, vals): yield pair - def __eq__(self, other): - return (isinstance(other, CartesianDeserializer) and - self.key_ser == other.key_ser and self.val_ser == other.val_ser) - def __repr__(self): return "CartesianDeserializer(%s, %s)" % \ (str(self.key_ser), str(self.val_ser)) @@ -318,22 +315,14 @@ class PairDeserializer(CartesianDeserializer): Deserializes the JavaRDD zip() of two PythonRDDs. """ - def __init__(self, key_ser, val_ser): - self.key_ser = key_ser - self.val_ser = val_ser - def load_stream(self, stream): for (keys, vals) in self.prepare_keys_values(stream): if len(keys) != len(vals): raise ValueError("Can not deserialize RDD with different number of items" " in pair: (%d, %d)" % (len(keys), len(vals))) - for pair in izip(keys, vals): + for pair in zip(keys, vals): yield pair - def __eq__(self, other): - return (isinstance(other, PairDeserializer) and - self.key_ser == other.key_ser and self.val_ser == other.val_ser) - def __repr__(self): return "PairDeserializer(%s, %s)" % (str(self.key_ser), str(self.val_ser)) @@ -382,8 +371,8 @@ def _hijack_namedtuple(): global _old_namedtuple # or it will put in closure def _copy_func(f): - return types.FunctionType(f.func_code, f.func_globals, f.func_name, - f.func_defaults, f.func_closure) + return types.FunctionType(f.__code__, f.__globals__, f.__name__, + f.__defaults__, f.__closure__) _old_namedtuple = _copy_func(collections.namedtuple) @@ -392,15 +381,15 @@ def _hijack_namedtuple(): return _hack_namedtuple(cls) # replace namedtuple with new one - collections.namedtuple.func_globals["_old_namedtuple"] = _old_namedtuple - collections.namedtuple.func_globals["_hack_namedtuple"] = _hack_namedtuple - collections.namedtuple.func_code = namedtuple.func_code + collections.namedtuple.__globals__["_old_namedtuple"] = _old_namedtuple + collections.namedtuple.__globals__["_hack_namedtuple"] = _hack_namedtuple + collections.namedtuple.__code__ = namedtuple.__code__ collections.namedtuple.__hijack = 1 # hack the cls already generated by namedtuple # those created in other module can be pickled as normal, # so only hack those in __main__ module - for n, o in sys.modules["__main__"].__dict__.iteritems(): + for n, o in sys.modules["__main__"].__dict__.items(): if (type(o) is type and o.__base__ is tuple and hasattr(o, "_fields") and "__reduce__" not in o.__dict__): @@ -413,7 +402,7 @@ _hijack_namedtuple() class PickleSerializer(FramedSerializer): """ - Serializes objects using Python's cPickle serializer: + Serializes objects using Python's pickle serializer: http://docs.python.org/2/library/pickle.html @@ -422,10 +411,14 @@ class PickleSerializer(FramedSerializer): """ def dumps(self, obj): - return cPickle.dumps(obj, 2) + return pickle.dumps(obj, protocol) - def loads(self, obj): - return cPickle.loads(obj) + if sys.version >= '3': + def loads(self, obj, encoding="bytes"): + return pickle.loads(obj, encoding=encoding) + else: + def loads(self, obj, encoding=None): + return pickle.loads(obj) class CloudPickleSerializer(PickleSerializer): @@ -454,7 +447,7 @@ class MarshalSerializer(FramedSerializer): class AutoSerializer(FramedSerializer): """ - Choose marshal or cPickle as serialization protocol automatically + Choose marshal or pickle as serialization protocol automatically """ def __init__(self): @@ -463,19 +456,19 @@ class AutoSerializer(FramedSerializer): def dumps(self, obj): if self._type is not None: - return 'P' + cPickle.dumps(obj, -1) + return b'P' + pickle.dumps(obj, -1) try: - return 'M' + marshal.dumps(obj) + return b'M' + marshal.dumps(obj) except Exception: - self._type = 'P' - return 'P' + cPickle.dumps(obj, -1) + self._type = b'P' + return b'P' + pickle.dumps(obj, -1) def loads(self, obj): _type = obj[0] - if _type == 'M': + if _type == b'M': return marshal.loads(obj[1:]) - elif _type == 'P': - return cPickle.loads(obj[1:]) + elif _type == b'P': + return pickle.loads(obj[1:]) else: raise ValueError("invalid sevialization type: %s" % _type) @@ -495,8 +488,8 @@ class CompressedSerializer(FramedSerializer): def loads(self, obj): return self.serializer.loads(zlib.decompress(obj)) - def __eq__(self, other): - return isinstance(other, CompressedSerializer) and self.serializer == other.serializer + def __repr__(self): + return "CompressedSerializer(%s)" % self.serializer class UTF8Deserializer(Serializer): @@ -505,7 +498,7 @@ class UTF8Deserializer(Serializer): Deserializes streams written by String.getBytes. """ - def __init__(self, use_unicode=False): + def __init__(self, use_unicode=True): self.use_unicode = use_unicode def loads(self, stream): @@ -526,13 +519,13 @@ class UTF8Deserializer(Serializer): except EOFError: return - def __eq__(self, other): - return isinstance(other, UTF8Deserializer) and self.use_unicode == other.use_unicode + def __repr__(self): + return "UTF8Deserializer(%s)" % self.use_unicode def read_long(stream): length = stream.read(8) - if length == "": + if not length: raise EOFError return struct.unpack("!q", length)[0] @@ -547,7 +540,7 @@ def pack_long(value): def read_int(stream): length = stream.read(4) - if length == "": + if not length: raise EOFError return struct.unpack("!i", length)[0] diff --git a/python/pyspark/shell.py b/python/pyspark/shell.py index 81aa970a32..144cdf0b0c 100644 --- a/python/pyspark/shell.py +++ b/python/pyspark/shell.py @@ -21,13 +21,6 @@ An interactive shell. This file is designed to be launched as a PYTHONSTARTUP script. """ -import sys -if sys.version_info[0] != 2: - print("Error: Default Python used is Python%s" % sys.version_info.major) - print("\tSet env variable PYSPARK_PYTHON to Python2 binary and re-run it.") - sys.exit(1) - - import atexit import os import platform @@ -53,9 +46,14 @@ atexit.register(lambda: sc.stop()) try: # Try to access HiveConf, it will raise exception if Hive is not added sc._jvm.org.apache.hadoop.hive.conf.HiveConf() - sqlCtx = sqlContext = HiveContext(sc) + sqlContext = HiveContext(sc) except py4j.protocol.Py4JError: - sqlCtx = sqlContext = SQLContext(sc) + sqlContext = SQLContext(sc) +except TypeError: + sqlContext = SQLContext(sc) + +# for compatibility +sqlCtx = sqlContext print("""Welcome to ____ __ diff --git a/python/pyspark/shuffle.py b/python/pyspark/shuffle.py index 8a6fc627eb..b54baa57ec 100644 --- a/python/pyspark/shuffle.py +++ b/python/pyspark/shuffle.py @@ -78,8 +78,8 @@ def _get_local_dirs(sub): # global stats -MemoryBytesSpilled = 0L -DiskBytesSpilled = 0L +MemoryBytesSpilled = 0 +DiskBytesSpilled = 0 class Aggregator(object): @@ -126,7 +126,7 @@ class Merger(object): """ Merge the combined items by mergeCombiner """ raise NotImplementedError - def iteritems(self): + def items(self): """ Return the merged items ad iterator """ raise NotImplementedError @@ -156,9 +156,9 @@ class InMemoryMerger(Merger): for k, v in iterator: d[k] = comb(d[k], v) if k in d else v - def iteritems(self): - """ Return the merged items as iterator """ - return self.data.iteritems() + def items(self): + """ Return the merged items ad iterator """ + return iter(self.data.items()) def _compressed_serializer(self, serializer=None): @@ -208,15 +208,15 @@ class ExternalMerger(Merger): >>> agg = SimpleAggregator(lambda x, y: x + y) >>> merger = ExternalMerger(agg, 10) >>> N = 10000 - >>> merger.mergeValues(zip(xrange(N), xrange(N))) + >>> merger.mergeValues(zip(range(N), range(N))) >>> assert merger.spills > 0 - >>> sum(v for k,v in merger.iteritems()) + >>> sum(v for k,v in merger.items()) 49995000 >>> merger = ExternalMerger(agg, 10) - >>> merger.mergeCombiners(zip(xrange(N), xrange(N))) + >>> merger.mergeCombiners(zip(range(N), range(N))) >>> assert merger.spills > 0 - >>> sum(v for k,v in merger.iteritems()) + >>> sum(v for k,v in merger.items()) 49995000 """ @@ -335,10 +335,10 @@ class ExternalMerger(Merger): # above limit at the first time. # open all the files for writing - streams = [open(os.path.join(path, str(i)), 'w') + streams = [open(os.path.join(path, str(i)), 'wb') for i in range(self.partitions)] - for k, v in self.data.iteritems(): + for k, v in self.data.items(): h = self._partition(k) # put one item in batch, make it compatible with load_stream # it will increase the memory if dump them in batch @@ -354,9 +354,9 @@ class ExternalMerger(Merger): else: for i in range(self.partitions): p = os.path.join(path, str(i)) - with open(p, "w") as f: + with open(p, "wb") as f: # dump items in batch - self.serializer.dump_stream(self.pdata[i].iteritems(), f) + self.serializer.dump_stream(iter(self.pdata[i].items()), f) self.pdata[i].clear() DiskBytesSpilled += os.path.getsize(p) @@ -364,10 +364,10 @@ class ExternalMerger(Merger): gc.collect() # release the memory as much as possible MemoryBytesSpilled += (used_memory - get_used_memory()) << 20 - def iteritems(self): + def items(self): """ Return all merged items as iterator """ if not self.pdata and not self.spills: - return self.data.iteritems() + return iter(self.data.items()) return self._external_items() def _external_items(self): @@ -398,7 +398,8 @@ class ExternalMerger(Merger): path = self._get_spill_dir(j) p = os.path.join(path, str(index)) # do not check memory during merging - self.mergeCombiners(self.serializer.load_stream(open(p)), 0) + with open(p, "rb") as f: + self.mergeCombiners(self.serializer.load_stream(f), 0) # limit the total partitions if (self.scale * self.partitions < self.MAX_TOTAL_PARTITIONS @@ -408,7 +409,7 @@ class ExternalMerger(Merger): gc.collect() # release the memory as much as possible return self._recursive_merged_items(index) - return self.data.iteritems() + return self.data.items() def _recursive_merged_items(self, index): """ @@ -426,7 +427,8 @@ class ExternalMerger(Merger): for j in range(self.spills): path = self._get_spill_dir(j) p = os.path.join(path, str(index)) - m.mergeCombiners(self.serializer.load_stream(open(p)), 0) + with open(p, 'rb') as f: + m.mergeCombiners(self.serializer.load_stream(f), 0) if get_used_memory() > limit: m._spill() @@ -451,7 +453,7 @@ class ExternalSorter(object): >>> sorter = ExternalSorter(1) # 1M >>> import random - >>> l = range(1024) + >>> l = list(range(1024)) >>> random.shuffle(l) >>> sorted(l) == list(sorter.sorted(l)) True @@ -499,9 +501,16 @@ class ExternalSorter(object): # sort them inplace will save memory current_chunk.sort(key=key, reverse=reverse) path = self._get_path(len(chunks)) - with open(path, 'w') as f: + with open(path, 'wb') as f: self.serializer.dump_stream(current_chunk, f) - chunks.append(self.serializer.load_stream(open(path))) + + def load(f): + for v in self.serializer.load_stream(f): + yield v + # close the file explicit once we consume all the items + # to avoid ResourceWarning in Python3 + f.close() + chunks.append(load(open(path, 'rb'))) current_chunk = [] gc.collect() limit = self._next_limit() @@ -527,7 +536,7 @@ class ExternalList(object): ExternalList can have many items which cannot be hold in memory in the same time. - >>> l = ExternalList(range(100)) + >>> l = ExternalList(list(range(100))) >>> len(l) 100 >>> l.append(10) @@ -555,11 +564,11 @@ class ExternalList(object): def __getstate__(self): if self._file is not None: self._file.flush() - f = os.fdopen(os.dup(self._file.fileno())) - f.seek(0) - serialized = f.read() + with os.fdopen(os.dup(self._file.fileno()), "rb") as f: + f.seek(0) + serialized = f.read() else: - serialized = '' + serialized = b'' return self.values, self.count, serialized def __setstate__(self, item): @@ -575,7 +584,7 @@ class ExternalList(object): if self._file is not None: self._file.flush() # read all items from disks first - with os.fdopen(os.dup(self._file.fileno()), 'r') as f: + with os.fdopen(os.dup(self._file.fileno()), 'rb') as f: f.seek(0) for v in self._ser.load_stream(f): yield v @@ -598,11 +607,16 @@ class ExternalList(object): d = dirs[id(self) % len(dirs)] if not os.path.exists(d): os.makedirs(d) - p = os.path.join(d, str(id)) - self._file = open(p, "w+", 65536) + p = os.path.join(d, str(id(self))) + self._file = open(p, "wb+", 65536) self._ser = BatchedSerializer(CompressedSerializer(PickleSerializer()), 1024) os.unlink(p) + def __del__(self): + if self._file: + self._file.close() + self._file = None + def _spill(self): """ dump the values into disk """ global MemoryBytesSpilled, DiskBytesSpilled @@ -651,33 +665,28 @@ class GroupByKey(object): """ Group a sorted iterator as [(k1, it1), (k2, it2), ...] - >>> k = [i/3 for i in range(6)] + >>> k = [i // 3 for i in range(6)] >>> v = [[i] for i in range(6)] - >>> g = GroupByKey(iter(zip(k, v))) + >>> g = GroupByKey(zip(k, v)) >>> [(k, list(it)) for k, it in g] [(0, [0, 1, 2]), (1, [3, 4, 5])] """ def __init__(self, iterator): - self.iterator = iter(iterator) - self.next_item = None + self.iterator = iterator def __iter__(self): - return self - - def next(self): - key, value = self.next_item if self.next_item else next(self.iterator) - values = ExternalListOfList([value]) - try: - while True: - k, v = next(self.iterator) - if k != key: - self.next_item = (k, v) - break + key, values = None, None + for k, v in self.iterator: + if values is not None and k == key: values.append(v) - except StopIteration: - self.next_item = None - return key, values + else: + if values is not None: + yield (key, values) + key = k + values = ExternalListOfList([v]) + if values is not None: + yield (key, values) class ExternalGroupBy(ExternalMerger): @@ -744,7 +753,7 @@ class ExternalGroupBy(ExternalMerger): # above limit at the first time. # open all the files for writing - streams = [open(os.path.join(path, str(i)), 'w') + streams = [open(os.path.join(path, str(i)), 'wb') for i in range(self.partitions)] # If the number of keys is small, then the overhead of sort is small @@ -756,7 +765,7 @@ class ExternalGroupBy(ExternalMerger): h = self._partition(k) self.serializer.dump_stream([(k, self.data[k])], streams[h]) else: - for k, v in self.data.iteritems(): + for k, v in self.data.items(): h = self._partition(k) self.serializer.dump_stream([(k, v)], streams[h]) @@ -771,14 +780,14 @@ class ExternalGroupBy(ExternalMerger): else: for i in range(self.partitions): p = os.path.join(path, str(i)) - with open(p, "w") as f: + with open(p, "wb") as f: # dump items in batch if self._sorted: # sort by key only (stable) - sorted_items = sorted(self.pdata[i].iteritems(), key=operator.itemgetter(0)) + sorted_items = sorted(self.pdata[i].items(), key=operator.itemgetter(0)) self.serializer.dump_stream(sorted_items, f) else: - self.serializer.dump_stream(self.pdata[i].iteritems(), f) + self.serializer.dump_stream(self.pdata[i].items(), f) self.pdata[i].clear() DiskBytesSpilled += os.path.getsize(p) @@ -792,7 +801,7 @@ class ExternalGroupBy(ExternalMerger): # if the memory can not hold all the partition, # then use sort based merge. Because of compression, # the data on disks will be much smaller than needed memory - if (size >> 20) >= self.memory_limit / 10: + if size >= self.memory_limit << 17: # * 1M / 8 return self._merge_sorted_items(index) self.data = {} @@ -800,15 +809,18 @@ class ExternalGroupBy(ExternalMerger): path = self._get_spill_dir(j) p = os.path.join(path, str(index)) # do not check memory during merging - self.mergeCombiners(self.serializer.load_stream(open(p)), 0) - return self.data.iteritems() + with open(p, "rb") as f: + self.mergeCombiners(self.serializer.load_stream(f), 0) + return self.data.items() def _merge_sorted_items(self, index): """ load a partition from disk, then sort and group by key """ def load_partition(j): path = self._get_spill_dir(j) p = os.path.join(path, str(index)) - return self.serializer.load_stream(open(p, 'r', 65536)) + with open(p, 'rb', 65536) as f: + for v in self.serializer.load_stream(f): + yield v disk_items = [load_partition(j) for j in range(self.spills)] diff --git a/python/pyspark/sql/__init__.py b/python/pyspark/sql/__init__.py index 65abb24eed..6d54b9e49e 100644 --- a/python/pyspark/sql/__init__.py +++ b/python/pyspark/sql/__init__.py @@ -37,9 +37,22 @@ Important classes of Spark SQL and DataFrames: - L{types} List of data types available. """ +from __future__ import absolute_import + +# fix the module name conflict for Python 3+ +import sys +from . import _types as types +modname = __name__ + '.types' +types.__name__ = modname +# update the __module__ for all objects, make them picklable +for v in types.__dict__.values(): + if hasattr(v, "__module__") and v.__module__.endswith('._types'): + v.__module__ = modname +sys.modules[modname] = types +del modname, sys -from pyspark.sql.context import SQLContext, HiveContext from pyspark.sql.types import Row +from pyspark.sql.context import SQLContext, HiveContext from pyspark.sql.dataframe import DataFrame, GroupedData, Column, SchemaRDD, DataFrameNaFunctions __all__ = [ diff --git a/python/pyspark/sql/_types.py b/python/pyspark/sql/_types.py new file mode 100644 index 0000000000..492c0cbdcf --- /dev/null +++ b/python/pyspark/sql/_types.py @@ -0,0 +1,1261 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import sys +import decimal +import datetime +import keyword +import warnings +import json +import re +import weakref +from array import array +from operator import itemgetter + +if sys.version >= "3": + long = int + unicode = str + +__all__ = [ + "DataType", "NullType", "StringType", "BinaryType", "BooleanType", "DateType", + "TimestampType", "DecimalType", "DoubleType", "FloatType", "ByteType", "IntegerType", + "LongType", "ShortType", "ArrayType", "MapType", "StructField", "StructType"] + + +class DataType(object): + """Base class for data types.""" + + def __repr__(self): + return self.__class__.__name__ + + def __hash__(self): + return hash(str(self)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not self.__eq__(other) + + @classmethod + def typeName(cls): + return cls.__name__[:-4].lower() + + def simpleString(self): + return self.typeName() + + def jsonValue(self): + return self.typeName() + + def json(self): + return json.dumps(self.jsonValue(), + separators=(',', ':'), + sort_keys=True) + + +# This singleton pattern does not work with pickle, you will get +# another object after pickle and unpickle +class PrimitiveTypeSingleton(type): + """Metaclass for PrimitiveType""" + + _instances = {} + + def __call__(cls): + if cls not in cls._instances: + cls._instances[cls] = super(PrimitiveTypeSingleton, cls).__call__() + return cls._instances[cls] + + +class PrimitiveType(DataType): + """Spark SQL PrimitiveType""" + + __metaclass__ = PrimitiveTypeSingleton + + +class NullType(PrimitiveType): + """Null type. + + The data type representing None, used for the types that cannot be inferred. + """ + + +class StringType(PrimitiveType): + """String data type. + """ + + +class BinaryType(PrimitiveType): + """Binary (byte array) data type. + """ + + +class BooleanType(PrimitiveType): + """Boolean data type. + """ + + +class DateType(PrimitiveType): + """Date (datetime.date) data type. + """ + + +class TimestampType(PrimitiveType): + """Timestamp (datetime.datetime) data type. + """ + + +class DecimalType(DataType): + """Decimal (decimal.Decimal) data type. + """ + + def __init__(self, precision=None, scale=None): + self.precision = precision + self.scale = scale + self.hasPrecisionInfo = precision is not None + + def simpleString(self): + if self.hasPrecisionInfo: + return "decimal(%d,%d)" % (self.precision, self.scale) + else: + return "decimal(10,0)" + + def jsonValue(self): + if self.hasPrecisionInfo: + return "decimal(%d,%d)" % (self.precision, self.scale) + else: + return "decimal" + + def __repr__(self): + if self.hasPrecisionInfo: + return "DecimalType(%d,%d)" % (self.precision, self.scale) + else: + return "DecimalType()" + + +class DoubleType(PrimitiveType): + """Double data type, representing double precision floats. + """ + + +class FloatType(PrimitiveType): + """Float data type, representing single precision floats. + """ + + +class ByteType(PrimitiveType): + """Byte data type, i.e. a signed integer in a single byte. + """ + def simpleString(self): + return 'tinyint' + + +class IntegerType(PrimitiveType): + """Int data type, i.e. a signed 32-bit integer. + """ + def simpleString(self): + return 'int' + + +class LongType(PrimitiveType): + """Long data type, i.e. a signed 64-bit integer. + + If the values are beyond the range of [-9223372036854775808, 9223372036854775807], + please use :class:`DecimalType`. + """ + def simpleString(self): + return 'bigint' + + +class ShortType(PrimitiveType): + """Short data type, i.e. a signed 16-bit integer. + """ + def simpleString(self): + return 'smallint' + + +class ArrayType(DataType): + """Array data type. + + :param elementType: :class:`DataType` of each element in the array. + :param containsNull: boolean, whether the array can contain null (None) values. + """ + + def __init__(self, elementType, containsNull=True): + """ + >>> ArrayType(StringType()) == ArrayType(StringType(), True) + True + >>> ArrayType(StringType(), False) == ArrayType(StringType()) + False + """ + assert isinstance(elementType, DataType), "elementType should be DataType" + self.elementType = elementType + self.containsNull = containsNull + + def simpleString(self): + return 'array<%s>' % self.elementType.simpleString() + + def __repr__(self): + return "ArrayType(%s,%s)" % (self.elementType, + str(self.containsNull).lower()) + + def jsonValue(self): + return {"type": self.typeName(), + "elementType": self.elementType.jsonValue(), + "containsNull": self.containsNull} + + @classmethod + def fromJson(cls, json): + return ArrayType(_parse_datatype_json_value(json["elementType"]), + json["containsNull"]) + + +class MapType(DataType): + """Map data type. + + :param keyType: :class:`DataType` of the keys in the map. + :param valueType: :class:`DataType` of the values in the map. + :param valueContainsNull: indicates whether values can contain null (None) values. + + Keys in a map data type are not allowed to be null (None). + """ + + def __init__(self, keyType, valueType, valueContainsNull=True): + """ + >>> (MapType(StringType(), IntegerType()) + ... == MapType(StringType(), IntegerType(), True)) + True + >>> (MapType(StringType(), IntegerType(), False) + ... == MapType(StringType(), FloatType())) + False + """ + assert isinstance(keyType, DataType), "keyType should be DataType" + assert isinstance(valueType, DataType), "valueType should be DataType" + self.keyType = keyType + self.valueType = valueType + self.valueContainsNull = valueContainsNull + + def simpleString(self): + return 'map<%s,%s>' % (self.keyType.simpleString(), self.valueType.simpleString()) + + def __repr__(self): + return "MapType(%s,%s,%s)" % (self.keyType, self.valueType, + str(self.valueContainsNull).lower()) + + def jsonValue(self): + return {"type": self.typeName(), + "keyType": self.keyType.jsonValue(), + "valueType": self.valueType.jsonValue(), + "valueContainsNull": self.valueContainsNull} + + @classmethod + def fromJson(cls, json): + return MapType(_parse_datatype_json_value(json["keyType"]), + _parse_datatype_json_value(json["valueType"]), + json["valueContainsNull"]) + + +class StructField(DataType): + """A field in :class:`StructType`. + + :param name: string, name of the field. + :param dataType: :class:`DataType` of the field. + :param nullable: boolean, whether the field can be null (None) or not. + :param metadata: a dict from string to simple type that can be serialized to JSON automatically + """ + + def __init__(self, name, dataType, nullable=True, metadata=None): + """ + >>> (StructField("f1", StringType(), True) + ... == StructField("f1", StringType(), True)) + True + >>> (StructField("f1", StringType(), True) + ... == StructField("f2", StringType(), True)) + False + """ + assert isinstance(dataType, DataType), "dataType should be DataType" + self.name = name + self.dataType = dataType + self.nullable = nullable + self.metadata = metadata or {} + + def simpleString(self): + return '%s:%s' % (self.name, self.dataType.simpleString()) + + def __repr__(self): + return "StructField(%s,%s,%s)" % (self.name, self.dataType, + str(self.nullable).lower()) + + def jsonValue(self): + return {"name": self.name, + "type": self.dataType.jsonValue(), + "nullable": self.nullable, + "metadata": self.metadata} + + @classmethod + def fromJson(cls, json): + return StructField(json["name"], + _parse_datatype_json_value(json["type"]), + json["nullable"], + json["metadata"]) + + +class StructType(DataType): + """Struct type, consisting of a list of :class:`StructField`. + + This is the data type representing a :class:`Row`. + """ + + def __init__(self, fields): + """ + >>> struct1 = StructType([StructField("f1", StringType(), True)]) + >>> struct2 = StructType([StructField("f1", StringType(), True)]) + >>> struct1 == struct2 + True + >>> struct1 = StructType([StructField("f1", StringType(), True)]) + >>> struct2 = StructType([StructField("f1", StringType(), True), + ... StructField("f2", IntegerType(), False)]) + >>> struct1 == struct2 + False + """ + assert all(isinstance(f, DataType) for f in fields), "fields should be a list of DataType" + self.fields = fields + + def simpleString(self): + return 'struct<%s>' % (','.join(f.simpleString() for f in self.fields)) + + def __repr__(self): + return ("StructType(List(%s))" % + ",".join(str(field) for field in self.fields)) + + def jsonValue(self): + return {"type": self.typeName(), + "fields": [f.jsonValue() for f in self.fields]} + + @classmethod + def fromJson(cls, json): + return StructType([StructField.fromJson(f) for f in json["fields"]]) + + +class UserDefinedType(DataType): + """User-defined type (UDT). + + .. note:: WARN: Spark Internal Use Only + """ + + @classmethod + def typeName(cls): + return cls.__name__.lower() + + @classmethod + def sqlType(cls): + """ + Underlying SQL storage type for this UDT. + """ + raise NotImplementedError("UDT must implement sqlType().") + + @classmethod + def module(cls): + """ + The Python module of the UDT. + """ + raise NotImplementedError("UDT must implement module().") + + @classmethod + def scalaUDT(cls): + """ + The class name of the paired Scala UDT. + """ + raise NotImplementedError("UDT must have a paired Scala UDT.") + + def serialize(self, obj): + """ + Converts the a user-type object into a SQL datum. + """ + raise NotImplementedError("UDT must implement serialize().") + + def deserialize(self, datum): + """ + Converts a SQL datum into a user-type object. + """ + raise NotImplementedError("UDT must implement deserialize().") + + def simpleString(self): + return 'udt' + + def json(self): + return json.dumps(self.jsonValue(), separators=(',', ':'), sort_keys=True) + + def jsonValue(self): + schema = { + "type": "udt", + "class": self.scalaUDT(), + "pyClass": "%s.%s" % (self.module(), type(self).__name__), + "sqlType": self.sqlType().jsonValue() + } + return schema + + @classmethod + def fromJson(cls, json): + pyUDT = json["pyClass"] + split = pyUDT.rfind(".") + pyModule = pyUDT[:split] + pyClass = pyUDT[split+1:] + m = __import__(pyModule, globals(), locals(), [pyClass]) + UDT = getattr(m, pyClass) + return UDT() + + def __eq__(self, other): + return type(self) == type(other) + + +_all_primitive_types = dict((v.typeName(), v) + for v in list(globals().values()) + if (type(v) is type or type(v) is PrimitiveTypeSingleton) + and v.__base__ == PrimitiveType) + +_all_complex_types = dict((v.typeName(), v) + for v in [ArrayType, MapType, StructType]) + + +def _parse_datatype_json_string(json_string): + """Parses the given data type JSON string. + >>> import pickle + >>> def check_datatype(datatype): + ... pickled = pickle.loads(pickle.dumps(datatype)) + ... assert datatype == pickled + ... scala_datatype = sqlContext._ssql_ctx.parseDataType(datatype.json()) + ... python_datatype = _parse_datatype_json_string(scala_datatype.json()) + ... assert datatype == python_datatype + >>> for cls in _all_primitive_types.values(): + ... check_datatype(cls()) + + >>> # Simple ArrayType. + >>> simple_arraytype = ArrayType(StringType(), True) + >>> check_datatype(simple_arraytype) + + >>> # Simple MapType. + >>> simple_maptype = MapType(StringType(), LongType()) + >>> check_datatype(simple_maptype) + + >>> # Simple StructType. + >>> simple_structtype = StructType([ + ... StructField("a", DecimalType(), False), + ... StructField("b", BooleanType(), True), + ... StructField("c", LongType(), True), + ... StructField("d", BinaryType(), False)]) + >>> check_datatype(simple_structtype) + + >>> # Complex StructType. + >>> complex_structtype = StructType([ + ... StructField("simpleArray", simple_arraytype, True), + ... StructField("simpleMap", simple_maptype, True), + ... StructField("simpleStruct", simple_structtype, True), + ... StructField("boolean", BooleanType(), False), + ... StructField("withMeta", DoubleType(), False, {"name": "age"})]) + >>> check_datatype(complex_structtype) + + >>> # Complex ArrayType. + >>> complex_arraytype = ArrayType(complex_structtype, True) + >>> check_datatype(complex_arraytype) + + >>> # Complex MapType. + >>> complex_maptype = MapType(complex_structtype, + ... complex_arraytype, False) + >>> check_datatype(complex_maptype) + + >>> check_datatype(ExamplePointUDT()) + >>> structtype_with_udt = StructType([StructField("label", DoubleType(), False), + ... StructField("point", ExamplePointUDT(), False)]) + >>> check_datatype(structtype_with_udt) + """ + return _parse_datatype_json_value(json.loads(json_string)) + + +_FIXED_DECIMAL = re.compile("decimal\\((\\d+),(\\d+)\\)") + + +def _parse_datatype_json_value(json_value): + if not isinstance(json_value, dict): + if json_value in _all_primitive_types.keys(): + return _all_primitive_types[json_value]() + elif json_value == 'decimal': + return DecimalType() + elif _FIXED_DECIMAL.match(json_value): + m = _FIXED_DECIMAL.match(json_value) + return DecimalType(int(m.group(1)), int(m.group(2))) + else: + raise ValueError("Could not parse datatype: %s" % json_value) + else: + tpe = json_value["type"] + if tpe in _all_complex_types: + return _all_complex_types[tpe].fromJson(json_value) + elif tpe == 'udt': + return UserDefinedType.fromJson(json_value) + else: + raise ValueError("not supported type: %s" % tpe) + + +# Mapping Python types to Spark SQL DataType +_type_mappings = { + type(None): NullType, + bool: BooleanType, + int: LongType, + float: DoubleType, + str: StringType, + bytearray: BinaryType, + decimal.Decimal: DecimalType, + datetime.date: DateType, + datetime.datetime: TimestampType, + datetime.time: TimestampType, +} + +if sys.version < "3": + _type_mappings.update({ + unicode: StringType, + long: LongType, + }) + + +def _infer_type(obj): + """Infer the DataType from obj + + >>> p = ExamplePoint(1.0, 2.0) + >>> _infer_type(p) + ExamplePointUDT + """ + if obj is None: + return NullType() + + if hasattr(obj, '__UDT__'): + return obj.__UDT__ + + dataType = _type_mappings.get(type(obj)) + if dataType is not None: + return dataType() + + if isinstance(obj, dict): + for key, value in obj.items(): + if key is not None and value is not None: + return MapType(_infer_type(key), _infer_type(value), True) + else: + return MapType(NullType(), NullType(), True) + elif isinstance(obj, (list, array)): + for v in obj: + if v is not None: + return ArrayType(_infer_type(obj[0]), True) + else: + return ArrayType(NullType(), True) + else: + try: + return _infer_schema(obj) + except ValueError: + raise ValueError("not supported type: %s" % type(obj)) + + +def _infer_schema(row): + """Infer the schema from dict/namedtuple/object""" + if isinstance(row, dict): + items = sorted(row.items()) + + elif isinstance(row, (tuple, list)): + if hasattr(row, "__fields__"): # Row + items = zip(row.__fields__, tuple(row)) + elif hasattr(row, "_fields"): # namedtuple + items = zip(row._fields, tuple(row)) + else: + names = ['_%d' % i for i in range(1, len(row) + 1)] + items = zip(names, row) + + elif hasattr(row, "__dict__"): # object + items = sorted(row.__dict__.items()) + + else: + raise ValueError("Can not infer schema for type: %s" % type(row)) + + fields = [StructField(k, _infer_type(v), True) for k, v in items] + return StructType(fields) + + +def _need_python_to_sql_conversion(dataType): + """ + Checks whether we need python to sql conversion for the given type. + For now, only UDTs need this conversion. + + >>> _need_python_to_sql_conversion(DoubleType()) + False + >>> schema0 = StructType([StructField("indices", ArrayType(IntegerType(), False), False), + ... StructField("values", ArrayType(DoubleType(), False), False)]) + >>> _need_python_to_sql_conversion(schema0) + False + >>> _need_python_to_sql_conversion(ExamplePointUDT()) + True + >>> schema1 = ArrayType(ExamplePointUDT(), False) + >>> _need_python_to_sql_conversion(schema1) + True + >>> schema2 = StructType([StructField("label", DoubleType(), False), + ... StructField("point", ExamplePointUDT(), False)]) + >>> _need_python_to_sql_conversion(schema2) + True + """ + if isinstance(dataType, StructType): + return any([_need_python_to_sql_conversion(f.dataType) for f in dataType.fields]) + elif isinstance(dataType, ArrayType): + return _need_python_to_sql_conversion(dataType.elementType) + elif isinstance(dataType, MapType): + return _need_python_to_sql_conversion(dataType.keyType) or \ + _need_python_to_sql_conversion(dataType.valueType) + elif isinstance(dataType, UserDefinedType): + return True + else: + return False + + +def _python_to_sql_converter(dataType): + """ + Returns a converter that converts a Python object into a SQL datum for the given type. + + >>> conv = _python_to_sql_converter(DoubleType()) + >>> conv(1.0) + 1.0 + >>> conv = _python_to_sql_converter(ArrayType(DoubleType(), False)) + >>> conv([1.0, 2.0]) + [1.0, 2.0] + >>> conv = _python_to_sql_converter(ExamplePointUDT()) + >>> conv(ExamplePoint(1.0, 2.0)) + [1.0, 2.0] + >>> schema = StructType([StructField("label", DoubleType(), False), + ... StructField("point", ExamplePointUDT(), False)]) + >>> conv = _python_to_sql_converter(schema) + >>> conv((1.0, ExamplePoint(1.0, 2.0))) + (1.0, [1.0, 2.0]) + """ + if not _need_python_to_sql_conversion(dataType): + return lambda x: x + + if isinstance(dataType, StructType): + names, types = zip(*[(f.name, f.dataType) for f in dataType.fields]) + converters = map(_python_to_sql_converter, types) + + def converter(obj): + if isinstance(obj, dict): + return tuple(c(obj.get(n)) for n, c in zip(names, converters)) + elif isinstance(obj, tuple): + if hasattr(obj, "__fields__") or hasattr(obj, "_fields"): + return tuple(c(v) for c, v in zip(converters, obj)) + elif all(isinstance(x, tuple) and len(x) == 2 for x in obj): # k-v pairs + d = dict(obj) + return tuple(c(d.get(n)) for n, c in zip(names, converters)) + else: + return tuple(c(v) for c, v in zip(converters, obj)) + else: + raise ValueError("Unexpected tuple %r with type %r" % (obj, dataType)) + return converter + elif isinstance(dataType, ArrayType): + element_converter = _python_to_sql_converter(dataType.elementType) + return lambda a: [element_converter(v) for v in a] + elif isinstance(dataType, MapType): + key_converter = _python_to_sql_converter(dataType.keyType) + value_converter = _python_to_sql_converter(dataType.valueType) + return lambda m: dict([(key_converter(k), value_converter(v)) for k, v in m.items()]) + elif isinstance(dataType, UserDefinedType): + return lambda obj: dataType.serialize(obj) + else: + raise ValueError("Unexpected type %r" % dataType) + + +def _has_nulltype(dt): + """ Return whether there is NullType in `dt` or not """ + if isinstance(dt, StructType): + return any(_has_nulltype(f.dataType) for f in dt.fields) + elif isinstance(dt, ArrayType): + return _has_nulltype((dt.elementType)) + elif isinstance(dt, MapType): + return _has_nulltype(dt.keyType) or _has_nulltype(dt.valueType) + else: + return isinstance(dt, NullType) + + +def _merge_type(a, b): + if isinstance(a, NullType): + return b + elif isinstance(b, NullType): + return a + elif type(a) is not type(b): + # TODO: type cast (such as int -> long) + raise TypeError("Can not merge type %s and %s" % (a, b)) + + # same type + if isinstance(a, StructType): + nfs = dict((f.name, f.dataType) for f in b.fields) + fields = [StructField(f.name, _merge_type(f.dataType, nfs.get(f.name, NullType()))) + for f in a.fields] + names = set([f.name for f in fields]) + for n in nfs: + if n not in names: + fields.append(StructField(n, nfs[n])) + return StructType(fields) + + elif isinstance(a, ArrayType): + return ArrayType(_merge_type(a.elementType, b.elementType), True) + + elif isinstance(a, MapType): + return MapType(_merge_type(a.keyType, b.keyType), + _merge_type(a.valueType, b.valueType), + True) + else: + return a + + +def _need_converter(dataType): + if isinstance(dataType, StructType): + return True + elif isinstance(dataType, ArrayType): + return _need_converter(dataType.elementType) + elif isinstance(dataType, MapType): + return _need_converter(dataType.keyType) or _need_converter(dataType.valueType) + elif isinstance(dataType, NullType): + return True + else: + return False + + +def _create_converter(dataType): + """Create an converter to drop the names of fields in obj """ + if not _need_converter(dataType): + return lambda x: x + + if isinstance(dataType, ArrayType): + conv = _create_converter(dataType.elementType) + return lambda row: [conv(v) for v in row] + + elif isinstance(dataType, MapType): + kconv = _create_converter(dataType.keyType) + vconv = _create_converter(dataType.valueType) + return lambda row: dict((kconv(k), vconv(v)) for k, v in row.items()) + + elif isinstance(dataType, NullType): + return lambda x: None + + elif not isinstance(dataType, StructType): + return lambda x: x + + # dataType must be StructType + names = [f.name for f in dataType.fields] + converters = [_create_converter(f.dataType) for f in dataType.fields] + convert_fields = any(_need_converter(f.dataType) for f in dataType.fields) + + def convert_struct(obj): + if obj is None: + return + + if isinstance(obj, (tuple, list)): + if convert_fields: + return tuple(conv(v) for v, conv in zip(obj, converters)) + else: + return tuple(obj) + + if isinstance(obj, dict): + d = obj + elif hasattr(obj, "__dict__"): # object + d = obj.__dict__ + else: + raise ValueError("Unexpected obj: %s" % obj) + + if convert_fields: + return tuple([conv(d.get(name)) for name, conv in zip(names, converters)]) + else: + return tuple([d.get(name) for name in names]) + + return convert_struct + + +_BRACKETS = {'(': ')', '[': ']', '{': '}'} + + +def _split_schema_abstract(s): + """ + split the schema abstract into fields + + >>> _split_schema_abstract("a b c") + ['a', 'b', 'c'] + >>> _split_schema_abstract("a(a b)") + ['a(a b)'] + >>> _split_schema_abstract("a b[] c{a b}") + ['a', 'b[]', 'c{a b}'] + >>> _split_schema_abstract(" ") + [] + """ + + r = [] + w = '' + brackets = [] + for c in s: + if c == ' ' and not brackets: + if w: + r.append(w) + w = '' + else: + w += c + if c in _BRACKETS: + brackets.append(c) + elif c in _BRACKETS.values(): + if not brackets or c != _BRACKETS[brackets.pop()]: + raise ValueError("unexpected " + c) + + if brackets: + raise ValueError("brackets not closed: %s" % brackets) + if w: + r.append(w) + return r + + +def _parse_field_abstract(s): + """ + Parse a field in schema abstract + + >>> _parse_field_abstract("a") + StructField(a,NullType,true) + >>> _parse_field_abstract("b(c d)") + StructField(b,StructType(...c,NullType,true),StructField(d... + >>> _parse_field_abstract("a[]") + StructField(a,ArrayType(NullType,true),true) + >>> _parse_field_abstract("a{[]}") + StructField(a,MapType(NullType,ArrayType(NullType,true),true),true) + """ + if set(_BRACKETS.keys()) & set(s): + idx = min((s.index(c) for c in _BRACKETS if c in s)) + name = s[:idx] + return StructField(name, _parse_schema_abstract(s[idx:]), True) + else: + return StructField(s, NullType(), True) + + +def _parse_schema_abstract(s): + """ + parse abstract into schema + + >>> _parse_schema_abstract("a b c") + StructType...a...b...c... + >>> _parse_schema_abstract("a[b c] b{}") + StructType...a,ArrayType...b...c...b,MapType... + >>> _parse_schema_abstract("c{} d{a b}") + StructType...c,MapType...d,MapType...a...b... + >>> _parse_schema_abstract("a b(t)").fields[1] + StructField(b,StructType(List(StructField(t,NullType,true))),true) + """ + s = s.strip() + if not s: + return NullType() + + elif s.startswith('('): + return _parse_schema_abstract(s[1:-1]) + + elif s.startswith('['): + return ArrayType(_parse_schema_abstract(s[1:-1]), True) + + elif s.startswith('{'): + return MapType(NullType(), _parse_schema_abstract(s[1:-1])) + + parts = _split_schema_abstract(s) + fields = [_parse_field_abstract(p) for p in parts] + return StructType(fields) + + +def _infer_schema_type(obj, dataType): + """ + Fill the dataType with types inferred from obj + + >>> schema = _parse_schema_abstract("a b c d") + >>> row = (1, 1.0, "str", datetime.date(2014, 10, 10)) + >>> _infer_schema_type(row, schema) + StructType...LongType...DoubleType...StringType...DateType... + >>> row = [[1], {"key": (1, 2.0)}] + >>> schema = _parse_schema_abstract("a[] b{c d}") + >>> _infer_schema_type(row, schema) + StructType...a,ArrayType...b,MapType(StringType,...c,LongType... + """ + if isinstance(dataType, NullType): + return _infer_type(obj) + + if not obj: + return NullType() + + if isinstance(dataType, ArrayType): + eType = _infer_schema_type(obj[0], dataType.elementType) + return ArrayType(eType, True) + + elif isinstance(dataType, MapType): + k, v = next(iter(obj.items())) + return MapType(_infer_schema_type(k, dataType.keyType), + _infer_schema_type(v, dataType.valueType)) + + elif isinstance(dataType, StructType): + fs = dataType.fields + assert len(fs) == len(obj), \ + "Obj(%s) have different length with fields(%s)" % (obj, fs) + fields = [StructField(f.name, _infer_schema_type(o, f.dataType), True) + for o, f in zip(obj, fs)] + return StructType(fields) + + else: + raise ValueError("Unexpected dataType: %s" % dataType) + + +_acceptable_types = { + BooleanType: (bool,), + ByteType: (int, long), + ShortType: (int, long), + IntegerType: (int, long), + LongType: (int, long), + FloatType: (float,), + DoubleType: (float,), + DecimalType: (decimal.Decimal,), + StringType: (str, unicode), + BinaryType: (bytearray,), + DateType: (datetime.date,), + TimestampType: (datetime.datetime,), + ArrayType: (list, tuple, array), + MapType: (dict,), + StructType: (tuple, list), +} + + +def _verify_type(obj, dataType): + """ + Verify the type of obj against dataType, raise an exception if + they do not match. + + >>> _verify_type(None, StructType([])) + >>> _verify_type("", StringType()) + >>> _verify_type(0, LongType()) + >>> _verify_type(list(range(3)), ArrayType(ShortType())) + >>> _verify_type(set(), ArrayType(StringType())) # doctest: +IGNORE_EXCEPTION_DETAIL + Traceback (most recent call last): + ... + TypeError:... + >>> _verify_type({}, MapType(StringType(), IntegerType())) + >>> _verify_type((), StructType([])) + >>> _verify_type([], StructType([])) + >>> _verify_type([1], StructType([])) # doctest: +IGNORE_EXCEPTION_DETAIL + Traceback (most recent call last): + ... + ValueError:... + >>> _verify_type(ExamplePoint(1.0, 2.0), ExamplePointUDT()) + >>> _verify_type([1.0, 2.0], ExamplePointUDT()) # doctest: +IGNORE_EXCEPTION_DETAIL + Traceback (most recent call last): + ... + ValueError:... + """ + # all objects are nullable + if obj is None: + return + + if isinstance(dataType, UserDefinedType): + if not (hasattr(obj, '__UDT__') and obj.__UDT__ == dataType): + raise ValueError("%r is not an instance of type %r" % (obj, dataType)) + _verify_type(dataType.serialize(obj), dataType.sqlType()) + return + + _type = type(dataType) + assert _type in _acceptable_types, "unknown datatype: %s" % dataType + + # subclass of them can not be deserialized in JVM + if type(obj) not in _acceptable_types[_type]: + raise TypeError("%s can not accept object in type %s" + % (dataType, type(obj))) + + if isinstance(dataType, ArrayType): + for i in obj: + _verify_type(i, dataType.elementType) + + elif isinstance(dataType, MapType): + for k, v in obj.items(): + _verify_type(k, dataType.keyType) + _verify_type(v, dataType.valueType) + + elif isinstance(dataType, StructType): + if len(obj) != len(dataType.fields): + raise ValueError("Length of object (%d) does not match with " + "length of fields (%d)" % (len(obj), len(dataType.fields))) + for v, f in zip(obj, dataType.fields): + _verify_type(v, f.dataType) + +_cached_cls = weakref.WeakValueDictionary() + + +def _restore_object(dataType, obj): + """ Restore object during unpickling. """ + # use id(dataType) as key to speed up lookup in dict + # Because of batched pickling, dataType will be the + # same object in most cases. + k = id(dataType) + cls = _cached_cls.get(k) + if cls is None or cls.__datatype is not dataType: + # use dataType as key to avoid create multiple class + cls = _cached_cls.get(dataType) + if cls is None: + cls = _create_cls(dataType) + _cached_cls[dataType] = cls + cls.__datatype = dataType + _cached_cls[k] = cls + return cls(obj) + + +def _create_object(cls, v): + """ Create an customized object with class `cls`. """ + # datetime.date would be deserialized as datetime.datetime + # from java type, so we need to set it back. + if cls is datetime.date and isinstance(v, datetime.datetime): + return v.date() + return cls(v) if v is not None else v + + +def _create_getter(dt, i): + """ Create a getter for item `i` with schema """ + cls = _create_cls(dt) + + def getter(self): + return _create_object(cls, self[i]) + + return getter + + +def _has_struct_or_date(dt): + """Return whether `dt` is or has StructType/DateType in it""" + if isinstance(dt, StructType): + return True + elif isinstance(dt, ArrayType): + return _has_struct_or_date(dt.elementType) + elif isinstance(dt, MapType): + return _has_struct_or_date(dt.keyType) or _has_struct_or_date(dt.valueType) + elif isinstance(dt, DateType): + return True + elif isinstance(dt, UserDefinedType): + return True + return False + + +def _create_properties(fields): + """Create properties according to fields""" + ps = {} + for i, f in enumerate(fields): + name = f.name + if (name.startswith("__") and name.endswith("__") + or keyword.iskeyword(name)): + warnings.warn("field name %s can not be accessed in Python," + "use position to access it instead" % name) + if _has_struct_or_date(f.dataType): + # delay creating object until accessing it + getter = _create_getter(f.dataType, i) + else: + getter = itemgetter(i) + ps[name] = property(getter) + return ps + + +def _create_cls(dataType): + """ + Create an class by dataType + + The created class is similar to namedtuple, but can have nested schema. + + >>> schema = _parse_schema_abstract("a b c") + >>> row = (1, 1.0, "str") + >>> schema = _infer_schema_type(row, schema) + >>> obj = _create_cls(schema)(row) + >>> import pickle + >>> pickle.loads(pickle.dumps(obj)) + Row(a=1, b=1.0, c='str') + + >>> row = [[1], {"key": (1, 2.0)}] + >>> schema = _parse_schema_abstract("a[] b{c d}") + >>> schema = _infer_schema_type(row, schema) + >>> obj = _create_cls(schema)(row) + >>> pickle.loads(pickle.dumps(obj)) + Row(a=[1], b={'key': Row(c=1, d=2.0)}) + >>> pickle.loads(pickle.dumps(obj.a)) + [1] + >>> pickle.loads(pickle.dumps(obj.b)) + {'key': Row(c=1, d=2.0)} + """ + + if isinstance(dataType, ArrayType): + cls = _create_cls(dataType.elementType) + + def List(l): + if l is None: + return + return [_create_object(cls, v) for v in l] + + return List + + elif isinstance(dataType, MapType): + kcls = _create_cls(dataType.keyType) + vcls = _create_cls(dataType.valueType) + + def Dict(d): + if d is None: + return + return dict((_create_object(kcls, k), _create_object(vcls, v)) for k, v in d.items()) + + return Dict + + elif isinstance(dataType, DateType): + return datetime.date + + elif isinstance(dataType, UserDefinedType): + return lambda datum: dataType.deserialize(datum) + + elif not isinstance(dataType, StructType): + # no wrapper for primitive types + return lambda x: x + + class Row(tuple): + + """ Row in DataFrame """ + __datatype = dataType + __fields__ = tuple(f.name for f in dataType.fields) + __slots__ = () + + # create property for fast access + locals().update(_create_properties(dataType.fields)) + + def asDict(self): + """ Return as a dict """ + return dict((n, getattr(self, n)) for n in self.__fields__) + + def __repr__(self): + # call collect __repr__ for nested objects + return ("Row(%s)" % ", ".join("%s=%r" % (n, getattr(self, n)) + for n in self.__fields__)) + + def __reduce__(self): + return (_restore_object, (self.__datatype, tuple(self))) + + return Row + + +def _create_row(fields, values): + row = Row(*values) + row.__fields__ = fields + return row + + +class Row(tuple): + + """ + A row in L{DataFrame}. The fields in it can be accessed like attributes. + + Row can be used to create a row object by using named arguments, + the fields will be sorted by names. + + >>> row = Row(name="Alice", age=11) + >>> row + Row(age=11, name='Alice') + >>> row.name, row.age + ('Alice', 11) + + Row also can be used to create another Row like class, then it + could be used to create Row objects, such as + + >>> Person = Row("name", "age") + >>> Person + + >>> Person("Alice", 11) + Row(name='Alice', age=11) + """ + + def __new__(self, *args, **kwargs): + if args and kwargs: + raise ValueError("Can not use both args " + "and kwargs to create Row") + if args: + # create row class or objects + return tuple.__new__(self, args) + + elif kwargs: + # create row objects + names = sorted(kwargs.keys()) + row = tuple.__new__(self, [kwargs[n] for n in names]) + row.__fields__ = names + return row + + else: + raise ValueError("No args or kwargs") + + def asDict(self): + """ + Return as an dict + """ + if not hasattr(self, "__fields__"): + raise TypeError("Cannot convert a Row class into dict") + return dict(zip(self.__fields__, self)) + + # let object acts like class + def __call__(self, *args): + """create new Row object""" + return _create_row(self, args) + + def __getattr__(self, item): + if item.startswith("__"): + raise AttributeError(item) + try: + # it will be slow when it has many fields, + # but this will not be used in normal cases + idx = self.__fields__.index(item) + return self[idx] + except IndexError: + raise AttributeError(item) + except ValueError: + raise AttributeError(item) + + def __reduce__(self): + if hasattr(self, "__fields__"): + return (_create_row, (self.__fields__, tuple(self))) + else: + return tuple.__reduce__(self) + + def __repr__(self): + if hasattr(self, "__fields__"): + return "Row(%s)" % ", ".join("%s=%r" % (k, v) + for k, v in zip(self.__fields__, tuple(self))) + else: + return "" % ", ".join(self) + + +def _test(): + import doctest + from pyspark.context import SparkContext + # let doctest run in pyspark.sql.types, so DataTypes can be picklable + import pyspark.sql.types + from pyspark.sql import Row, SQLContext + from pyspark.sql.tests import ExamplePoint, ExamplePointUDT + globs = pyspark.sql.types.__dict__.copy() + sc = SparkContext('local[4]', 'PythonTest') + globs['sc'] = sc + globs['sqlContext'] = SQLContext(sc) + globs['ExamplePoint'] = ExamplePoint + globs['ExamplePointUDT'] = ExamplePointUDT + (failure_count, test_count) = doctest.testmod( + pyspark.sql.types, globs=globs, optionflags=doctest.ELLIPSIS) + globs['sc'].stop() + if failure_count: + exit(-1) + + +if __name__ == "__main__": + _test() diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py index e8529a8f8e..c90afc326c 100644 --- a/python/pyspark/sql/context.py +++ b/python/pyspark/sql/context.py @@ -15,14 +15,19 @@ # limitations under the License. # +import sys import warnings import json -from itertools import imap + +if sys.version >= '3': + basestring = unicode = str +else: + from itertools import imap as map from py4j.protocol import Py4JError from py4j.java_collections import MapConverter -from pyspark.rdd import RDD, _prepare_for_python_RDD +from pyspark.rdd import RDD, _prepare_for_python_RDD, ignore_unicode_prefix from pyspark.serializers import AutoBatchedSerializer, PickleSerializer from pyspark.sql.types import Row, StringType, StructType, _verify_type, \ _infer_schema, _has_nulltype, _merge_type, _create_converter, _python_to_sql_converter @@ -62,31 +67,27 @@ class SQLContext(object): A SQLContext can be used create :class:`DataFrame`, register :class:`DataFrame` as tables, execute SQL over tables, cache tables, and read parquet files. - When created, :class:`SQLContext` adds a method called ``toDF`` to :class:`RDD`, - which could be used to convert an RDD into a DataFrame, it's a shorthand for - :func:`SQLContext.createDataFrame`. - :param sparkContext: The :class:`SparkContext` backing this SQLContext. :param sqlContext: An optional JVM Scala SQLContext. If set, we do not instantiate a new SQLContext in the JVM, instead we make all calls to this object. """ + @ignore_unicode_prefix def __init__(self, sparkContext, sqlContext=None): """Creates a new SQLContext. >>> from datetime import datetime >>> sqlContext = SQLContext(sc) - >>> allTypes = sc.parallelize([Row(i=1, s="string", d=1.0, l=1L, + >>> allTypes = sc.parallelize([Row(i=1, s="string", d=1.0, l=1, ... b=True, list=[1, 2, 3], dict={"s": 0}, row=Row(a=1), ... time=datetime(2014, 8, 1, 14, 1, 5))]) >>> df = allTypes.toDF() >>> df.registerTempTable("allTypes") >>> sqlContext.sql('select i+1, d+1, not b, list[1], dict["s"], time, row.a ' ... 'from allTypes where b and i > 0').collect() - [Row(c0=2, c1=2.0, c2=False, c3=2, c4=0...8, 1, 14, 1, 5), a=1)] - >>> df.map(lambda x: (x.i, x.s, x.d, x.l, x.b, x.time, - ... x.row.a, x.list)).collect() - [(1, u'string', 1.0, 1, True, ...(2014, 8, 1, 14, 1, 5), 1, [1, 2, 3])] + [Row(c0=2, c1=2.0, c2=False, c3=2, c4=0, time=datetime.datetime(2014, 8, 1, 14, 1, 5), a=1)] + >>> df.map(lambda x: (x.i, x.s, x.d, x.l, x.b, x.time, x.row.a, x.list)).collect() + [(1, u'string', 1.0, 1, True, datetime.datetime(2014, 8, 1, 14, 1, 5), 1, [1, 2, 3])] """ self._sc = sparkContext self._jsc = self._sc._jsc @@ -122,6 +123,7 @@ class SQLContext(object): """Returns a :class:`UDFRegistration` for UDF registration.""" return UDFRegistration(self) + @ignore_unicode_prefix def registerFunction(self, name, f, returnType=StringType()): """Registers a lambda function as a UDF so it can be used in SQL statements. @@ -147,7 +149,7 @@ class SQLContext(object): >>> sqlContext.sql("SELECT stringLengthInt('test')").collect() [Row(c0=4)] """ - func = lambda _, it: imap(lambda x: f(*x), it) + func = lambda _, it: map(lambda x: f(*x), it) ser = AutoBatchedSerializer(PickleSerializer()) command = (func, None, ser, ser) pickled_cmd, bvars, env, includes = _prepare_for_python_RDD(self._sc, command, self) @@ -185,6 +187,7 @@ class SQLContext(object): schema = rdd.map(_infer_schema).reduce(_merge_type) return schema + @ignore_unicode_prefix def inferSchema(self, rdd, samplingRatio=None): """::note: Deprecated in 1.3, use :func:`createDataFrame` instead. """ @@ -195,6 +198,7 @@ class SQLContext(object): return self.createDataFrame(rdd, None, samplingRatio) + @ignore_unicode_prefix def applySchema(self, rdd, schema): """::note: Deprecated in 1.3, use :func:`createDataFrame` instead. """ @@ -208,6 +212,7 @@ class SQLContext(object): return self.createDataFrame(rdd, schema) + @ignore_unicode_prefix def createDataFrame(self, data, schema=None, samplingRatio=None): """ Creates a :class:`DataFrame` from an :class:`RDD` of :class:`tuple`/:class:`list`, @@ -380,6 +385,7 @@ class SQLContext(object): df = self._ssql_ctx.jsonFile(path, scala_datatype) return DataFrame(df, self) + @ignore_unicode_prefix def jsonRDD(self, rdd, schema=None, samplingRatio=1.0): """Loads an RDD storing one JSON object per string as a :class:`DataFrame`. @@ -477,6 +483,7 @@ class SQLContext(object): joptions) return DataFrame(df, self) + @ignore_unicode_prefix def sql(self, sqlQuery): """Returns a :class:`DataFrame` representing the result of the given query. @@ -497,6 +504,7 @@ class SQLContext(object): """ return DataFrame(self._ssql_ctx.table(tableName), self) + @ignore_unicode_prefix def tables(self, dbName=None): """Returns a :class:`DataFrame` containing names of tables in the given database. diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index f2c3b74a18..d76504f986 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -16,14 +16,19 @@ # import sys -import itertools import warnings import random +if sys.version >= '3': + basestring = unicode = str + long = int +else: + from itertools import imap as map + from py4j.java_collections import ListConverter, MapConverter from pyspark.context import SparkContext -from pyspark.rdd import RDD, _load_from_socket +from pyspark.rdd import RDD, _load_from_socket, ignore_unicode_prefix from pyspark.serializers import BatchedSerializer, PickleSerializer, UTF8Deserializer from pyspark.storagelevel import StorageLevel from pyspark.traceback_utils import SCCallSiteSync @@ -65,19 +70,20 @@ class DataFrame(object): self._sc = sql_ctx and sql_ctx._sc self.is_cached = False self._schema = None # initialized lazily + self._lazy_rdd = None @property def rdd(self): """Returns the content as an :class:`pyspark.RDD` of :class:`Row`. """ - if not hasattr(self, '_lazy_rdd'): + if self._lazy_rdd is None: jrdd = self._jdf.javaToPython() rdd = RDD(jrdd, self.sql_ctx._sc, BatchedSerializer(PickleSerializer())) schema = self.schema def applySchema(it): cls = _create_cls(schema) - return itertools.imap(cls, it) + return map(cls, it) self._lazy_rdd = rdd.mapPartitions(applySchema) @@ -89,13 +95,14 @@ class DataFrame(object): """ return DataFrameNaFunctions(self) - def toJSON(self, use_unicode=False): + @ignore_unicode_prefix + def toJSON(self, use_unicode=True): """Converts a :class:`DataFrame` into a :class:`RDD` of string. Each row is turned into a JSON document as one element in the returned RDD. >>> df.toJSON().first() - '{"age":2,"name":"Alice"}' + u'{"age":2,"name":"Alice"}' """ rdd = self._jdf.toJSON() return RDD(rdd.toJavaRDD(), self._sc, UTF8Deserializer(use_unicode)) @@ -228,7 +235,7 @@ class DataFrame(object): |-- name: string (nullable = true) """ - print (self._jdf.schema().treeString()) + print(self._jdf.schema().treeString()) def explain(self, extended=False): """Prints the (logical and physical) plans to the console for debugging purpose. @@ -250,9 +257,9 @@ class DataFrame(object): == RDD == """ if extended: - print self._jdf.queryExecution().toString() + print(self._jdf.queryExecution().toString()) else: - print self._jdf.queryExecution().executedPlan().toString() + print(self._jdf.queryExecution().executedPlan().toString()) def isLocal(self): """Returns ``True`` if the :func:`collect` and :func:`take` methods can be run locally @@ -270,7 +277,7 @@ class DataFrame(object): 2 Alice 5 Bob """ - print self._jdf.showString(n).encode('utf8', 'ignore') + print(self._jdf.showString(n)) def __repr__(self): return "DataFrame[%s]" % (", ".join("%s: %s" % c for c in self.dtypes)) @@ -279,10 +286,11 @@ class DataFrame(object): """Returns the number of rows in this :class:`DataFrame`. >>> df.count() - 2L + 2 """ - return self._jdf.count() + return int(self._jdf.count()) + @ignore_unicode_prefix def collect(self): """Returns all the records as a list of :class:`Row`. @@ -295,6 +303,7 @@ class DataFrame(object): cls = _create_cls(self.schema) return [cls(r) for r in rs] + @ignore_unicode_prefix def limit(self, num): """Limits the result count to the number specified. @@ -306,6 +315,7 @@ class DataFrame(object): jdf = self._jdf.limit(num) return DataFrame(jdf, self.sql_ctx) + @ignore_unicode_prefix def take(self, num): """Returns the first ``num`` rows as a :class:`list` of :class:`Row`. @@ -314,6 +324,7 @@ class DataFrame(object): """ return self.limit(num).collect() + @ignore_unicode_prefix def map(self, f): """ Returns a new :class:`RDD` by applying a the ``f`` function to each :class:`Row`. @@ -324,6 +335,7 @@ class DataFrame(object): """ return self.rdd.map(f) + @ignore_unicode_prefix def flatMap(self, f): """ Returns a new :class:`RDD` by first applying the ``f`` function to each :class:`Row`, and then flattening the results. @@ -353,7 +365,7 @@ class DataFrame(object): This is a shorthand for ``df.rdd.foreach()``. >>> def f(person): - ... print person.name + ... print(person.name) >>> df.foreach(f) """ return self.rdd.foreach(f) @@ -365,7 +377,7 @@ class DataFrame(object): >>> def f(people): ... for person in people: - ... print person.name + ... print(person.name) >>> df.foreachPartition(f) """ return self.rdd.foreachPartition(f) @@ -412,7 +424,7 @@ class DataFrame(object): """Returns a new :class:`DataFrame` containing the distinct rows in this :class:`DataFrame`. >>> df.distinct().count() - 2L + 2 """ return DataFrame(self._jdf.distinct(), self.sql_ctx) @@ -420,10 +432,10 @@ class DataFrame(object): """Returns a sampled subset of this :class:`DataFrame`. >>> df.sample(False, 0.5, 97).count() - 1L + 1 """ assert fraction >= 0.0, "Negative fraction value: %s" % fraction - seed = seed if seed is not None else random.randint(0, sys.maxint) + seed = seed if seed is not None else random.randint(0, sys.maxsize) rdd = self._jdf.sample(withReplacement, fraction, long(seed)) return DataFrame(rdd, self.sql_ctx) @@ -437,6 +449,7 @@ class DataFrame(object): return [(str(f.name), f.dataType.simpleString()) for f in self.schema.fields] @property + @ignore_unicode_prefix def columns(self): """Returns all column names as a list. @@ -445,6 +458,7 @@ class DataFrame(object): """ return [f.name for f in self.schema.fields] + @ignore_unicode_prefix def join(self, other, joinExprs=None, joinType=None): """Joins with another :class:`DataFrame`, using the given join expression. @@ -470,6 +484,7 @@ class DataFrame(object): jdf = self._jdf.join(other._jdf, joinExprs._jc, joinType) return DataFrame(jdf, self.sql_ctx) + @ignore_unicode_prefix def sort(self, *cols): """Returns a new :class:`DataFrame` sorted by the specified column(s). @@ -513,6 +528,7 @@ class DataFrame(object): jdf = self._jdf.describe(self.sql_ctx._sc._jvm.PythonUtils.toSeq(cols)) return DataFrame(jdf, self.sql_ctx) + @ignore_unicode_prefix def head(self, n=None): """ Returns the first ``n`` rows as a list of :class:`Row`, @@ -528,6 +544,7 @@ class DataFrame(object): return rs[0] if rs else None return self.take(n) + @ignore_unicode_prefix def first(self): """Returns the first row as a :class:`Row`. @@ -536,6 +553,7 @@ class DataFrame(object): """ return self.head() + @ignore_unicode_prefix def __getitem__(self, item): """Returns the column as a :class:`Column`. @@ -567,6 +585,7 @@ class DataFrame(object): jc = self._jdf.apply(name) return Column(jc) + @ignore_unicode_prefix def select(self, *cols): """Projects a set of expressions and returns a new :class:`DataFrame`. @@ -598,6 +617,7 @@ class DataFrame(object): jdf = self._jdf.selectExpr(self._sc._jvm.PythonUtils.toSeq(jexpr)) return DataFrame(jdf, self.sql_ctx) + @ignore_unicode_prefix def filter(self, condition): """Filters rows using the given condition. @@ -626,6 +646,7 @@ class DataFrame(object): where = filter + @ignore_unicode_prefix def groupBy(self, *cols): """Groups the :class:`DataFrame` using the specified columns, so we can run aggregation on them. See :class:`GroupedData` @@ -775,6 +796,7 @@ class DataFrame(object): cols = self.sql_ctx._sc._jvm.PythonUtils.toSeq(cols) return DataFrame(self._jdf.na().fill(value, cols), self.sql_ctx) + @ignore_unicode_prefix def withColumn(self, colName, col): """Returns a new :class:`DataFrame` by adding a column. @@ -786,6 +808,7 @@ class DataFrame(object): """ return self.select('*', col.alias(colName)) + @ignore_unicode_prefix def withColumnRenamed(self, existing, new): """REturns a new :class:`DataFrame` by renaming an existing column. @@ -852,6 +875,7 @@ class GroupedData(object): self._jdf = jdf self.sql_ctx = sql_ctx + @ignore_unicode_prefix def agg(self, *exprs): """Compute aggregates and returns the result as a :class:`DataFrame`. @@ -1041,11 +1065,13 @@ class Column(object): __sub__ = _bin_op("minus") __mul__ = _bin_op("multiply") __div__ = _bin_op("divide") + __truediv__ = _bin_op("divide") __mod__ = _bin_op("mod") __radd__ = _bin_op("plus") __rsub__ = _reverse_op("minus") __rmul__ = _bin_op("multiply") __rdiv__ = _reverse_op("divide") + __rtruediv__ = _reverse_op("divide") __rmod__ = _reverse_op("mod") # logistic operators @@ -1075,6 +1101,7 @@ class Column(object): startswith = _bin_op("startsWith") endswith = _bin_op("endsWith") + @ignore_unicode_prefix def substr(self, startPos, length): """ Return a :class:`Column` which is a substring of the column @@ -1097,6 +1124,7 @@ class Column(object): __getslice__ = substr + @ignore_unicode_prefix def inSet(self, *cols): """ A boolean expression that is evaluated to true if the value of this expression is contained by the evaluated values of the arguments. @@ -1131,6 +1159,7 @@ class Column(object): """ return Column(getattr(self._jc, "as")(alias)) + @ignore_unicode_prefix def cast(self, dataType): """ Convert the column into type `dataType` diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index daeb6916b5..1d65369528 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -18,8 +18,10 @@ """ A collections of builtin functions """ +import sys -from itertools import imap +if sys.version < "3": + from itertools import imap as map from py4j.java_collections import ListConverter @@ -116,7 +118,7 @@ class UserDefinedFunction(object): def _create_judf(self): f = self.func # put it in closure `func` - func = lambda _, it: imap(lambda x: f(*x), it) + func = lambda _, it: map(lambda x: f(*x), it) ser = AutoBatchedSerializer(PickleSerializer()) command = (func, None, ser, ser) sc = SparkContext._active_spark_context diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index b3a6a2c6a9..7c09a0cfe3 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -157,13 +157,13 @@ class SQLTests(ReusedPySparkTestCase): self.assertEqual(4, res[0]) def test_udf_with_array_type(self): - d = [Row(l=range(3), d={"key": range(5)})] + d = [Row(l=list(range(3)), d={"key": list(range(5))})] rdd = self.sc.parallelize(d) self.sqlCtx.createDataFrame(rdd).registerTempTable("test") self.sqlCtx.registerFunction("copylist", lambda l: list(l), ArrayType(IntegerType())) self.sqlCtx.registerFunction("maplen", lambda d: len(d), IntegerType()) [(l1, l2)] = self.sqlCtx.sql("select copylist(l), maplen(d) from test").collect() - self.assertEqual(range(3), l1) + self.assertEqual(list(range(3)), l1) self.assertEqual(1, l2) def test_broadcast_in_udf(self): @@ -266,7 +266,7 @@ class SQLTests(ReusedPySparkTestCase): def test_apply_schema(self): from datetime import date, datetime - rdd = self.sc.parallelize([(127, -128L, -32768, 32767, 2147483647L, 1.0, + rdd = self.sc.parallelize([(127, -128, -32768, 32767, 2147483647, 1.0, date(2010, 1, 1), datetime(2010, 1, 1, 1, 1, 1), {"a": 1}, (2,), [1, 2, 3], None)]) schema = StructType([ @@ -309,7 +309,7 @@ class SQLTests(ReusedPySparkTestCase): def test_struct_in_map(self): d = [Row(m={Row(i=1): Row(s="")})] df = self.sc.parallelize(d).toDF() - k, v = df.head().m.items()[0] + k, v = list(df.head().m.items())[0] self.assertEqual(1, k.i) self.assertEqual("", v.s) @@ -554,6 +554,9 @@ class HiveContextSQLTests(ReusedPySparkTestCase): except py4j.protocol.Py4JError: cls.sqlCtx = None return + except TypeError: + cls.sqlCtx = None + return os.unlink(cls.tempdir.name) _scala_HiveContext =\ cls.sc._jvm.org.apache.spark.sql.hive.test.TestHiveContext(cls.sc._jsc.sc()) diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py deleted file mode 100644 index ef76d84c00..0000000000 --- a/python/pyspark/sql/types.py +++ /dev/null @@ -1,1252 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -import decimal -import datetime -import keyword -import warnings -import json -import re -import weakref -from array import array -from operator import itemgetter - - -__all__ = [ - "DataType", "NullType", "StringType", "BinaryType", "BooleanType", "DateType", - "TimestampType", "DecimalType", "DoubleType", "FloatType", "ByteType", "IntegerType", - "LongType", "ShortType", "ArrayType", "MapType", "StructField", "StructType"] - - -class DataType(object): - """Base class for data types.""" - - def __repr__(self): - return self.__class__.__name__ - - def __hash__(self): - return hash(str(self)) - - def __eq__(self, other): - return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ - - def __ne__(self, other): - return not self.__eq__(other) - - @classmethod - def typeName(cls): - return cls.__name__[:-4].lower() - - def simpleString(self): - return self.typeName() - - def jsonValue(self): - return self.typeName() - - def json(self): - return json.dumps(self.jsonValue(), - separators=(',', ':'), - sort_keys=True) - - -# This singleton pattern does not work with pickle, you will get -# another object after pickle and unpickle -class PrimitiveTypeSingleton(type): - """Metaclass for PrimitiveType""" - - _instances = {} - - def __call__(cls): - if cls not in cls._instances: - cls._instances[cls] = super(PrimitiveTypeSingleton, cls).__call__() - return cls._instances[cls] - - -class PrimitiveType(DataType): - """Spark SQL PrimitiveType""" - - __metaclass__ = PrimitiveTypeSingleton - - -class NullType(PrimitiveType): - """Null type. - - The data type representing None, used for the types that cannot be inferred. - """ - - -class StringType(PrimitiveType): - """String data type. - """ - - -class BinaryType(PrimitiveType): - """Binary (byte array) data type. - """ - - -class BooleanType(PrimitiveType): - """Boolean data type. - """ - - -class DateType(PrimitiveType): - """Date (datetime.date) data type. - """ - - -class TimestampType(PrimitiveType): - """Timestamp (datetime.datetime) data type. - """ - - -class DecimalType(DataType): - """Decimal (decimal.Decimal) data type. - """ - - def __init__(self, precision=None, scale=None): - self.precision = precision - self.scale = scale - self.hasPrecisionInfo = precision is not None - - def simpleString(self): - if self.hasPrecisionInfo: - return "decimal(%d,%d)" % (self.precision, self.scale) - else: - return "decimal(10,0)" - - def jsonValue(self): - if self.hasPrecisionInfo: - return "decimal(%d,%d)" % (self.precision, self.scale) - else: - return "decimal" - - def __repr__(self): - if self.hasPrecisionInfo: - return "DecimalType(%d,%d)" % (self.precision, self.scale) - else: - return "DecimalType()" - - -class DoubleType(PrimitiveType): - """Double data type, representing double precision floats. - """ - - -class FloatType(PrimitiveType): - """Float data type, representing single precision floats. - """ - - -class ByteType(PrimitiveType): - """Byte data type, i.e. a signed integer in a single byte. - """ - def simpleString(self): - return 'tinyint' - - -class IntegerType(PrimitiveType): - """Int data type, i.e. a signed 32-bit integer. - """ - def simpleString(self): - return 'int' - - -class LongType(PrimitiveType): - """Long data type, i.e. a signed 64-bit integer. - - If the values are beyond the range of [-9223372036854775808, 9223372036854775807], - please use :class:`DecimalType`. - """ - def simpleString(self): - return 'bigint' - - -class ShortType(PrimitiveType): - """Short data type, i.e. a signed 16-bit integer. - """ - def simpleString(self): - return 'smallint' - - -class ArrayType(DataType): - """Array data type. - - :param elementType: :class:`DataType` of each element in the array. - :param containsNull: boolean, whether the array can contain null (None) values. - """ - - def __init__(self, elementType, containsNull=True): - """ - >>> ArrayType(StringType()) == ArrayType(StringType(), True) - True - >>> ArrayType(StringType(), False) == ArrayType(StringType()) - False - """ - assert isinstance(elementType, DataType), "elementType should be DataType" - self.elementType = elementType - self.containsNull = containsNull - - def simpleString(self): - return 'array<%s>' % self.elementType.simpleString() - - def __repr__(self): - return "ArrayType(%s,%s)" % (self.elementType, - str(self.containsNull).lower()) - - def jsonValue(self): - return {"type": self.typeName(), - "elementType": self.elementType.jsonValue(), - "containsNull": self.containsNull} - - @classmethod - def fromJson(cls, json): - return ArrayType(_parse_datatype_json_value(json["elementType"]), - json["containsNull"]) - - -class MapType(DataType): - """Map data type. - - :param keyType: :class:`DataType` of the keys in the map. - :param valueType: :class:`DataType` of the values in the map. - :param valueContainsNull: indicates whether values can contain null (None) values. - - Keys in a map data type are not allowed to be null (None). - """ - - def __init__(self, keyType, valueType, valueContainsNull=True): - """ - >>> (MapType(StringType(), IntegerType()) - ... == MapType(StringType(), IntegerType(), True)) - True - >>> (MapType(StringType(), IntegerType(), False) - ... == MapType(StringType(), FloatType())) - False - """ - assert isinstance(keyType, DataType), "keyType should be DataType" - assert isinstance(valueType, DataType), "valueType should be DataType" - self.keyType = keyType - self.valueType = valueType - self.valueContainsNull = valueContainsNull - - def simpleString(self): - return 'map<%s,%s>' % (self.keyType.simpleString(), self.valueType.simpleString()) - - def __repr__(self): - return "MapType(%s,%s,%s)" % (self.keyType, self.valueType, - str(self.valueContainsNull).lower()) - - def jsonValue(self): - return {"type": self.typeName(), - "keyType": self.keyType.jsonValue(), - "valueType": self.valueType.jsonValue(), - "valueContainsNull": self.valueContainsNull} - - @classmethod - def fromJson(cls, json): - return MapType(_parse_datatype_json_value(json["keyType"]), - _parse_datatype_json_value(json["valueType"]), - json["valueContainsNull"]) - - -class StructField(DataType): - """A field in :class:`StructType`. - - :param name: string, name of the field. - :param dataType: :class:`DataType` of the field. - :param nullable: boolean, whether the field can be null (None) or not. - :param metadata: a dict from string to simple type that can be serialized to JSON automatically - """ - - def __init__(self, name, dataType, nullable=True, metadata=None): - """ - >>> (StructField("f1", StringType(), True) - ... == StructField("f1", StringType(), True)) - True - >>> (StructField("f1", StringType(), True) - ... == StructField("f2", StringType(), True)) - False - """ - assert isinstance(dataType, DataType), "dataType should be DataType" - self.name = name - self.dataType = dataType - self.nullable = nullable - self.metadata = metadata or {} - - def simpleString(self): - return '%s:%s' % (self.name, self.dataType.simpleString()) - - def __repr__(self): - return "StructField(%s,%s,%s)" % (self.name, self.dataType, - str(self.nullable).lower()) - - def jsonValue(self): - return {"name": self.name, - "type": self.dataType.jsonValue(), - "nullable": self.nullable, - "metadata": self.metadata} - - @classmethod - def fromJson(cls, json): - return StructField(json["name"], - _parse_datatype_json_value(json["type"]), - json["nullable"], - json["metadata"]) - - -class StructType(DataType): - """Struct type, consisting of a list of :class:`StructField`. - - This is the data type representing a :class:`Row`. - """ - - def __init__(self, fields): - """ - >>> struct1 = StructType([StructField("f1", StringType(), True)]) - >>> struct2 = StructType([StructField("f1", StringType(), True)]) - >>> struct1 == struct2 - True - >>> struct1 = StructType([StructField("f1", StringType(), True)]) - >>> struct2 = StructType([StructField("f1", StringType(), True), - ... StructField("f2", IntegerType(), False)]) - >>> struct1 == struct2 - False - """ - assert all(isinstance(f, DataType) for f in fields), "fields should be a list of DataType" - self.fields = fields - - def simpleString(self): - return 'struct<%s>' % (','.join(f.simpleString() for f in self.fields)) - - def __repr__(self): - return ("StructType(List(%s))" % - ",".join(str(field) for field in self.fields)) - - def jsonValue(self): - return {"type": self.typeName(), - "fields": [f.jsonValue() for f in self.fields]} - - @classmethod - def fromJson(cls, json): - return StructType([StructField.fromJson(f) for f in json["fields"]]) - - -class UserDefinedType(DataType): - """User-defined type (UDT). - - .. note:: WARN: Spark Internal Use Only - """ - - @classmethod - def typeName(cls): - return cls.__name__.lower() - - @classmethod - def sqlType(cls): - """ - Underlying SQL storage type for this UDT. - """ - raise NotImplementedError("UDT must implement sqlType().") - - @classmethod - def module(cls): - """ - The Python module of the UDT. - """ - raise NotImplementedError("UDT must implement module().") - - @classmethod - def scalaUDT(cls): - """ - The class name of the paired Scala UDT. - """ - raise NotImplementedError("UDT must have a paired Scala UDT.") - - def serialize(self, obj): - """ - Converts the a user-type object into a SQL datum. - """ - raise NotImplementedError("UDT must implement serialize().") - - def deserialize(self, datum): - """ - Converts a SQL datum into a user-type object. - """ - raise NotImplementedError("UDT must implement deserialize().") - - def simpleString(self): - return 'udt' - - def json(self): - return json.dumps(self.jsonValue(), separators=(',', ':'), sort_keys=True) - - def jsonValue(self): - schema = { - "type": "udt", - "class": self.scalaUDT(), - "pyClass": "%s.%s" % (self.module(), type(self).__name__), - "sqlType": self.sqlType().jsonValue() - } - return schema - - @classmethod - def fromJson(cls, json): - pyUDT = json["pyClass"] - split = pyUDT.rfind(".") - pyModule = pyUDT[:split] - pyClass = pyUDT[split+1:] - m = __import__(pyModule, globals(), locals(), [pyClass], -1) - UDT = getattr(m, pyClass) - return UDT() - - def __eq__(self, other): - return type(self) == type(other) - - -_all_primitive_types = dict((v.typeName(), v) - for v in globals().itervalues() - if type(v) is PrimitiveTypeSingleton and - v.__base__ == PrimitiveType) - - -_all_complex_types = dict((v.typeName(), v) - for v in [ArrayType, MapType, StructType]) - - -def _parse_datatype_json_string(json_string): - """Parses the given data type JSON string. - >>> import pickle - >>> def check_datatype(datatype): - ... pickled = pickle.loads(pickle.dumps(datatype)) - ... assert datatype == pickled - ... scala_datatype = sqlContext._ssql_ctx.parseDataType(datatype.json()) - ... python_datatype = _parse_datatype_json_string(scala_datatype.json()) - ... assert datatype == python_datatype - >>> for cls in _all_primitive_types.values(): - ... check_datatype(cls()) - - >>> # Simple ArrayType. - >>> simple_arraytype = ArrayType(StringType(), True) - >>> check_datatype(simple_arraytype) - - >>> # Simple MapType. - >>> simple_maptype = MapType(StringType(), LongType()) - >>> check_datatype(simple_maptype) - - >>> # Simple StructType. - >>> simple_structtype = StructType([ - ... StructField("a", DecimalType(), False), - ... StructField("b", BooleanType(), True), - ... StructField("c", LongType(), True), - ... StructField("d", BinaryType(), False)]) - >>> check_datatype(simple_structtype) - - >>> # Complex StructType. - >>> complex_structtype = StructType([ - ... StructField("simpleArray", simple_arraytype, True), - ... StructField("simpleMap", simple_maptype, True), - ... StructField("simpleStruct", simple_structtype, True), - ... StructField("boolean", BooleanType(), False), - ... StructField("withMeta", DoubleType(), False, {"name": "age"})]) - >>> check_datatype(complex_structtype) - - >>> # Complex ArrayType. - >>> complex_arraytype = ArrayType(complex_structtype, True) - >>> check_datatype(complex_arraytype) - - >>> # Complex MapType. - >>> complex_maptype = MapType(complex_structtype, - ... complex_arraytype, False) - >>> check_datatype(complex_maptype) - - >>> check_datatype(ExamplePointUDT()) - >>> structtype_with_udt = StructType([StructField("label", DoubleType(), False), - ... StructField("point", ExamplePointUDT(), False)]) - >>> check_datatype(structtype_with_udt) - """ - return _parse_datatype_json_value(json.loads(json_string)) - - -_FIXED_DECIMAL = re.compile("decimal\\((\\d+),(\\d+)\\)") - - -def _parse_datatype_json_value(json_value): - if type(json_value) is unicode: - if json_value in _all_primitive_types.keys(): - return _all_primitive_types[json_value]() - elif json_value == u'decimal': - return DecimalType() - elif _FIXED_DECIMAL.match(json_value): - m = _FIXED_DECIMAL.match(json_value) - return DecimalType(int(m.group(1)), int(m.group(2))) - else: - raise ValueError("Could not parse datatype: %s" % json_value) - else: - tpe = json_value["type"] - if tpe in _all_complex_types: - return _all_complex_types[tpe].fromJson(json_value) - elif tpe == 'udt': - return UserDefinedType.fromJson(json_value) - else: - raise ValueError("not supported type: %s" % tpe) - - -# Mapping Python types to Spark SQL DataType -_type_mappings = { - type(None): NullType, - bool: BooleanType, - int: LongType, - long: LongType, - float: DoubleType, - str: StringType, - unicode: StringType, - bytearray: BinaryType, - decimal.Decimal: DecimalType, - datetime.date: DateType, - datetime.datetime: TimestampType, - datetime.time: TimestampType, -} - - -def _infer_type(obj): - """Infer the DataType from obj - - >>> p = ExamplePoint(1.0, 2.0) - >>> _infer_type(p) - ExamplePointUDT - """ - if obj is None: - return NullType() - - if hasattr(obj, '__UDT__'): - return obj.__UDT__ - - dataType = _type_mappings.get(type(obj)) - if dataType is not None: - return dataType() - - if isinstance(obj, dict): - for key, value in obj.iteritems(): - if key is not None and value is not None: - return MapType(_infer_type(key), _infer_type(value), True) - else: - return MapType(NullType(), NullType(), True) - elif isinstance(obj, (list, array)): - for v in obj: - if v is not None: - return ArrayType(_infer_type(obj[0]), True) - else: - return ArrayType(NullType(), True) - else: - try: - return _infer_schema(obj) - except ValueError: - raise ValueError("not supported type: %s" % type(obj)) - - -def _infer_schema(row): - """Infer the schema from dict/namedtuple/object""" - if isinstance(row, dict): - items = sorted(row.items()) - - elif isinstance(row, (tuple, list)): - if hasattr(row, "_fields"): # namedtuple - items = zip(row._fields, tuple(row)) - elif hasattr(row, "__fields__"): # Row - items = zip(row.__fields__, tuple(row)) - else: - names = ['_%d' % i for i in range(1, len(row) + 1)] - items = zip(names, row) - - elif hasattr(row, "__dict__"): # object - items = sorted(row.__dict__.items()) - - else: - raise ValueError("Can not infer schema for type: %s" % type(row)) - - fields = [StructField(k, _infer_type(v), True) for k, v in items] - return StructType(fields) - - -def _need_python_to_sql_conversion(dataType): - """ - Checks whether we need python to sql conversion for the given type. - For now, only UDTs need this conversion. - - >>> _need_python_to_sql_conversion(DoubleType()) - False - >>> schema0 = StructType([StructField("indices", ArrayType(IntegerType(), False), False), - ... StructField("values", ArrayType(DoubleType(), False), False)]) - >>> _need_python_to_sql_conversion(schema0) - False - >>> _need_python_to_sql_conversion(ExamplePointUDT()) - True - >>> schema1 = ArrayType(ExamplePointUDT(), False) - >>> _need_python_to_sql_conversion(schema1) - True - >>> schema2 = StructType([StructField("label", DoubleType(), False), - ... StructField("point", ExamplePointUDT(), False)]) - >>> _need_python_to_sql_conversion(schema2) - True - """ - if isinstance(dataType, StructType): - return any([_need_python_to_sql_conversion(f.dataType) for f in dataType.fields]) - elif isinstance(dataType, ArrayType): - return _need_python_to_sql_conversion(dataType.elementType) - elif isinstance(dataType, MapType): - return _need_python_to_sql_conversion(dataType.keyType) or \ - _need_python_to_sql_conversion(dataType.valueType) - elif isinstance(dataType, UserDefinedType): - return True - else: - return False - - -def _python_to_sql_converter(dataType): - """ - Returns a converter that converts a Python object into a SQL datum for the given type. - - >>> conv = _python_to_sql_converter(DoubleType()) - >>> conv(1.0) - 1.0 - >>> conv = _python_to_sql_converter(ArrayType(DoubleType(), False)) - >>> conv([1.0, 2.0]) - [1.0, 2.0] - >>> conv = _python_to_sql_converter(ExamplePointUDT()) - >>> conv(ExamplePoint(1.0, 2.0)) - [1.0, 2.0] - >>> schema = StructType([StructField("label", DoubleType(), False), - ... StructField("point", ExamplePointUDT(), False)]) - >>> conv = _python_to_sql_converter(schema) - >>> conv((1.0, ExamplePoint(1.0, 2.0))) - (1.0, [1.0, 2.0]) - """ - if not _need_python_to_sql_conversion(dataType): - return lambda x: x - - if isinstance(dataType, StructType): - names, types = zip(*[(f.name, f.dataType) for f in dataType.fields]) - converters = map(_python_to_sql_converter, types) - - def converter(obj): - if isinstance(obj, dict): - return tuple(c(obj.get(n)) for n, c in zip(names, converters)) - elif isinstance(obj, tuple): - if hasattr(obj, "_fields") or hasattr(obj, "__fields__"): - return tuple(c(v) for c, v in zip(converters, obj)) - elif all(isinstance(x, tuple) and len(x) == 2 for x in obj): # k-v pairs - d = dict(obj) - return tuple(c(d.get(n)) for n, c in zip(names, converters)) - else: - return tuple(c(v) for c, v in zip(converters, obj)) - else: - raise ValueError("Unexpected tuple %r with type %r" % (obj, dataType)) - return converter - elif isinstance(dataType, ArrayType): - element_converter = _python_to_sql_converter(dataType.elementType) - return lambda a: [element_converter(v) for v in a] - elif isinstance(dataType, MapType): - key_converter = _python_to_sql_converter(dataType.keyType) - value_converter = _python_to_sql_converter(dataType.valueType) - return lambda m: dict([(key_converter(k), value_converter(v)) for k, v in m.items()]) - elif isinstance(dataType, UserDefinedType): - return lambda obj: dataType.serialize(obj) - else: - raise ValueError("Unexpected type %r" % dataType) - - -def _has_nulltype(dt): - """ Return whether there is NullType in `dt` or not """ - if isinstance(dt, StructType): - return any(_has_nulltype(f.dataType) for f in dt.fields) - elif isinstance(dt, ArrayType): - return _has_nulltype((dt.elementType)) - elif isinstance(dt, MapType): - return _has_nulltype(dt.keyType) or _has_nulltype(dt.valueType) - else: - return isinstance(dt, NullType) - - -def _merge_type(a, b): - if isinstance(a, NullType): - return b - elif isinstance(b, NullType): - return a - elif type(a) is not type(b): - # TODO: type cast (such as int -> long) - raise TypeError("Can not merge type %s and %s" % (a, b)) - - # same type - if isinstance(a, StructType): - nfs = dict((f.name, f.dataType) for f in b.fields) - fields = [StructField(f.name, _merge_type(f.dataType, nfs.get(f.name, NullType()))) - for f in a.fields] - names = set([f.name for f in fields]) - for n in nfs: - if n not in names: - fields.append(StructField(n, nfs[n])) - return StructType(fields) - - elif isinstance(a, ArrayType): - return ArrayType(_merge_type(a.elementType, b.elementType), True) - - elif isinstance(a, MapType): - return MapType(_merge_type(a.keyType, b.keyType), - _merge_type(a.valueType, b.valueType), - True) - else: - return a - - -def _need_converter(dataType): - if isinstance(dataType, StructType): - return True - elif isinstance(dataType, ArrayType): - return _need_converter(dataType.elementType) - elif isinstance(dataType, MapType): - return _need_converter(dataType.keyType) or _need_converter(dataType.valueType) - elif isinstance(dataType, NullType): - return True - else: - return False - - -def _create_converter(dataType): - """Create an converter to drop the names of fields in obj """ - if not _need_converter(dataType): - return lambda x: x - - if isinstance(dataType, ArrayType): - conv = _create_converter(dataType.elementType) - return lambda row: map(conv, row) - - elif isinstance(dataType, MapType): - kconv = _create_converter(dataType.keyType) - vconv = _create_converter(dataType.valueType) - return lambda row: dict((kconv(k), vconv(v)) for k, v in row.iteritems()) - - elif isinstance(dataType, NullType): - return lambda x: None - - elif not isinstance(dataType, StructType): - return lambda x: x - - # dataType must be StructType - names = [f.name for f in dataType.fields] - converters = [_create_converter(f.dataType) for f in dataType.fields] - convert_fields = any(_need_converter(f.dataType) for f in dataType.fields) - - def convert_struct(obj): - if obj is None: - return - - if isinstance(obj, (tuple, list)): - if convert_fields: - return tuple(conv(v) for v, conv in zip(obj, converters)) - else: - return tuple(obj) - - if isinstance(obj, dict): - d = obj - elif hasattr(obj, "__dict__"): # object - d = obj.__dict__ - else: - raise ValueError("Unexpected obj: %s" % obj) - - if convert_fields: - return tuple([conv(d.get(name)) for name, conv in zip(names, converters)]) - else: - return tuple([d.get(name) for name in names]) - - return convert_struct - - -_BRACKETS = {'(': ')', '[': ']', '{': '}'} - - -def _split_schema_abstract(s): - """ - split the schema abstract into fields - - >>> _split_schema_abstract("a b c") - ['a', 'b', 'c'] - >>> _split_schema_abstract("a(a b)") - ['a(a b)'] - >>> _split_schema_abstract("a b[] c{a b}") - ['a', 'b[]', 'c{a b}'] - >>> _split_schema_abstract(" ") - [] - """ - - r = [] - w = '' - brackets = [] - for c in s: - if c == ' ' and not brackets: - if w: - r.append(w) - w = '' - else: - w += c - if c in _BRACKETS: - brackets.append(c) - elif c in _BRACKETS.values(): - if not brackets or c != _BRACKETS[brackets.pop()]: - raise ValueError("unexpected " + c) - - if brackets: - raise ValueError("brackets not closed: %s" % brackets) - if w: - r.append(w) - return r - - -def _parse_field_abstract(s): - """ - Parse a field in schema abstract - - >>> _parse_field_abstract("a") - StructField(a,NullType,true) - >>> _parse_field_abstract("b(c d)") - StructField(b,StructType(...c,NullType,true),StructField(d... - >>> _parse_field_abstract("a[]") - StructField(a,ArrayType(NullType,true),true) - >>> _parse_field_abstract("a{[]}") - StructField(a,MapType(NullType,ArrayType(NullType,true),true),true) - """ - if set(_BRACKETS.keys()) & set(s): - idx = min((s.index(c) for c in _BRACKETS if c in s)) - name = s[:idx] - return StructField(name, _parse_schema_abstract(s[idx:]), True) - else: - return StructField(s, NullType(), True) - - -def _parse_schema_abstract(s): - """ - parse abstract into schema - - >>> _parse_schema_abstract("a b c") - StructType...a...b...c... - >>> _parse_schema_abstract("a[b c] b{}") - StructType...a,ArrayType...b...c...b,MapType... - >>> _parse_schema_abstract("c{} d{a b}") - StructType...c,MapType...d,MapType...a...b... - >>> _parse_schema_abstract("a b(t)").fields[1] - StructField(b,StructType(List(StructField(t,NullType,true))),true) - """ - s = s.strip() - if not s: - return NullType() - - elif s.startswith('('): - return _parse_schema_abstract(s[1:-1]) - - elif s.startswith('['): - return ArrayType(_parse_schema_abstract(s[1:-1]), True) - - elif s.startswith('{'): - return MapType(NullType(), _parse_schema_abstract(s[1:-1])) - - parts = _split_schema_abstract(s) - fields = [_parse_field_abstract(p) for p in parts] - return StructType(fields) - - -def _infer_schema_type(obj, dataType): - """ - Fill the dataType with types inferred from obj - - >>> schema = _parse_schema_abstract("a b c d") - >>> row = (1, 1.0, "str", datetime.date(2014, 10, 10)) - >>> _infer_schema_type(row, schema) - StructType...LongType...DoubleType...StringType...DateType... - >>> row = [[1], {"key": (1, 2.0)}] - >>> schema = _parse_schema_abstract("a[] b{c d}") - >>> _infer_schema_type(row, schema) - StructType...a,ArrayType...b,MapType(StringType,...c,LongType... - """ - if dataType is NullType(): - return _infer_type(obj) - - if not obj: - return NullType() - - if isinstance(dataType, ArrayType): - eType = _infer_schema_type(obj[0], dataType.elementType) - return ArrayType(eType, True) - - elif isinstance(dataType, MapType): - k, v = obj.iteritems().next() - return MapType(_infer_schema_type(k, dataType.keyType), - _infer_schema_type(v, dataType.valueType)) - - elif isinstance(dataType, StructType): - fs = dataType.fields - assert len(fs) == len(obj), \ - "Obj(%s) have different length with fields(%s)" % (obj, fs) - fields = [StructField(f.name, _infer_schema_type(o, f.dataType), True) - for o, f in zip(obj, fs)] - return StructType(fields) - - else: - raise ValueError("Unexpected dataType: %s" % dataType) - - -_acceptable_types = { - BooleanType: (bool,), - ByteType: (int, long), - ShortType: (int, long), - IntegerType: (int, long), - LongType: (int, long), - FloatType: (float,), - DoubleType: (float,), - DecimalType: (decimal.Decimal,), - StringType: (str, unicode), - BinaryType: (bytearray,), - DateType: (datetime.date,), - TimestampType: (datetime.datetime,), - ArrayType: (list, tuple, array), - MapType: (dict,), - StructType: (tuple, list), -} - - -def _verify_type(obj, dataType): - """ - Verify the type of obj against dataType, raise an exception if - they do not match. - - >>> _verify_type(None, StructType([])) - >>> _verify_type("", StringType()) - >>> _verify_type(0, LongType()) - >>> _verify_type(range(3), ArrayType(ShortType())) - >>> _verify_type(set(), ArrayType(StringType())) # doctest: +IGNORE_EXCEPTION_DETAIL - Traceback (most recent call last): - ... - TypeError:... - >>> _verify_type({}, MapType(StringType(), IntegerType())) - >>> _verify_type((), StructType([])) - >>> _verify_type([], StructType([])) - >>> _verify_type([1], StructType([])) # doctest: +IGNORE_EXCEPTION_DETAIL - Traceback (most recent call last): - ... - ValueError:... - >>> _verify_type(ExamplePoint(1.0, 2.0), ExamplePointUDT()) - >>> _verify_type([1.0, 2.0], ExamplePointUDT()) # doctest: +IGNORE_EXCEPTION_DETAIL - Traceback (most recent call last): - ... - ValueError:... - """ - # all objects are nullable - if obj is None: - return - - if isinstance(dataType, UserDefinedType): - if not (hasattr(obj, '__UDT__') and obj.__UDT__ == dataType): - raise ValueError("%r is not an instance of type %r" % (obj, dataType)) - _verify_type(dataType.serialize(obj), dataType.sqlType()) - return - - _type = type(dataType) - assert _type in _acceptable_types, "unknown datatype: %s" % dataType - - # subclass of them can not be deserialized in JVM - if type(obj) not in _acceptable_types[_type]: - raise TypeError("%s can not accept object in type %s" - % (dataType, type(obj))) - - if isinstance(dataType, ArrayType): - for i in obj: - _verify_type(i, dataType.elementType) - - elif isinstance(dataType, MapType): - for k, v in obj.iteritems(): - _verify_type(k, dataType.keyType) - _verify_type(v, dataType.valueType) - - elif isinstance(dataType, StructType): - if len(obj) != len(dataType.fields): - raise ValueError("Length of object (%d) does not match with " - "length of fields (%d)" % (len(obj), len(dataType.fields))) - for v, f in zip(obj, dataType.fields): - _verify_type(v, f.dataType) - -_cached_cls = weakref.WeakValueDictionary() - - -def _restore_object(dataType, obj): - """ Restore object during unpickling. """ - # use id(dataType) as key to speed up lookup in dict - # Because of batched pickling, dataType will be the - # same object in most cases. - k = id(dataType) - cls = _cached_cls.get(k) - if cls is None or cls.__datatype is not dataType: - # use dataType as key to avoid create multiple class - cls = _cached_cls.get(dataType) - if cls is None: - cls = _create_cls(dataType) - _cached_cls[dataType] = cls - cls.__datatype = dataType - _cached_cls[k] = cls - return cls(obj) - - -def _create_object(cls, v): - """ Create an customized object with class `cls`. """ - # datetime.date would be deserialized as datetime.datetime - # from java type, so we need to set it back. - if cls is datetime.date and isinstance(v, datetime.datetime): - return v.date() - return cls(v) if v is not None else v - - -def _create_getter(dt, i): - """ Create a getter for item `i` with schema """ - cls = _create_cls(dt) - - def getter(self): - return _create_object(cls, self[i]) - - return getter - - -def _has_struct_or_date(dt): - """Return whether `dt` is or has StructType/DateType in it""" - if isinstance(dt, StructType): - return True - elif isinstance(dt, ArrayType): - return _has_struct_or_date(dt.elementType) - elif isinstance(dt, MapType): - return _has_struct_or_date(dt.keyType) or _has_struct_or_date(dt.valueType) - elif isinstance(dt, DateType): - return True - elif isinstance(dt, UserDefinedType): - return True - return False - - -def _create_properties(fields): - """Create properties according to fields""" - ps = {} - for i, f in enumerate(fields): - name = f.name - if (name.startswith("__") and name.endswith("__") - or keyword.iskeyword(name)): - warnings.warn("field name %s can not be accessed in Python," - "use position to access it instead" % name) - if _has_struct_or_date(f.dataType): - # delay creating object until accessing it - getter = _create_getter(f.dataType, i) - else: - getter = itemgetter(i) - ps[name] = property(getter) - return ps - - -def _create_cls(dataType): - """ - Create an class by dataType - - The created class is similar to namedtuple, but can have nested schema. - - >>> schema = _parse_schema_abstract("a b c") - >>> row = (1, 1.0, "str") - >>> schema = _infer_schema_type(row, schema) - >>> obj = _create_cls(schema)(row) - >>> import pickle - >>> pickle.loads(pickle.dumps(obj)) - Row(a=1, b=1.0, c='str') - - >>> row = [[1], {"key": (1, 2.0)}] - >>> schema = _parse_schema_abstract("a[] b{c d}") - >>> schema = _infer_schema_type(row, schema) - >>> obj = _create_cls(schema)(row) - >>> pickle.loads(pickle.dumps(obj)) - Row(a=[1], b={'key': Row(c=1, d=2.0)}) - >>> pickle.loads(pickle.dumps(obj.a)) - [1] - >>> pickle.loads(pickle.dumps(obj.b)) - {'key': Row(c=1, d=2.0)} - """ - - if isinstance(dataType, ArrayType): - cls = _create_cls(dataType.elementType) - - def List(l): - if l is None: - return - return [_create_object(cls, v) for v in l] - - return List - - elif isinstance(dataType, MapType): - kcls = _create_cls(dataType.keyType) - vcls = _create_cls(dataType.valueType) - - def Dict(d): - if d is None: - return - return dict((_create_object(kcls, k), _create_object(vcls, v)) for k, v in d.items()) - - return Dict - - elif isinstance(dataType, DateType): - return datetime.date - - elif isinstance(dataType, UserDefinedType): - return lambda datum: dataType.deserialize(datum) - - elif not isinstance(dataType, StructType): - # no wrapper for primitive types - return lambda x: x - - class Row(tuple): - - """ Row in DataFrame """ - __datatype = dataType - __fields__ = tuple(f.name for f in dataType.fields) - __slots__ = () - - # create property for fast access - locals().update(_create_properties(dataType.fields)) - - def asDict(self): - """ Return as a dict """ - return dict((n, getattr(self, n)) for n in self.__fields__) - - def __repr__(self): - # call collect __repr__ for nested objects - return ("Row(%s)" % ", ".join("%s=%r" % (n, getattr(self, n)) - for n in self.__fields__)) - - def __reduce__(self): - return (_restore_object, (self.__datatype, tuple(self))) - - return Row - - -def _create_row(fields, values): - row = Row(*values) - row.__fields__ = fields - return row - - -class Row(tuple): - - """ - A row in L{DataFrame}. The fields in it can be accessed like attributes. - - Row can be used to create a row object by using named arguments, - the fields will be sorted by names. - - >>> row = Row(name="Alice", age=11) - >>> row - Row(age=11, name='Alice') - >>> row.name, row.age - ('Alice', 11) - - Row also can be used to create another Row like class, then it - could be used to create Row objects, such as - - >>> Person = Row("name", "age") - >>> Person - - >>> Person("Alice", 11) - Row(name='Alice', age=11) - """ - - def __new__(self, *args, **kwargs): - if args and kwargs: - raise ValueError("Can not use both args " - "and kwargs to create Row") - if args: - # create row class or objects - return tuple.__new__(self, args) - - elif kwargs: - # create row objects - names = sorted(kwargs.keys()) - row = tuple.__new__(self, [kwargs[n] for n in names]) - row.__fields__ = names - return row - - else: - raise ValueError("No args or kwargs") - - def asDict(self): - """ - Return as an dict - """ - if not hasattr(self, "__fields__"): - raise TypeError("Cannot convert a Row class into dict") - return dict(zip(self.__fields__, self)) - - # let object acts like class - def __call__(self, *args): - """create new Row object""" - return _create_row(self, args) - - def __getattr__(self, item): - if item.startswith("__"): - raise AttributeError(item) - try: - # it will be slow when it has many fields, - # but this will not be used in normal cases - idx = self.__fields__.index(item) - return self[idx] - except IndexError: - raise AttributeError(item) - - def __reduce__(self): - if hasattr(self, "__fields__"): - return (_create_row, (self.__fields__, tuple(self))) - else: - return tuple.__reduce__(self) - - def __repr__(self): - if hasattr(self, "__fields__"): - return "Row(%s)" % ", ".join("%s=%r" % (k, v) - for k, v in zip(self.__fields__, tuple(self))) - else: - return "" % ", ".join(self) - - -def _test(): - import doctest - from pyspark.context import SparkContext - # let doctest run in pyspark.sql.types, so DataTypes can be picklable - import pyspark.sql.types - from pyspark.sql import Row, SQLContext - from pyspark.sql.tests import ExamplePoint, ExamplePointUDT - globs = pyspark.sql.types.__dict__.copy() - sc = SparkContext('local[4]', 'PythonTest') - globs['sc'] = sc - globs['sqlContext'] = SQLContext(sc) - globs['ExamplePoint'] = ExamplePoint - globs['ExamplePointUDT'] = ExamplePointUDT - (failure_count, test_count) = doctest.testmod( - pyspark.sql.types, globs=globs, optionflags=doctest.ELLIPSIS) - globs['sc'].stop() - if failure_count: - exit(-1) - - -if __name__ == "__main__": - _test() diff --git a/python/pyspark/statcounter.py b/python/pyspark/statcounter.py index 1e597d64e0..944fa414b0 100644 --- a/python/pyspark/statcounter.py +++ b/python/pyspark/statcounter.py @@ -31,7 +31,7 @@ except ImportError: class StatCounter(object): def __init__(self, values=[]): - self.n = 0L # Running count of our values + self.n = 0 # Running count of our values self.mu = 0.0 # Running mean of our values self.m2 = 0.0 # Running variance numerator (sum of (x - mean)^2) self.maxValue = float("-inf") @@ -87,7 +87,7 @@ class StatCounter(object): return copy.deepcopy(self) def count(self): - return self.n + return int(self.n) def mean(self): return self.mu diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py index 2c73083c9f..4590c58839 100644 --- a/python/pyspark/streaming/context.py +++ b/python/pyspark/streaming/context.py @@ -14,6 +14,9 @@ # See the License for the specific language governing permissions and # limitations under the License. # + +from __future__ import print_function + import os import sys @@ -157,7 +160,7 @@ class StreamingContext(object): try: jssc = gw.jvm.JavaStreamingContext(checkpointPath) except Exception: - print >>sys.stderr, "failed to load StreamingContext from checkpoint" + print("failed to load StreamingContext from checkpoint", file=sys.stderr) raise jsc = jssc.sparkContext() diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py index 3fa4244423..ff097985fa 100644 --- a/python/pyspark/streaming/dstream.py +++ b/python/pyspark/streaming/dstream.py @@ -15,11 +15,15 @@ # limitations under the License. # -from itertools import chain, ifilter, imap +import sys import operator import time +from itertools import chain from datetime import datetime +if sys.version < "3": + from itertools import imap as map, ifilter as filter + from py4j.protocol import Py4JJavaError from pyspark import RDD @@ -76,7 +80,7 @@ class DStream(object): Return a new DStream containing only the elements that satisfy predicate. """ def func(iterator): - return ifilter(f, iterator) + return filter(f, iterator) return self.mapPartitions(func, True) def flatMap(self, f, preservesPartitioning=False): @@ -85,7 +89,7 @@ class DStream(object): this DStream, and then flattening the results """ def func(s, iterator): - return chain.from_iterable(imap(f, iterator)) + return chain.from_iterable(map(f, iterator)) return self.mapPartitionsWithIndex(func, preservesPartitioning) def map(self, f, preservesPartitioning=False): @@ -93,7 +97,7 @@ class DStream(object): Return a new DStream by applying a function to each element of DStream. """ def func(iterator): - return imap(f, iterator) + return map(f, iterator) return self.mapPartitions(func, preservesPartitioning) def mapPartitions(self, f, preservesPartitioning=False): @@ -150,7 +154,7 @@ class DStream(object): """ Apply a function to each RDD in this DStream. """ - if func.func_code.co_argcount == 1: + if func.__code__.co_argcount == 1: old_func = func func = lambda t, rdd: old_func(rdd) jfunc = TransformFunction(self._sc, func, self._jrdd_deserializer) @@ -165,14 +169,14 @@ class DStream(object): """ def takeAndPrint(time, rdd): taken = rdd.take(num + 1) - print "-------------------------------------------" - print "Time: %s" % time - print "-------------------------------------------" + print("-------------------------------------------") + print("Time: %s" % time) + print("-------------------------------------------") for record in taken[:num]: - print record + print(record) if len(taken) > num: - print "..." - print + print("...") + print() self.foreachRDD(takeAndPrint) @@ -181,7 +185,7 @@ class DStream(object): Return a new DStream by applying a map function to the value of each key-value pairs in this DStream without changing the key. """ - map_values_fn = lambda (k, v): (k, f(v)) + map_values_fn = lambda kv: (kv[0], f(kv[1])) return self.map(map_values_fn, preservesPartitioning=True) def flatMapValues(self, f): @@ -189,7 +193,7 @@ class DStream(object): Return a new DStream by applying a flatmap function to the value of each key-value pairs in this DStream without changing the key. """ - flat_map_fn = lambda (k, v): ((k, x) for x in f(v)) + flat_map_fn = lambda kv: ((kv[0], x) for x in f(kv[1])) return self.flatMap(flat_map_fn, preservesPartitioning=True) def glom(self): @@ -286,10 +290,10 @@ class DStream(object): `func` can have one argument of `rdd`, or have two arguments of (`time`, `rdd`) """ - if func.func_code.co_argcount == 1: + if func.__code__.co_argcount == 1: oldfunc = func func = lambda t, rdd: oldfunc(rdd) - assert func.func_code.co_argcount == 2, "func should take one or two arguments" + assert func.__code__.co_argcount == 2, "func should take one or two arguments" return TransformedDStream(self, func) def transformWith(self, func, other, keepSerializer=False): @@ -300,10 +304,10 @@ class DStream(object): `func` can have two arguments of (`rdd_a`, `rdd_b`) or have three arguments of (`time`, `rdd_a`, `rdd_b`) """ - if func.func_code.co_argcount == 2: + if func.__code__.co_argcount == 2: oldfunc = func func = lambda t, a, b: oldfunc(a, b) - assert func.func_code.co_argcount == 3, "func should take two or three arguments" + assert func.__code__.co_argcount == 3, "func should take two or three arguments" jfunc = TransformFunction(self._sc, func, self._jrdd_deserializer, other._jrdd_deserializer) dstream = self._sc._jvm.PythonTransformed2DStream(self._jdstream.dstream(), other._jdstream.dstream(), jfunc) @@ -460,7 +464,7 @@ class DStream(object): keyed = self.map(lambda x: (1, x)) reduced = keyed.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, 1) - return reduced.map(lambda (k, v): v) + return reduced.map(lambda kv: kv[1]) def countByWindow(self, windowDuration, slideDuration): """ @@ -489,7 +493,7 @@ class DStream(object): keyed = self.map(lambda x: (x, 1)) counted = keyed.reduceByKeyAndWindow(operator.add, operator.sub, windowDuration, slideDuration, numPartitions) - return counted.filter(lambda (k, v): v > 0).count() + return counted.filter(lambda kv: kv[1] > 0).count() def groupByKeyAndWindow(self, windowDuration, slideDuration, numPartitions=None): """ @@ -548,7 +552,8 @@ class DStream(object): def invReduceFunc(t, a, b): b = b.reduceByKey(func, numPartitions) joined = a.leftOuterJoin(b, numPartitions) - return joined.mapValues(lambda (v1, v2): invFunc(v1, v2) if v2 is not None else v1) + return joined.mapValues(lambda kv: invFunc(kv[0], kv[1]) + if kv[1] is not None else kv[0]) jreduceFunc = TransformFunction(self._sc, reduceFunc, reduced._jrdd_deserializer) if invReduceFunc: @@ -579,9 +584,9 @@ class DStream(object): g = b.groupByKey(numPartitions).mapValues(lambda vs: (list(vs), None)) else: g = a.cogroup(b.partitionBy(numPartitions), numPartitions) - g = g.mapValues(lambda (va, vb): (list(vb), list(va)[0] if len(va) else None)) - state = g.mapValues(lambda (vs, s): updateFunc(vs, s)) - return state.filter(lambda (k, v): v is not None) + g = g.mapValues(lambda ab: (list(ab[1]), list(ab[0])[0] if len(ab[0]) else None)) + state = g.mapValues(lambda vs_s: updateFunc(vs_s[0], vs_s[1])) + return state.filter(lambda k_v: k_v[1] is not None) jreduceFunc = TransformFunction(self._sc, reduceFunc, self._sc.serializer, self._jrdd_deserializer) diff --git a/python/pyspark/streaming/kafka.py b/python/pyspark/streaming/kafka.py index f083ed149e..7a7b6e1d9a 100644 --- a/python/pyspark/streaming/kafka.py +++ b/python/pyspark/streaming/kafka.py @@ -67,10 +67,10 @@ class KafkaUtils(object): .loadClass("org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper") helper = helperClass.newInstance() jstream = helper.createStream(ssc._jssc, jparam, jtopics, jlevel) - except Py4JJavaError, e: + except Py4JJavaError as e: # TODO: use --jar once it also work on driver if 'ClassNotFoundException' in str(e.java_exception): - print """ + print(""" ________________________________________________________________________________________________ Spark Streaming's Kafka libraries not found in class path. Try one of the following. @@ -88,8 +88,8 @@ ________________________________________________________________________________ ________________________________________________________________________________________________ -""" % (ssc.sparkContext.version, ssc.sparkContext.version) +""" % (ssc.sparkContext.version, ssc.sparkContext.version)) raise e ser = PairDeserializer(NoOpSerializer(), NoOpSerializer()) stream = DStream(jstream, ssc, ser) - return stream.map(lambda (k, v): (keyDecoder(k), valueDecoder(v))) + return stream.map(lambda k_v: (keyDecoder(k_v[0]), valueDecoder(k_v[1]))) diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index 9b4635e490..06d2215437 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -22,6 +22,7 @@ import operator import unittest import tempfile import struct +from functools import reduce from py4j.java_collections import MapConverter @@ -51,7 +52,7 @@ class PySparkStreamingTestCase(unittest.TestCase): while len(result) < n and time.time() - start_time < self.timeout: time.sleep(0.01) if len(result) < n: - print "timeout after", self.timeout + print("timeout after", self.timeout) def _take(self, dstream, n): """ @@ -131,7 +132,7 @@ class BasicOperationTests(PySparkStreamingTestCase): def func(dstream): return dstream.map(str) - expected = map(lambda x: map(str, x), input) + expected = [list(map(str, x)) for x in input] self._test_func(input, func, expected) def test_flatMap(self): @@ -140,8 +141,8 @@ class BasicOperationTests(PySparkStreamingTestCase): def func(dstream): return dstream.flatMap(lambda x: (x, x * 2)) - expected = map(lambda x: list(chain.from_iterable((map(lambda y: [y, y * 2], x)))), - input) + expected = [list(chain.from_iterable((map(lambda y: [y, y * 2], x)))) + for x in input] self._test_func(input, func, expected) def test_filter(self): @@ -150,7 +151,7 @@ class BasicOperationTests(PySparkStreamingTestCase): def func(dstream): return dstream.filter(lambda x: x % 2 == 0) - expected = map(lambda x: filter(lambda y: y % 2 == 0, x), input) + expected = [[y for y in x if y % 2 == 0] for x in input] self._test_func(input, func, expected) def test_count(self): @@ -159,7 +160,7 @@ class BasicOperationTests(PySparkStreamingTestCase): def func(dstream): return dstream.count() - expected = map(lambda x: [len(x)], input) + expected = [[len(x)] for x in input] self._test_func(input, func, expected) def test_reduce(self): @@ -168,7 +169,7 @@ class BasicOperationTests(PySparkStreamingTestCase): def func(dstream): return dstream.reduce(operator.add) - expected = map(lambda x: [reduce(operator.add, x)], input) + expected = [[reduce(operator.add, x)] for x in input] self._test_func(input, func, expected) def test_reduceByKey(self): @@ -185,27 +186,27 @@ class BasicOperationTests(PySparkStreamingTestCase): def test_mapValues(self): """Basic operation test for DStream.mapValues.""" input = [[("a", 2), ("b", 2), ("c", 1), ("d", 1)], - [("", 4), (1, 1), (2, 2), (3, 3)], + [(0, 4), (1, 1), (2, 2), (3, 3)], [(1, 1), (2, 1), (3, 1), (4, 1)]] def func(dstream): return dstream.mapValues(lambda x: x + 10) expected = [[("a", 12), ("b", 12), ("c", 11), ("d", 11)], - [("", 14), (1, 11), (2, 12), (3, 13)], + [(0, 14), (1, 11), (2, 12), (3, 13)], [(1, 11), (2, 11), (3, 11), (4, 11)]] self._test_func(input, func, expected, sort=True) def test_flatMapValues(self): """Basic operation test for DStream.flatMapValues.""" input = [[("a", 2), ("b", 2), ("c", 1), ("d", 1)], - [("", 4), (1, 1), (2, 1), (3, 1)], + [(0, 4), (1, 1), (2, 1), (3, 1)], [(1, 1), (2, 1), (3, 1), (4, 1)]] def func(dstream): return dstream.flatMapValues(lambda x: (x, x + 10)) expected = [[("a", 2), ("a", 12), ("b", 2), ("b", 12), ("c", 1), ("c", 11), ("d", 1), ("d", 11)], - [("", 4), ("", 14), (1, 1), (1, 11), (2, 1), (2, 11), (3, 1), (3, 11)], + [(0, 4), (0, 14), (1, 1), (1, 11), (2, 1), (2, 11), (3, 1), (3, 11)], [(1, 1), (1, 11), (2, 1), (2, 11), (3, 1), (3, 11), (4, 1), (4, 11)]] self._test_func(input, func, expected) @@ -233,7 +234,7 @@ class BasicOperationTests(PySparkStreamingTestCase): def test_countByValue(self): """Basic operation test for DStream.countByValue.""" - input = [range(1, 5) * 2, range(5, 7) + range(5, 9), ["a", "a", "b", ""]] + input = [list(range(1, 5)) * 2, list(range(5, 7)) + list(range(5, 9)), ["a", "a", "b", ""]] def func(dstream): return dstream.countByValue() @@ -285,7 +286,7 @@ class BasicOperationTests(PySparkStreamingTestCase): def func(d1, d2): return d1.union(d2) - expected = [range(6), range(6), range(6)] + expected = [list(range(6)), list(range(6)), list(range(6))] self._test_func(input1, func, expected, input2=input2) def test_cogroup(self): @@ -424,7 +425,7 @@ class StreamingContextTests(PySparkStreamingTestCase): duration = 0.1 def _add_input_stream(self): - inputs = map(lambda x: range(1, x), range(101)) + inputs = [range(1, x) for x in range(101)] stream = self.ssc.queueStream(inputs) self._collect(stream, 1, block=False) @@ -441,7 +442,7 @@ class StreamingContextTests(PySparkStreamingTestCase): self.ssc.stop() def test_queue_stream(self): - input = [range(i + 1) for i in range(3)] + input = [list(range(i + 1)) for i in range(3)] dstream = self.ssc.queueStream(input) result = self._collect(dstream, 3) self.assertEqual(input, result) @@ -457,13 +458,13 @@ class StreamingContextTests(PySparkStreamingTestCase): with open(os.path.join(d, name), "w") as f: f.writelines(["%d\n" % i for i in range(10)]) self.wait_for(result, 2) - self.assertEqual([range(10), range(10)], result) + self.assertEqual([list(range(10)), list(range(10))], result) def test_binary_records_stream(self): d = tempfile.mkdtemp() self.ssc = StreamingContext(self.sc, self.duration) dstream = self.ssc.binaryRecordsStream(d, 10).map( - lambda v: struct.unpack("10b", str(v))) + lambda v: struct.unpack("10b", bytes(v))) result = self._collect(dstream, 2, block=False) self.ssc.start() for name in ('a', 'b'): @@ -471,10 +472,10 @@ class StreamingContextTests(PySparkStreamingTestCase): with open(os.path.join(d, name), "wb") as f: f.write(bytearray(range(10))) self.wait_for(result, 2) - self.assertEqual([range(10), range(10)], map(lambda v: list(v[0]), result)) + self.assertEqual([list(range(10)), list(range(10))], [list(v[0]) for v in result]) def test_union(self): - input = [range(i + 1) for i in range(3)] + input = [list(range(i + 1)) for i in range(3)] dstream = self.ssc.queueStream(input) dstream2 = self.ssc.queueStream(input) dstream3 = self.ssc.union(dstream, dstream2) diff --git a/python/pyspark/streaming/util.py b/python/pyspark/streaming/util.py index 86ee5aa04f..34291f30a5 100644 --- a/python/pyspark/streaming/util.py +++ b/python/pyspark/streaming/util.py @@ -91,9 +91,9 @@ class TransformFunctionSerializer(object): except Exception: traceback.print_exc() - def loads(self, bytes): + def loads(self, data): try: - f, deserializers = self.serializer.loads(str(bytes)) + f, deserializers = self.serializer.loads(bytes(data)) return TransformFunction(self.ctx, f, *deserializers) except Exception: traceback.print_exc() @@ -116,7 +116,7 @@ def rddToFileName(prefix, suffix, timestamp): """ if isinstance(timestamp, datetime): seconds = time.mktime(timestamp.timetuple()) - timestamp = long(seconds * 1000) + timestamp.microsecond / 1000 + timestamp = int(seconds * 1000) + timestamp.microsecond // 1000 if suffix is None: return prefix + "-" + str(timestamp) else: diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index ee67e80d53..75f39d9e75 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -19,8 +19,8 @@ Unit tests for PySpark; additional tests are implemented as doctests in individual modules. """ + from array import array -from fileinput import input from glob import glob import os import re @@ -45,6 +45,9 @@ if sys.version_info[:2] <= (2, 6): sys.exit(1) else: import unittest + if sys.version_info[0] >= 3: + xrange = range + basestring = str from pyspark.conf import SparkConf @@ -52,7 +55,9 @@ from pyspark.context import SparkContext from pyspark.rdd import RDD from pyspark.files import SparkFiles from pyspark.serializers import read_int, BatchedSerializer, MarshalSerializer, PickleSerializer, \ - CloudPickleSerializer, CompressedSerializer, UTF8Deserializer, NoOpSerializer + CloudPickleSerializer, CompressedSerializer, UTF8Deserializer, NoOpSerializer, \ + PairDeserializer, CartesianDeserializer, AutoBatchedSerializer, AutoSerializer, \ + FlattenedValuesSerializer from pyspark.shuffle import Aggregator, InMemoryMerger, ExternalMerger, ExternalSorter from pyspark import shuffle from pyspark.profiler import BasicProfiler @@ -81,7 +86,7 @@ class MergerTests(unittest.TestCase): def setUp(self): self.N = 1 << 12 self.l = [i for i in xrange(self.N)] - self.data = zip(self.l, self.l) + self.data = list(zip(self.l, self.l)) self.agg = Aggregator(lambda x: [x], lambda x, y: x.append(y) or x, lambda x, y: x.extend(y) or x) @@ -89,45 +94,45 @@ class MergerTests(unittest.TestCase): def test_in_memory(self): m = InMemoryMerger(self.agg) m.mergeValues(self.data) - self.assertEqual(sum(sum(v) for k, v in m.iteritems()), + self.assertEqual(sum(sum(v) for k, v in m.items()), sum(xrange(self.N))) m = InMemoryMerger(self.agg) - m.mergeCombiners(map(lambda (x, y): (x, [y]), self.data)) - self.assertEqual(sum(sum(v) for k, v in m.iteritems()), + m.mergeCombiners(map(lambda x_y: (x_y[0], [x_y[1]]), self.data)) + self.assertEqual(sum(sum(v) for k, v in m.items()), sum(xrange(self.N))) def test_small_dataset(self): m = ExternalMerger(self.agg, 1000) m.mergeValues(self.data) self.assertEqual(m.spills, 0) - self.assertEqual(sum(sum(v) for k, v in m.iteritems()), + self.assertEqual(sum(sum(v) for k, v in m.items()), sum(xrange(self.N))) m = ExternalMerger(self.agg, 1000) - m.mergeCombiners(map(lambda (x, y): (x, [y]), self.data)) + m.mergeCombiners(map(lambda x_y1: (x_y1[0], [x_y1[1]]), self.data)) self.assertEqual(m.spills, 0) - self.assertEqual(sum(sum(v) for k, v in m.iteritems()), + self.assertEqual(sum(sum(v) for k, v in m.items()), sum(xrange(self.N))) def test_medium_dataset(self): - m = ExternalMerger(self.agg, 30) + m = ExternalMerger(self.agg, 20) m.mergeValues(self.data) self.assertTrue(m.spills >= 1) - self.assertEqual(sum(sum(v) for k, v in m.iteritems()), + self.assertEqual(sum(sum(v) for k, v in m.items()), sum(xrange(self.N))) m = ExternalMerger(self.agg, 10) - m.mergeCombiners(map(lambda (x, y): (x, [y]), self.data * 3)) + m.mergeCombiners(map(lambda x_y2: (x_y2[0], [x_y2[1]]), self.data * 3)) self.assertTrue(m.spills >= 1) - self.assertEqual(sum(sum(v) for k, v in m.iteritems()), + self.assertEqual(sum(sum(v) for k, v in m.items()), sum(xrange(self.N)) * 3) def test_huge_dataset(self): - m = ExternalMerger(self.agg, 10, partitions=3) - m.mergeCombiners(map(lambda (k, v): (k, [str(v)]), self.data * 10)) + m = ExternalMerger(self.agg, 5, partitions=3) + m.mergeCombiners(map(lambda k_v: (k_v[0], [str(k_v[1])]), self.data * 10)) self.assertTrue(m.spills >= 1) - self.assertEqual(sum(len(v) for k, v in m.iteritems()), + self.assertEqual(sum(len(v) for k, v in m.items()), self.N * 10) m._cleanup() @@ -144,55 +149,55 @@ class MergerTests(unittest.TestCase): self.assertEqual(1, len(list(gen_gs(1)))) self.assertEqual(2, len(list(gen_gs(2)))) self.assertEqual(100, len(list(gen_gs(100)))) - self.assertEqual(range(1, 101), [k for k, _ in gen_gs(100)]) - self.assertTrue(all(range(k) == list(vs) for k, vs in gen_gs(100))) + self.assertEqual(list(range(1, 101)), [k for k, _ in gen_gs(100)]) + self.assertTrue(all(list(range(k)) == list(vs) for k, vs in gen_gs(100))) for k, vs in gen_gs(50002, 10000): self.assertEqual(k, len(vs)) - self.assertEqual(range(k), list(vs)) + self.assertEqual(list(range(k)), list(vs)) ser = PickleSerializer() l = ser.loads(ser.dumps(list(gen_gs(50002, 30000)))) for k, vs in l: self.assertEqual(k, len(vs)) - self.assertEqual(range(k), list(vs)) + self.assertEqual(list(range(k)), list(vs)) class SorterTests(unittest.TestCase): def test_in_memory_sort(self): - l = range(1024) + l = list(range(1024)) random.shuffle(l) sorter = ExternalSorter(1024) - self.assertEquals(sorted(l), list(sorter.sorted(l))) - self.assertEquals(sorted(l, reverse=True), list(sorter.sorted(l, reverse=True))) - self.assertEquals(sorted(l, key=lambda x: -x), list(sorter.sorted(l, key=lambda x: -x))) - self.assertEquals(sorted(l, key=lambda x: -x, reverse=True), - list(sorter.sorted(l, key=lambda x: -x, reverse=True))) + self.assertEqual(sorted(l), list(sorter.sorted(l))) + self.assertEqual(sorted(l, reverse=True), list(sorter.sorted(l, reverse=True))) + self.assertEqual(sorted(l, key=lambda x: -x), list(sorter.sorted(l, key=lambda x: -x))) + self.assertEqual(sorted(l, key=lambda x: -x, reverse=True), + list(sorter.sorted(l, key=lambda x: -x, reverse=True))) def test_external_sort(self): - l = range(1024) + l = list(range(1024)) random.shuffle(l) sorter = ExternalSorter(1) - self.assertEquals(sorted(l), list(sorter.sorted(l))) + self.assertEqual(sorted(l), list(sorter.sorted(l))) self.assertGreater(shuffle.DiskBytesSpilled, 0) last = shuffle.DiskBytesSpilled - self.assertEquals(sorted(l, reverse=True), list(sorter.sorted(l, reverse=True))) + self.assertEqual(sorted(l, reverse=True), list(sorter.sorted(l, reverse=True))) self.assertGreater(shuffle.DiskBytesSpilled, last) last = shuffle.DiskBytesSpilled - self.assertEquals(sorted(l, key=lambda x: -x), list(sorter.sorted(l, key=lambda x: -x))) + self.assertEqual(sorted(l, key=lambda x: -x), list(sorter.sorted(l, key=lambda x: -x))) self.assertGreater(shuffle.DiskBytesSpilled, last) last = shuffle.DiskBytesSpilled - self.assertEquals(sorted(l, key=lambda x: -x, reverse=True), - list(sorter.sorted(l, key=lambda x: -x, reverse=True))) + self.assertEqual(sorted(l, key=lambda x: -x, reverse=True), + list(sorter.sorted(l, key=lambda x: -x, reverse=True))) self.assertGreater(shuffle.DiskBytesSpilled, last) def test_external_sort_in_rdd(self): conf = SparkConf().set("spark.python.worker.memory", "1m") sc = SparkContext(conf=conf) - l = range(10240) + l = list(range(10240)) random.shuffle(l) - rdd = sc.parallelize(l, 10) - self.assertEquals(sorted(l), rdd.sortBy(lambda x: x).collect()) + rdd = sc.parallelize(l, 2) + self.assertEqual(sorted(l), rdd.sortBy(lambda x: x).collect()) sc.stop() @@ -200,11 +205,11 @@ class SerializationTestCase(unittest.TestCase): def test_namedtuple(self): from collections import namedtuple - from cPickle import dumps, loads + from pickle import dumps, loads P = namedtuple("P", "x y") p1 = P(1, 3) p2 = loads(dumps(p1, 2)) - self.assertEquals(p1, p2) + self.assertEqual(p1, p2) def test_itemgetter(self): from operator import itemgetter @@ -246,7 +251,7 @@ class SerializationTestCase(unittest.TestCase): ser = CloudPickleSerializer() out1 = sys.stderr out2 = ser.loads(ser.dumps(out1)) - self.assertEquals(out1, out2) + self.assertEqual(out1, out2) def test_func_globals(self): @@ -263,19 +268,36 @@ class SerializationTestCase(unittest.TestCase): def foo(): sys.exit(0) - self.assertTrue("exit" in foo.func_code.co_names) + self.assertTrue("exit" in foo.__code__.co_names) ser.dumps(foo) def test_compressed_serializer(self): ser = CompressedSerializer(PickleSerializer()) - from StringIO import StringIO + try: + from StringIO import StringIO + except ImportError: + from io import BytesIO as StringIO io = StringIO() ser.dump_stream(["abc", u"123", range(5)], io) io.seek(0) self.assertEqual(["abc", u"123", range(5)], list(ser.load_stream(io))) ser.dump_stream(range(1000), io) io.seek(0) - self.assertEqual(["abc", u"123", range(5)] + range(1000), list(ser.load_stream(io))) + self.assertEqual(["abc", u"123", range(5)] + list(range(1000)), list(ser.load_stream(io))) + io.close() + + def test_hash_serializer(self): + hash(NoOpSerializer()) + hash(UTF8Deserializer()) + hash(PickleSerializer()) + hash(MarshalSerializer()) + hash(AutoSerializer()) + hash(BatchedSerializer(PickleSerializer())) + hash(AutoBatchedSerializer(MarshalSerializer())) + hash(PairDeserializer(NoOpSerializer(), UTF8Deserializer())) + hash(CartesianDeserializer(NoOpSerializer(), UTF8Deserializer())) + hash(CompressedSerializer(PickleSerializer())) + hash(FlattenedValuesSerializer(PickleSerializer())) class PySparkTestCase(unittest.TestCase): @@ -340,7 +362,7 @@ class CheckpointTests(ReusedPySparkTestCase): self.assertTrue(flatMappedRDD.getCheckpointFile() is not None) recovered = self.sc._checkpointFile(flatMappedRDD.getCheckpointFile(), flatMappedRDD._jrdd_deserializer) - self.assertEquals([1, 2, 3, 4], recovered.collect()) + self.assertEqual([1, 2, 3, 4], recovered.collect()) class AddFileTests(PySparkTestCase): @@ -356,8 +378,7 @@ class AddFileTests(PySparkTestCase): def func(x): from userlibrary import UserClass return UserClass().hello() - self.assertRaises(Exception, - self.sc.parallelize(range(2)).map(func).first) + self.assertRaises(Exception, self.sc.parallelize(range(2)).map(func).first) log4j.LogManager.getRootLogger().setLevel(old_level) # Add the file, so the job should now succeed: @@ -372,7 +393,7 @@ class AddFileTests(PySparkTestCase): download_path = SparkFiles.get("hello.txt") self.assertNotEqual(path, download_path) with open(download_path) as test_file: - self.assertEquals("Hello World!\n", test_file.readline()) + self.assertEqual("Hello World!\n", test_file.readline()) def test_add_py_file_locally(self): # To ensure that we're actually testing addPyFile's effects, check that @@ -381,7 +402,7 @@ class AddFileTests(PySparkTestCase): from userlibrary import UserClass self.assertRaises(ImportError, func) path = os.path.join(SPARK_HOME, "python/test_support/userlibrary.py") - self.sc.addFile(path) + self.sc.addPyFile(path) from userlibrary import UserClass self.assertEqual("Hello World!", UserClass().hello()) @@ -391,7 +412,7 @@ class AddFileTests(PySparkTestCase): def func(): from userlib import UserClass self.assertRaises(ImportError, func) - path = os.path.join(SPARK_HOME, "python/test_support/userlib-0.1-py2.7.egg") + path = os.path.join(SPARK_HOME, "python/test_support/userlib-0.1.zip") self.sc.addPyFile(path) from userlib import UserClass self.assertEqual("Hello World from inside a package!", UserClass().hello()) @@ -427,8 +448,9 @@ class RDDTests(ReusedPySparkTestCase): tempFile = tempfile.NamedTemporaryFile(delete=True) tempFile.close() data.saveAsTextFile(tempFile.name) - raw_contents = ''.join(input(glob(tempFile.name + "/part-0000*"))) - self.assertEqual(x, unicode(raw_contents.strip(), "utf-8")) + raw_contents = b''.join(open(p, 'rb').read() + for p in glob(tempFile.name + "/part-0000*")) + self.assertEqual(x, raw_contents.strip().decode("utf-8")) def test_save_as_textfile_with_utf8(self): x = u"\u00A1Hola, mundo!" @@ -436,19 +458,20 @@ class RDDTests(ReusedPySparkTestCase): tempFile = tempfile.NamedTemporaryFile(delete=True) tempFile.close() data.saveAsTextFile(tempFile.name) - raw_contents = ''.join(input(glob(tempFile.name + "/part-0000*"))) - self.assertEqual(x, unicode(raw_contents.strip(), "utf-8")) + raw_contents = b''.join(open(p, 'rb').read() + for p in glob(tempFile.name + "/part-0000*")) + self.assertEqual(x, raw_contents.strip().decode('utf8')) def test_transforming_cartesian_result(self): # Regression test for SPARK-1034 rdd1 = self.sc.parallelize([1, 2]) rdd2 = self.sc.parallelize([3, 4]) cart = rdd1.cartesian(rdd2) - result = cart.map(lambda (x, y): x + y).collect() + result = cart.map(lambda x_y3: x_y3[0] + x_y3[1]).collect() def test_transforming_pickle_file(self): # Regression test for SPARK-2601 - data = self.sc.parallelize(["Hello", "World!"]) + data = self.sc.parallelize([u"Hello", u"World!"]) tempFile = tempfile.NamedTemporaryFile(delete=True) tempFile.close() data.saveAsPickleFile(tempFile.name) @@ -461,13 +484,13 @@ class RDDTests(ReusedPySparkTestCase): a = self.sc.textFile(path) result = a.cartesian(a).collect() (x, y) = result[0] - self.assertEqual("Hello World!", x.strip()) - self.assertEqual("Hello World!", y.strip()) + self.assertEqual(u"Hello World!", x.strip()) + self.assertEqual(u"Hello World!", y.strip()) def test_deleting_input_files(self): # Regression test for SPARK-1025 tempFile = tempfile.NamedTemporaryFile(delete=False) - tempFile.write("Hello World!") + tempFile.write(b"Hello World!") tempFile.close() data = self.sc.textFile(tempFile.name) filtered_data = data.filter(lambda x: True) @@ -510,21 +533,21 @@ class RDDTests(ReusedPySparkTestCase): jon = Person(1, "Jon", "Doe") jane = Person(2, "Jane", "Doe") theDoes = self.sc.parallelize([jon, jane]) - self.assertEquals([jon, jane], theDoes.collect()) + self.assertEqual([jon, jane], theDoes.collect()) def test_large_broadcast(self): N = 100000 data = [[float(i) for i in range(300)] for i in range(N)] bdata = self.sc.broadcast(data) # 270MB m = self.sc.parallelize(range(1), 1).map(lambda x: len(bdata.value)).sum() - self.assertEquals(N, m) + self.assertEqual(N, m) def test_multiple_broadcasts(self): N = 1 << 21 b1 = self.sc.broadcast(set(range(N))) # multiple blocks in JVM - r = range(1 << 15) + r = list(range(1 << 15)) random.shuffle(r) - s = str(r) + s = str(r).encode() checksum = hashlib.md5(s).hexdigest() b2 = self.sc.broadcast(s) r = list(set(self.sc.parallelize(range(10), 10).map( @@ -535,7 +558,7 @@ class RDDTests(ReusedPySparkTestCase): self.assertEqual(checksum, csum) random.shuffle(r) - s = str(r) + s = str(r).encode() checksum = hashlib.md5(s).hexdigest() b2 = self.sc.broadcast(s) r = list(set(self.sc.parallelize(range(10), 10).map( @@ -549,7 +572,7 @@ class RDDTests(ReusedPySparkTestCase): N = 1000000 data = [float(i) for i in xrange(N)] rdd = self.sc.parallelize(range(1), 1).map(lambda x: len(data)) - self.assertEquals(N, rdd.first()) + self.assertEqual(N, rdd.first()) # regression test for SPARK-6886 self.assertEqual(1, rdd.map(lambda x: (x, 1)).groupByKey().count()) @@ -590,15 +613,15 @@ class RDDTests(ReusedPySparkTestCase): # same total number of items, but different distributions a = self.sc.parallelize([2, 3], 2).flatMap(range) b = self.sc.parallelize([3, 2], 2).flatMap(range) - self.assertEquals(a.count(), b.count()) + self.assertEqual(a.count(), b.count()) self.assertRaises(Exception, lambda: a.zip(b).count()) def test_count_approx_distinct(self): rdd = self.sc.parallelize(range(1000)) - self.assertTrue(950 < rdd.countApproxDistinct(0.04) < 1050) - self.assertTrue(950 < rdd.map(float).countApproxDistinct(0.04) < 1050) - self.assertTrue(950 < rdd.map(str).countApproxDistinct(0.04) < 1050) - self.assertTrue(950 < rdd.map(lambda x: (x, -x)).countApproxDistinct(0.04) < 1050) + self.assertTrue(950 < rdd.countApproxDistinct(0.03) < 1050) + self.assertTrue(950 < rdd.map(float).countApproxDistinct(0.03) < 1050) + self.assertTrue(950 < rdd.map(str).countApproxDistinct(0.03) < 1050) + self.assertTrue(950 < rdd.map(lambda x: (x, -x)).countApproxDistinct(0.03) < 1050) rdd = self.sc.parallelize([i % 20 for i in range(1000)], 7) self.assertTrue(18 < rdd.countApproxDistinct() < 22) @@ -612,59 +635,59 @@ class RDDTests(ReusedPySparkTestCase): def test_histogram(self): # empty rdd = self.sc.parallelize([]) - self.assertEquals([0], rdd.histogram([0, 10])[1]) - self.assertEquals([0, 0], rdd.histogram([0, 4, 10])[1]) + self.assertEqual([0], rdd.histogram([0, 10])[1]) + self.assertEqual([0, 0], rdd.histogram([0, 4, 10])[1]) self.assertRaises(ValueError, lambda: rdd.histogram(1)) # out of range rdd = self.sc.parallelize([10.01, -0.01]) - self.assertEquals([0], rdd.histogram([0, 10])[1]) - self.assertEquals([0, 0], rdd.histogram((0, 4, 10))[1]) + self.assertEqual([0], rdd.histogram([0, 10])[1]) + self.assertEqual([0, 0], rdd.histogram((0, 4, 10))[1]) # in range with one bucket rdd = self.sc.parallelize(range(1, 5)) - self.assertEquals([4], rdd.histogram([0, 10])[1]) - self.assertEquals([3, 1], rdd.histogram([0, 4, 10])[1]) + self.assertEqual([4], rdd.histogram([0, 10])[1]) + self.assertEqual([3, 1], rdd.histogram([0, 4, 10])[1]) # in range with one bucket exact match - self.assertEquals([4], rdd.histogram([1, 4])[1]) + self.assertEqual([4], rdd.histogram([1, 4])[1]) # out of range with two buckets rdd = self.sc.parallelize([10.01, -0.01]) - self.assertEquals([0, 0], rdd.histogram([0, 5, 10])[1]) + self.assertEqual([0, 0], rdd.histogram([0, 5, 10])[1]) # out of range with two uneven buckets rdd = self.sc.parallelize([10.01, -0.01]) - self.assertEquals([0, 0], rdd.histogram([0, 4, 10])[1]) + self.assertEqual([0, 0], rdd.histogram([0, 4, 10])[1]) # in range with two buckets rdd = self.sc.parallelize([1, 2, 3, 5, 6]) - self.assertEquals([3, 2], rdd.histogram([0, 5, 10])[1]) + self.assertEqual([3, 2], rdd.histogram([0, 5, 10])[1]) # in range with two bucket and None rdd = self.sc.parallelize([1, 2, 3, 5, 6, None, float('nan')]) - self.assertEquals([3, 2], rdd.histogram([0, 5, 10])[1]) + self.assertEqual([3, 2], rdd.histogram([0, 5, 10])[1]) # in range with two uneven buckets rdd = self.sc.parallelize([1, 2, 3, 5, 6]) - self.assertEquals([3, 2], rdd.histogram([0, 5, 11])[1]) + self.assertEqual([3, 2], rdd.histogram([0, 5, 11])[1]) # mixed range with two uneven buckets rdd = self.sc.parallelize([-0.01, 0.0, 1, 2, 3, 5, 6, 11.0, 11.01]) - self.assertEquals([4, 3], rdd.histogram([0, 5, 11])[1]) + self.assertEqual([4, 3], rdd.histogram([0, 5, 11])[1]) # mixed range with four uneven buckets rdd = self.sc.parallelize([-0.01, 0.0, 1, 2, 3, 5, 6, 11.01, 12.0, 199.0, 200.0, 200.1]) - self.assertEquals([4, 2, 1, 3], rdd.histogram([0.0, 5.0, 11.0, 12.0, 200.0])[1]) + self.assertEqual([4, 2, 1, 3], rdd.histogram([0.0, 5.0, 11.0, 12.0, 200.0])[1]) # mixed range with uneven buckets and NaN rdd = self.sc.parallelize([-0.01, 0.0, 1, 2, 3, 5, 6, 11.01, 12.0, 199.0, 200.0, 200.1, None, float('nan')]) - self.assertEquals([4, 2, 1, 3], rdd.histogram([0.0, 5.0, 11.0, 12.0, 200.0])[1]) + self.assertEqual([4, 2, 1, 3], rdd.histogram([0.0, 5.0, 11.0, 12.0, 200.0])[1]) # out of range with infinite buckets rdd = self.sc.parallelize([10.01, -0.01, float('nan'), float("inf")]) - self.assertEquals([1, 2], rdd.histogram([float('-inf'), 0, float('inf')])[1]) + self.assertEqual([1, 2], rdd.histogram([float('-inf'), 0, float('inf')])[1]) # invalid buckets self.assertRaises(ValueError, lambda: rdd.histogram([])) @@ -674,25 +697,25 @@ class RDDTests(ReusedPySparkTestCase): # without buckets rdd = self.sc.parallelize(range(1, 5)) - self.assertEquals(([1, 4], [4]), rdd.histogram(1)) + self.assertEqual(([1, 4], [4]), rdd.histogram(1)) # without buckets single element rdd = self.sc.parallelize([1]) - self.assertEquals(([1, 1], [1]), rdd.histogram(1)) + self.assertEqual(([1, 1], [1]), rdd.histogram(1)) # without bucket no range rdd = self.sc.parallelize([1] * 4) - self.assertEquals(([1, 1], [4]), rdd.histogram(1)) + self.assertEqual(([1, 1], [4]), rdd.histogram(1)) # without buckets basic two rdd = self.sc.parallelize(range(1, 5)) - self.assertEquals(([1, 2.5, 4], [2, 2]), rdd.histogram(2)) + self.assertEqual(([1, 2.5, 4], [2, 2]), rdd.histogram(2)) # without buckets with more requested than elements rdd = self.sc.parallelize([1, 2]) buckets = [1 + 0.2 * i for i in range(6)] hist = [1, 0, 0, 0, 1] - self.assertEquals((buckets, hist), rdd.histogram(5)) + self.assertEqual((buckets, hist), rdd.histogram(5)) # invalid RDDs rdd = self.sc.parallelize([1, float('inf')]) @@ -702,15 +725,8 @@ class RDDTests(ReusedPySparkTestCase): # string rdd = self.sc.parallelize(["ab", "ac", "b", "bd", "ef"], 2) - self.assertEquals([2, 2], rdd.histogram(["a", "b", "c"])[1]) - self.assertEquals((["ab", "ef"], [5]), rdd.histogram(1)) - self.assertRaises(TypeError, lambda: rdd.histogram(2)) - - # mixed RDD - rdd = self.sc.parallelize([1, 4, "ab", "ac", "b"], 2) - self.assertEquals([1, 1], rdd.histogram([0, 4, 10])[1]) - self.assertEquals([2, 1], rdd.histogram(["a", "b", "c"])[1]) - self.assertEquals(([1, "b"], [5]), rdd.histogram(1)) + self.assertEqual([2, 2], rdd.histogram(["a", "b", "c"])[1]) + self.assertEqual((["ab", "ef"], [5]), rdd.histogram(1)) self.assertRaises(TypeError, lambda: rdd.histogram(2)) def test_repartitionAndSortWithinPartitions(self): @@ -718,31 +734,31 @@ class RDDTests(ReusedPySparkTestCase): repartitioned = rdd.repartitionAndSortWithinPartitions(2, lambda key: key % 2) partitions = repartitioned.glom().collect() - self.assertEquals(partitions[0], [(0, 5), (0, 8), (2, 6)]) - self.assertEquals(partitions[1], [(1, 3), (3, 8), (3, 8)]) + self.assertEqual(partitions[0], [(0, 5), (0, 8), (2, 6)]) + self.assertEqual(partitions[1], [(1, 3), (3, 8), (3, 8)]) def test_distinct(self): rdd = self.sc.parallelize((1, 2, 3)*10, 10) - self.assertEquals(rdd.getNumPartitions(), 10) - self.assertEquals(rdd.distinct().count(), 3) + self.assertEqual(rdd.getNumPartitions(), 10) + self.assertEqual(rdd.distinct().count(), 3) result = rdd.distinct(5) - self.assertEquals(result.getNumPartitions(), 5) - self.assertEquals(result.count(), 3) + self.assertEqual(result.getNumPartitions(), 5) + self.assertEqual(result.count(), 3) def test_external_group_by_key(self): - self.sc._conf.set("spark.python.worker.memory", "5m") + self.sc._conf.set("spark.python.worker.memory", "1m") N = 200001 kv = self.sc.parallelize(range(N)).map(lambda x: (x % 3, x)) gkv = kv.groupByKey().cache() self.assertEqual(3, gkv.count()) - filtered = gkv.filter(lambda (k, vs): k == 1) + filtered = gkv.filter(lambda kv: kv[0] == 1) self.assertEqual(1, filtered.count()) - self.assertEqual([(1, N/3)], filtered.mapValues(len).collect()) - self.assertEqual([(N/3, N/3)], + self.assertEqual([(1, N // 3)], filtered.mapValues(len).collect()) + self.assertEqual([(N // 3, N // 3)], filtered.values().map(lambda x: (len(x), len(list(x)))).collect()) result = filtered.collect()[0][1] - self.assertEqual(N/3, len(result)) - self.assertTrue(isinstance(result.data, shuffle.ExternalList)) + self.assertEqual(N // 3, len(result)) + self.assertTrue(isinstance(result.data, shuffle.ExternalListOfList)) def test_sort_on_empty_rdd(self): self.assertEqual([], self.sc.parallelize(zip([], [])).sortByKey().collect()) @@ -767,7 +783,7 @@ class RDDTests(ReusedPySparkTestCase): rdd = RDD(jrdd, self.sc, UTF8Deserializer()) self.assertEqual([u"a", None, u"b"], rdd.collect()) rdd = RDD(jrdd, self.sc, NoOpSerializer()) - self.assertEqual(["a", None, "b"], rdd.collect()) + self.assertEqual([b"a", None, b"b"], rdd.collect()) def test_multiple_python_java_RDD_conversions(self): # Regression test for SPARK-5361 @@ -813,14 +829,14 @@ class RDDTests(ReusedPySparkTestCase): self.sc.setJobGroup("test3", "test", True) d = sorted(parted.cogroup(parted).collect()) self.assertEqual(10, len(d)) - self.assertEqual([[0], [0]], map(list, d[0][1])) + self.assertEqual([[0], [0]], list(map(list, d[0][1]))) jobId = tracker.getJobIdsForGroup("test3")[0] self.assertEqual(2, len(tracker.getJobInfo(jobId).stageIds)) self.sc.setJobGroup("test4", "test", True) d = sorted(parted.cogroup(rdd).collect()) self.assertEqual(10, len(d)) - self.assertEqual([[0], [0]], map(list, d[0][1])) + self.assertEqual([[0], [0]], list(map(list, d[0][1]))) jobId = tracker.getJobIdsForGroup("test4")[0] self.assertEqual(3, len(tracker.getJobInfo(jobId).stageIds)) @@ -906,6 +922,7 @@ class InputFormatTests(ReusedPySparkTestCase): ReusedPySparkTestCase.tearDownClass() shutil.rmtree(cls.tempdir.name) + @unittest.skipIf(sys.version >= "3", "serialize array of byte") def test_sequencefiles(self): basepath = self.tempdir.name ints = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfint/", @@ -954,15 +971,16 @@ class InputFormatTests(ReusedPySparkTestCase): en = [(1, None), (1, None), (2, None), (2, None), (2, None), (3, None)] self.assertEqual(nulls, en) - maps = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfmap/", - "org.apache.hadoop.io.IntWritable", - "org.apache.hadoop.io.MapWritable").collect()) + maps = self.sc.sequenceFile(basepath + "/sftestdata/sfmap/", + "org.apache.hadoop.io.IntWritable", + "org.apache.hadoop.io.MapWritable").collect() em = [(1, {}), (1, {3.0: u'bb'}), (2, {1.0: u'aa'}), (2, {1.0: u'cc'}), (3, {2.0: u'dd'})] - self.assertEqual(maps, em) + for v in maps: + self.assertTrue(v in em) # arrays get pickled to tuples by default tuples = sorted(self.sc.sequenceFile( @@ -1089,8 +1107,8 @@ class InputFormatTests(ReusedPySparkTestCase): def test_binary_files(self): path = os.path.join(self.tempdir.name, "binaryfiles") os.mkdir(path) - data = "short binary data" - with open(os.path.join(path, "part-0000"), 'w') as f: + data = b"short binary data" + with open(os.path.join(path, "part-0000"), 'wb') as f: f.write(data) [(p, d)] = self.sc.binaryFiles(path).collect() self.assertTrue(p.endswith("part-0000")) @@ -1103,7 +1121,7 @@ class InputFormatTests(ReusedPySparkTestCase): for i in range(100): f.write('%04d' % i) result = self.sc.binaryRecords(path, 4).map(int).collect() - self.assertEqual(range(100), result) + self.assertEqual(list(range(100)), result) class OutputFormatTests(ReusedPySparkTestCase): @@ -1115,6 +1133,7 @@ class OutputFormatTests(ReusedPySparkTestCase): def tearDown(self): shutil.rmtree(self.tempdir.name, ignore_errors=True) + @unittest.skipIf(sys.version >= "3", "serialize array of byte") def test_sequencefiles(self): basepath = self.tempdir.name ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')] @@ -1155,8 +1174,9 @@ class OutputFormatTests(ReusedPySparkTestCase): (2, {1.0: u'cc'}), (3, {2.0: u'dd'})] self.sc.parallelize(em).saveAsSequenceFile(basepath + "/sfmap/") - maps = sorted(self.sc.sequenceFile(basepath + "/sfmap/").collect()) - self.assertEqual(maps, em) + maps = self.sc.sequenceFile(basepath + "/sfmap/").collect() + for v in maps: + self.assertTrue(v, em) def test_oldhadoop(self): basepath = self.tempdir.name @@ -1168,12 +1188,13 @@ class OutputFormatTests(ReusedPySparkTestCase): "org.apache.hadoop.mapred.SequenceFileOutputFormat", "org.apache.hadoop.io.IntWritable", "org.apache.hadoop.io.MapWritable") - result = sorted(self.sc.hadoopFile( + result = self.sc.hadoopFile( basepath + "/oldhadoop/", "org.apache.hadoop.mapred.SequenceFileInputFormat", "org.apache.hadoop.io.IntWritable", - "org.apache.hadoop.io.MapWritable").collect()) - self.assertEqual(result, dict_data) + "org.apache.hadoop.io.MapWritable").collect() + for v in result: + self.assertTrue(v, dict_data) conf = { "mapred.output.format.class": "org.apache.hadoop.mapred.SequenceFileOutputFormat", @@ -1183,12 +1204,13 @@ class OutputFormatTests(ReusedPySparkTestCase): } self.sc.parallelize(dict_data).saveAsHadoopDataset(conf) input_conf = {"mapred.input.dir": basepath + "/olddataset/"} - old_dataset = sorted(self.sc.hadoopRDD( + result = self.sc.hadoopRDD( "org.apache.hadoop.mapred.SequenceFileInputFormat", "org.apache.hadoop.io.IntWritable", "org.apache.hadoop.io.MapWritable", - conf=input_conf).collect()) - self.assertEqual(old_dataset, dict_data) + conf=input_conf).collect() + for v in result: + self.assertTrue(v, dict_data) def test_newhadoop(self): basepath = self.tempdir.name @@ -1223,6 +1245,7 @@ class OutputFormatTests(ReusedPySparkTestCase): conf=input_conf).collect()) self.assertEqual(new_dataset, data) + @unittest.skipIf(sys.version >= "3", "serialize of array") def test_newhadoop_with_array(self): basepath = self.tempdir.name # use custom ArrayWritable types and converters to handle arrays @@ -1303,7 +1326,7 @@ class OutputFormatTests(ReusedPySparkTestCase): basepath = self.tempdir.name x = range(1, 5) y = range(1001, 1005) - data = zip(x, y) + data = list(zip(x, y)) rdd = self.sc.parallelize(x).zip(self.sc.parallelize(y)) rdd.saveAsSequenceFile(basepath + "/reserialize/sequence") result1 = sorted(self.sc.sequenceFile(basepath + "/reserialize/sequence").collect()) @@ -1354,7 +1377,7 @@ class DaemonTests(unittest.TestCase): sock = socket(AF_INET, SOCK_STREAM) sock.connect(('127.0.0.1', port)) # send a split index of -1 to shutdown the worker - sock.send("\xFF\xFF\xFF\xFF") + sock.send(b"\xFF\xFF\xFF\xFF") sock.close() return True @@ -1395,7 +1418,6 @@ class DaemonTests(unittest.TestCase): class WorkerTests(PySparkTestCase): - def test_cancel_task(self): temp = tempfile.NamedTemporaryFile(delete=True) temp.close() @@ -1410,7 +1432,7 @@ class WorkerTests(PySparkTestCase): # start job in background thread def run(): - self.sc.parallelize(range(1)).foreach(sleep) + self.sc.parallelize(range(1), 1).foreach(sleep) import threading t = threading.Thread(target=run) t.daemon = True @@ -1419,7 +1441,8 @@ class WorkerTests(PySparkTestCase): daemon_pid, worker_pid = 0, 0 while True: if os.path.exists(path): - data = open(path).read().split(' ') + with open(path) as f: + data = f.read().split(' ') daemon_pid, worker_pid = map(int, data) break time.sleep(0.1) @@ -1455,7 +1478,7 @@ class WorkerTests(PySparkTestCase): def test_after_jvm_exception(self): tempFile = tempfile.NamedTemporaryFile(delete=False) - tempFile.write("Hello World!") + tempFile.write(b"Hello World!") tempFile.close() data = self.sc.textFile(tempFile.name, 1) filtered_data = data.filter(lambda x: True) @@ -1577,12 +1600,12 @@ class SparkSubmitTests(unittest.TestCase): |from pyspark import SparkContext | |sc = SparkContext() - |print sc.parallelize([1, 2, 3]).map(lambda x: x * 2).collect() + |print(sc.parallelize([1, 2, 3]).map(lambda x: x * 2).collect()) """) proc = subprocess.Popen([self.sparkSubmit, script], stdout=subprocess.PIPE) out, err = proc.communicate() self.assertEqual(0, proc.returncode) - self.assertIn("[2, 4, 6]", out) + self.assertIn("[2, 4, 6]", out.decode('utf-8')) def test_script_with_local_functions(self): """Submit and test a single script file calling a global function""" @@ -1593,12 +1616,12 @@ class SparkSubmitTests(unittest.TestCase): | return x * 3 | |sc = SparkContext() - |print sc.parallelize([1, 2, 3]).map(foo).collect() + |print(sc.parallelize([1, 2, 3]).map(foo).collect()) """) proc = subprocess.Popen([self.sparkSubmit, script], stdout=subprocess.PIPE) out, err = proc.communicate() self.assertEqual(0, proc.returncode) - self.assertIn("[3, 6, 9]", out) + self.assertIn("[3, 6, 9]", out.decode('utf-8')) def test_module_dependency(self): """Submit and test a script with a dependency on another module""" @@ -1607,7 +1630,7 @@ class SparkSubmitTests(unittest.TestCase): |from mylib import myfunc | |sc = SparkContext() - |print sc.parallelize([1, 2, 3]).map(myfunc).collect() + |print(sc.parallelize([1, 2, 3]).map(myfunc).collect()) """) zip = self.createFileInZip("mylib.py", """ |def myfunc(x): @@ -1617,7 +1640,7 @@ class SparkSubmitTests(unittest.TestCase): stdout=subprocess.PIPE) out, err = proc.communicate() self.assertEqual(0, proc.returncode) - self.assertIn("[2, 3, 4]", out) + self.assertIn("[2, 3, 4]", out.decode('utf-8')) def test_module_dependency_on_cluster(self): """Submit and test a script with a dependency on another module on a cluster""" @@ -1626,7 +1649,7 @@ class SparkSubmitTests(unittest.TestCase): |from mylib import myfunc | |sc = SparkContext() - |print sc.parallelize([1, 2, 3]).map(myfunc).collect() + |print(sc.parallelize([1, 2, 3]).map(myfunc).collect()) """) zip = self.createFileInZip("mylib.py", """ |def myfunc(x): @@ -1637,7 +1660,7 @@ class SparkSubmitTests(unittest.TestCase): stdout=subprocess.PIPE) out, err = proc.communicate() self.assertEqual(0, proc.returncode) - self.assertIn("[2, 3, 4]", out) + self.assertIn("[2, 3, 4]", out.decode('utf-8')) def test_package_dependency(self): """Submit and test a script with a dependency on a Spark Package""" @@ -1646,14 +1669,14 @@ class SparkSubmitTests(unittest.TestCase): |from mylib import myfunc | |sc = SparkContext() - |print sc.parallelize([1, 2, 3]).map(myfunc).collect() + |print(sc.parallelize([1, 2, 3]).map(myfunc).collect()) """) self.create_spark_package("a:mylib:0.1") proc = subprocess.Popen([self.sparkSubmit, "--packages", "a:mylib:0.1", "--repositories", "file:" + self.programDir, script], stdout=subprocess.PIPE) out, err = proc.communicate() self.assertEqual(0, proc.returncode) - self.assertIn("[2, 3, 4]", out) + self.assertIn("[2, 3, 4]", out.decode('utf-8')) def test_package_dependency_on_cluster(self): """Submit and test a script with a dependency on a Spark Package on a cluster""" @@ -1662,7 +1685,7 @@ class SparkSubmitTests(unittest.TestCase): |from mylib import myfunc | |sc = SparkContext() - |print sc.parallelize([1, 2, 3]).map(myfunc).collect() + |print(sc.parallelize([1, 2, 3]).map(myfunc).collect()) """) self.create_spark_package("a:mylib:0.1") proc = subprocess.Popen([self.sparkSubmit, "--packages", "a:mylib:0.1", "--repositories", @@ -1670,7 +1693,7 @@ class SparkSubmitTests(unittest.TestCase): "local-cluster[1,1,512]", script], stdout=subprocess.PIPE) out, err = proc.communicate() self.assertEqual(0, proc.returncode) - self.assertIn("[2, 3, 4]", out) + self.assertIn("[2, 3, 4]", out.decode('utf-8')) def test_single_script_on_cluster(self): """Submit and test a single script on a cluster""" @@ -1681,7 +1704,7 @@ class SparkSubmitTests(unittest.TestCase): | return x * 2 | |sc = SparkContext() - |print sc.parallelize([1, 2, 3]).map(foo).collect() + |print(sc.parallelize([1, 2, 3]).map(foo).collect()) """) # this will fail if you have different spark.executor.memory # in conf/spark-defaults.conf @@ -1690,7 +1713,7 @@ class SparkSubmitTests(unittest.TestCase): stdout=subprocess.PIPE) out, err = proc.communicate() self.assertEqual(0, proc.returncode) - self.assertIn("[2, 4, 6]", out) + self.assertIn("[2, 4, 6]", out.decode('utf-8')) class ContextTests(unittest.TestCase): @@ -1765,7 +1788,7 @@ class SciPyTests(PySparkTestCase): def test_serialize(self): from scipy.special import gammaln x = range(1, 5) - expected = map(gammaln, x) + expected = list(map(gammaln, x)) observed = self.sc.parallelize(x).map(gammaln).collect() self.assertEqual(expected, observed) @@ -1786,11 +1809,11 @@ class NumPyTests(PySparkTestCase): if __name__ == "__main__": if not _have_scipy: - print "NOTE: Skipping SciPy tests as it does not seem to be installed" + print("NOTE: Skipping SciPy tests as it does not seem to be installed") if not _have_numpy: - print "NOTE: Skipping NumPy tests as it does not seem to be installed" + print("NOTE: Skipping NumPy tests as it does not seem to be installed") unittest.main() if not _have_scipy: - print "NOTE: SciPy tests were skipped as it does not seem to be installed" + print("NOTE: SciPy tests were skipped as it does not seem to be installed") if not _have_numpy: - print "NOTE: NumPy tests were skipped as it does not seem to be installed" + print("NOTE: NumPy tests were skipped as it does not seem to be installed") diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index 452d6fabdc..fbdaf3a581 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -18,6 +18,7 @@ """ Worker that receives input from Piped RDD. """ +from __future__ import print_function import os import sys import time @@ -37,9 +38,9 @@ utf8_deserializer = UTF8Deserializer() def report_times(outfile, boot, init, finish): write_int(SpecialLengths.TIMING_DATA, outfile) - write_long(1000 * boot, outfile) - write_long(1000 * init, outfile) - write_long(1000 * finish, outfile) + write_long(int(1000 * boot), outfile) + write_long(int(1000 * init), outfile) + write_long(int(1000 * finish), outfile) def add_path(path): @@ -72,6 +73,9 @@ def main(infile, outfile): for _ in range(num_python_includes): filename = utf8_deserializer.loads(infile) add_path(os.path.join(spark_files_dir, filename)) + if sys.version > '3': + import importlib + importlib.invalidate_caches() # fetch names and values of broadcast variables num_broadcast_variables = read_int(infile) @@ -106,14 +110,14 @@ def main(infile, outfile): except Exception: try: write_int(SpecialLengths.PYTHON_EXCEPTION_THROWN, outfile) - write_with_length(traceback.format_exc(), outfile) + write_with_length(traceback.format_exc().encode("utf-8"), outfile) except IOError: # JVM close the socket pass except Exception: # Write the error to stderr if it happened while serializing - print >> sys.stderr, "PySpark worker failed with exception:" - print >> sys.stderr, traceback.format_exc() + print("PySpark worker failed with exception:", file=sys.stderr) + print(traceback.format_exc(), file=sys.stderr) exit(-1) finish_time = time.time() report_times(outfile, boot_time, init_time, finish_time) -- cgit v1.2.3