aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/worker.py
diff options
context:
space:
mode:
authorAndre Schumacher <schumach@icsi.berkeley.edu>2013-08-15 16:01:19 -0700
committerAndre Schumacher <schumach@icsi.berkeley.edu>2013-08-16 11:58:20 -0700
commitc7e348faec45ad1d996d16639015c4bc4fc3bc92 (patch)
tree45e69b999c4b4af6bd7528e3dcc860bce264e14f /python/pyspark/worker.py
parent659553b21ddd7504889ce113a816c1db4a73f167 (diff)
downloadspark-c7e348faec45ad1d996d16639015c4bc4fc3bc92.tar.gz
spark-c7e348faec45ad1d996d16639015c4bc4fc3bc92.tar.bz2
spark-c7e348faec45ad1d996d16639015c4bc4fc3bc92.zip
Implementing SPARK-878 for PySpark: adding zip and egg files to context and passing it down to workers which add these to their sys.path
Diffstat (limited to 'python/pyspark/worker.py')
-rw-r--r--python/pyspark/worker.py13
1 files changed, 12 insertions, 1 deletions
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 75d692beeb..695f6dfb84 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -49,15 +49,26 @@ def main(infile, outfile):
split_index = read_int(infile)
if split_index == -1: # for unit tests
return
+
+ # fetch name of workdir
spark_files_dir = load_pickle(read_with_length(infile))
SparkFiles._root_directory = spark_files_dir
SparkFiles._is_running_on_worker = True
- sys.path.append(spark_files_dir)
+
+ # fetch names and values of broadcast variables
num_broadcast_variables = read_int(infile)
for _ in range(num_broadcast_variables):
bid = read_long(infile)
value = read_with_length(infile)
_broadcastRegistry[bid] = Broadcast(bid, load_pickle(value))
+
+ # fetch names of includes (*.zip and *.egg files) and construct PYTHONPATH
+ sys.path.append(spark_files_dir) # *.py files that were added will be copied here
+ num_python_includes = read_int(infile)
+ for _ in range(num_python_includes):
+ sys.path.append(os.path.join(spark_files_dir, load_pickle(read_with_length(infile))))
+
+ # now load function
func = load_obj(infile)
bypassSerializer = load_obj(infile)
if bypassSerializer: