aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/worker.py
diff options
context:
space:
mode:
authorDavies Liu <davies@databricks.com>2015-05-18 12:55:13 -0700
committerJosh Rosen <joshrosen@databricks.com>2015-05-18 12:55:13 -0700
commit32fbd297dd651ba3ce4ce52aeb0488233149cdf9 (patch)
treec18970db228448c856f8ce9538d708511e9e95fd /python/pyspark/worker.py
parent9dadf019b93038e1e18336ccd06c5eecb4bae32f (diff)
downloadspark-32fbd297dd651ba3ce4ce52aeb0488233149cdf9.tar.gz
spark-32fbd297dd651ba3ce4ce52aeb0488233149cdf9.tar.bz2
spark-32fbd297dd651ba3ce4ce52aeb0488233149cdf9.zip
[SPARK-6216] [PYSPARK] check python version of worker with driver
This PR revert #5404, change to pass the version of python in driver into JVM, check it in worker before deserializing closure, then it can works with different major version of Python. Author: Davies Liu <davies@databricks.com> Closes #6203 from davies/py_version and squashes the following commits: b8fb76e [Davies Liu] fix test 6ce5096 [Davies Liu] use string for version 47c6278 [Davies Liu] check python version of worker with driver
Diffstat (limited to 'python/pyspark/worker.py')
-rw-r--r--python/pyspark/worker.py12
1 files changed, 7 insertions, 5 deletions
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index fbdaf3a581..93df9002be 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -57,6 +57,12 @@ def main(infile, outfile):
if split_index == -1: # for unit tests
exit(-1)
+ version = utf8_deserializer.loads(infile)
+ if version != "%d.%d" % sys.version_info[:2]:
+ raise Exception(("Python in worker has different version %s than that in " +
+ "driver %s, PySpark cannot run with different minor versions") %
+ ("%d.%d" % sys.version_info[:2], version))
+
# initialize global state
shuffle.MemoryBytesSpilled = 0
shuffle.DiskBytesSpilled = 0
@@ -92,11 +98,7 @@ def main(infile, outfile):
command = pickleSer._read_with_length(infile)
if isinstance(command, Broadcast):
command = pickleSer.loads(command.value)
- (func, profiler, deserializer, serializer), version = command
- if version != sys.version_info[:2]:
- raise Exception(("Python in worker has different version %s than that in " +
- "driver %s, PySpark cannot run with different minor versions") %
- (sys.version_info[:2], version))
+ func, profiler, deserializer, serializer = command
init_time = time.time()
def process():