diff options
author | Davies Liu <davies@databricks.com> | 2015-05-18 12:55:13 -0700 |
---|---|---|
committer | Josh Rosen <joshrosen@databricks.com> | 2015-05-18 12:55:13 -0700 |
commit | 32fbd297dd651ba3ce4ce52aeb0488233149cdf9 (patch) | |
tree | c18970db228448c856f8ce9538d708511e9e95fd /python/pyspark/worker.py | |
parent | 9dadf019b93038e1e18336ccd06c5eecb4bae32f (diff) | |
download | spark-32fbd297dd651ba3ce4ce52aeb0488233149cdf9.tar.gz spark-32fbd297dd651ba3ce4ce52aeb0488233149cdf9.tar.bz2 spark-32fbd297dd651ba3ce4ce52aeb0488233149cdf9.zip |
[SPARK-6216] [PYSPARK] check python version of worker with driver
This PR revert #5404, change to pass the version of python in driver into JVM, check it in worker before deserializing closure, then it can works with different major version of Python.
Author: Davies Liu <davies@databricks.com>
Closes #6203 from davies/py_version and squashes the following commits:
b8fb76e [Davies Liu] fix test
6ce5096 [Davies Liu] use string for version
47c6278 [Davies Liu] check python version of worker with driver
Diffstat (limited to 'python/pyspark/worker.py')
-rw-r--r-- | python/pyspark/worker.py | 12 |
1 files changed, 7 insertions, 5 deletions
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index fbdaf3a581..93df9002be 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -57,6 +57,12 @@ def main(infile, outfile): if split_index == -1: # for unit tests exit(-1) + version = utf8_deserializer.loads(infile) + if version != "%d.%d" % sys.version_info[:2]: + raise Exception(("Python in worker has different version %s than that in " + + "driver %s, PySpark cannot run with different minor versions") % + ("%d.%d" % sys.version_info[:2], version)) + # initialize global state shuffle.MemoryBytesSpilled = 0 shuffle.DiskBytesSpilled = 0 @@ -92,11 +98,7 @@ def main(infile, outfile): command = pickleSer._read_with_length(infile) if isinstance(command, Broadcast): command = pickleSer.loads(command.value) - (func, profiler, deserializer, serializer), version = command - if version != sys.version_info[:2]: - raise Exception(("Python in worker has different version %s than that in " + - "driver %s, PySpark cannot run with different minor versions") % - (sys.version_info[:2], version)) + func, profiler, deserializer, serializer = command init_time = time.time() def process(): |