aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--python/pyspark/worker.py48
1 files changed, 24 insertions, 24 deletions
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 1586463520..4c214ef359 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -45,34 +45,34 @@ def report_times(outfile, boot, init, finish):
def main(infile, outfile):
- boot_time = time.time()
- split_index = read_int(infile)
- if split_index == -1: # for unit tests
- return
+ try:
+ boot_time = time.time()
+ split_index = read_int(infile)
+ if split_index == -1: # for unit tests
+ return
- # fetch name of workdir
- spark_files_dir = utf8_deserializer.loads(infile)
- SparkFiles._root_directory = spark_files_dir
- SparkFiles._is_running_on_worker = True
+ # fetch name of workdir
+ spark_files_dir = utf8_deserializer.loads(infile)
+ SparkFiles._root_directory = spark_files_dir
+ SparkFiles._is_running_on_worker = True
- # fetch names and values of broadcast variables
- num_broadcast_variables = read_int(infile)
- for _ in range(num_broadcast_variables):
- bid = read_long(infile)
- value = pickleSer._read_with_length(infile)
- _broadcastRegistry[bid] = Broadcast(bid, value)
+ # fetch names and values of broadcast variables
+ num_broadcast_variables = read_int(infile)
+ for _ in range(num_broadcast_variables):
+ bid = read_long(infile)
+ value = pickleSer._read_with_length(infile)
+ _broadcastRegistry[bid] = Broadcast(bid, value)
- # fetch names of includes (*.zip and *.egg files) and construct PYTHONPATH
- sys.path.append(spark_files_dir) # *.py files that were added will be copied here
- num_python_includes = read_int(infile)
- for _ in range(num_python_includes):
- filename = utf8_deserializer.loads(infile)
- sys.path.append(os.path.join(spark_files_dir, filename))
+ # fetch names of includes (*.zip and *.egg files) and construct PYTHONPATH
+ sys.path.append(spark_files_dir) # *.py files that were added will be copied here
+ num_python_includes = read_int(infile)
+ for _ in range(num_python_includes):
+ filename = utf8_deserializer.loads(infile)
+ sys.path.append(os.path.join(spark_files_dir, filename))
- command = pickleSer._read_with_length(infile)
- (func, deserializer, serializer) = command
- init_time = time.time()
- try:
+ command = pickleSer._read_with_length(infile)
+ (func, deserializer, serializer) = command
+ init_time = time.time()
iterator = deserializer.load_stream(infile)
serializer.dump_stream(func(split_index, iterator), outfile)
except Exception as e: