aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/rdd.py
diff options
context:
space:
mode:
authorDavies Liu <davies@databricks.com>2015-05-18 12:55:13 -0700
committerJosh Rosen <joshrosen@databricks.com>2015-05-18 12:55:13 -0700
commit32fbd297dd651ba3ce4ce52aeb0488233149cdf9 (patch)
treec18970db228448c856f8ce9538d708511e9e95fd /python/pyspark/rdd.py
parent9dadf019b93038e1e18336ccd06c5eecb4bae32f (diff)
downloadspark-32fbd297dd651ba3ce4ce52aeb0488233149cdf9.tar.gz
spark-32fbd297dd651ba3ce4ce52aeb0488233149cdf9.tar.bz2
spark-32fbd297dd651ba3ce4ce52aeb0488233149cdf9.zip
[SPARK-6216] [PYSPARK] check python version of worker with driver
This PR revert #5404, change to pass the version of python in driver into JVM, check it in worker before deserializing closure, then it can works with different major version of Python. Author: Davies Liu <davies@databricks.com> Closes #6203 from davies/py_version and squashes the following commits: b8fb76e [Davies Liu] fix test 6ce5096 [Davies Liu] use string for version 47c6278 [Davies Liu] check python version of worker with driver
Diffstat (limited to 'python/pyspark/rdd.py')
-rw-r--r--python/pyspark/rdd.py4
1 files changed, 2 insertions, 2 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 545c5ad20c..70db4bbe4c 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -2260,7 +2260,7 @@ class RDD(object):
def _prepare_for_python_RDD(sc, command, obj=None):
# the serialized command will be compressed by broadcast
ser = CloudPickleSerializer()
- pickled_command = ser.dumps((command, sys.version_info[:2]))
+ pickled_command = ser.dumps(command)
if len(pickled_command) > (1 << 20): # 1M
# The broadcast will have same life cycle as created PythonRDD
broadcast = sc.broadcast(pickled_command)
@@ -2344,7 +2344,7 @@ class PipelinedRDD(RDD):
python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(),
bytearray(pickled_cmd),
env, includes, self.preservesPartitioning,
- self.ctx.pythonExec,
+ self.ctx.pythonExec, self.ctx.pythonVer,
bvars, self.ctx._javaAccumulator)
self._jrdd_val = python_rdd.asJavaRDD()