aboutsummaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorBouke van der Bijl <boukevanderbijl@gmail.com>2014-05-10 13:02:13 -0700
committerPatrick Wendell <pwendell@gmail.com>2014-05-10 13:02:13 -0700
commit3776f2f283842543ff766398292532c6e94221cc (patch)
treec42e92390922359f8b3fec88ad5b371014900e40 /python
parentc05d11bb307eaba40c5669da2d374c28debaa55a (diff)
downloadspark-3776f2f283842543ff766398292532c6e94221cc.tar.gz
spark-3776f2f283842543ff766398292532c6e94221cc.tar.bz2
spark-3776f2f283842543ff766398292532c6e94221cc.zip
Add Python includes to path before depickling broadcast values
This fixes https://issues.apache.org/jira/browse/SPARK-1731 by adding the Python includes to the PYTHONPATH before depickling the broadcast values @airhorns Author: Bouke van der Bijl <boukevanderbijl@gmail.com> Closes #656 from bouk/python-includes-before-broadcast and squashes the following commits: 7b0dfe4 [Bouke van der Bijl] Add Python includes to path before depickling broadcast values
Diffstat (limited to 'python')
-rw-r--r--python/pyspark/worker.py14
1 files changed, 7 insertions, 7 deletions
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 4c214ef359..f43210c6c0 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -56,13 +56,6 @@ def main(infile, outfile):
SparkFiles._root_directory = spark_files_dir
SparkFiles._is_running_on_worker = True
- # fetch names and values of broadcast variables
- num_broadcast_variables = read_int(infile)
- for _ in range(num_broadcast_variables):
- bid = read_long(infile)
- value = pickleSer._read_with_length(infile)
- _broadcastRegistry[bid] = Broadcast(bid, value)
-
# fetch names of includes (*.zip and *.egg files) and construct PYTHONPATH
sys.path.append(spark_files_dir) # *.py files that were added will be copied here
num_python_includes = read_int(infile)
@@ -70,6 +63,13 @@ def main(infile, outfile):
filename = utf8_deserializer.loads(infile)
sys.path.append(os.path.join(spark_files_dir, filename))
+ # fetch names and values of broadcast variables
+ num_broadcast_variables = read_int(infile)
+ for _ in range(num_broadcast_variables):
+ bid = read_long(infile)
+ value = pickleSer._read_with_length(infile)
+ _broadcastRegistry[bid] = Broadcast(bid, value)
+
command = pickleSer._read_with_length(infile)
(func, deserializer, serializer) = command
init_time = time.time()