aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/worker.py
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2013-09-01 14:57:27 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2013-09-01 14:57:27 -0700
commit2ce200bf7f7a38afbcacf3303ca2418e49bdbe2a (patch)
tree586a62e61ad15b5eda60cb13e15ca0c66cb1cc31 /python/pyspark/worker.py
parent87d586e4da63e6e1875d9cac194c6f11e1cdc653 (diff)
parentf957c26fa27486c329d82cb66595b2cf07aed0ef (diff)
downloadspark-2ce200bf7f7a38afbcacf3303ca2418e49bdbe2a.tar.gz
spark-2ce200bf7f7a38afbcacf3303ca2418e49bdbe2a.tar.bz2
spark-2ce200bf7f7a38afbcacf3303ca2418e49bdbe2a.zip
Merge remote-tracking branch 'old/master'
Diffstat (limited to 'python/pyspark/worker.py')
-rw-r--r--python/pyspark/worker.py13
1 files changed, 12 insertions, 1 deletions
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 75d692beeb..695f6dfb84 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -49,15 +49,26 @@ def main(infile, outfile):
split_index = read_int(infile)
if split_index == -1: # for unit tests
return
+
+ # fetch name of workdir
spark_files_dir = load_pickle(read_with_length(infile))
SparkFiles._root_directory = spark_files_dir
SparkFiles._is_running_on_worker = True
- sys.path.append(spark_files_dir)
+
+ # fetch names and values of broadcast variables
num_broadcast_variables = read_int(infile)
for _ in range(num_broadcast_variables):
bid = read_long(infile)
value = read_with_length(infile)
_broadcastRegistry[bid] = Broadcast(bid, load_pickle(value))
+
+ # fetch names of includes (*.zip and *.egg files) and construct PYTHONPATH
+ sys.path.append(spark_files_dir) # *.py files that were added will be copied here
+ num_python_includes = read_int(infile)
+ for _ in range(num_python_includes):
+ sys.path.append(os.path.join(spark_files_dir, load_pickle(read_with_length(infile))))
+
+ # now load function
func = load_obj(infile)
bypassSerializer = load_obj(infile)
if bypassSerializer: