From 32fbd297dd651ba3ce4ce52aeb0488233149cdf9 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Mon, 18 May 2015 12:55:13 -0700 Subject: [SPARK-6216] [PYSPARK] check python version of worker with driver This PR revert #5404, change to pass the version of python in driver into JVM, check it in worker before deserializing closure, then it can works with different major version of Python. Author: Davies Liu Closes #6203 from davies/py_version and squashes the following commits: b8fb76e [Davies Liu] fix test 6ce5096 [Davies Liu] use string for version 47c6278 [Davies Liu] check python version of worker with driver --- python/pyspark/worker.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) (limited to 'python/pyspark/worker.py') diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index fbdaf3a581..93df9002be 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -57,6 +57,12 @@ def main(infile, outfile): if split_index == -1: # for unit tests exit(-1) + version = utf8_deserializer.loads(infile) + if version != "%d.%d" % sys.version_info[:2]: + raise Exception(("Python in worker has different version %s than that in " + + "driver %s, PySpark cannot run with different minor versions") % + ("%d.%d" % sys.version_info[:2], version)) + # initialize global state shuffle.MemoryBytesSpilled = 0 shuffle.DiskBytesSpilled = 0 @@ -92,11 +98,7 @@ def main(infile, outfile): command = pickleSer._read_with_length(infile) if isinstance(command, Broadcast): command = pickleSer.loads(command.value) - (func, profiler, deserializer, serializer), version = command - if version != sys.version_info[:2]: - raise Exception(("Python in worker has different version %s than that in " + - "driver %s, PySpark cannot run with different minor versions") % - (sys.version_info[:2], version)) + func, profiler, deserializer, serializer = command init_time = time.time() def process(): -- cgit v1.2.3