path: root/python/pyspark/worker.py
author     Bouke van der Bijl <boukevanderbijl@gmail.com>  2014-02-26 14:50:37 -0800
committer  Josh Rosen <joshrosen@apache.org>               2014-02-26 14:51:21 -0800
commit     12738c1aec136acd7f2e3e2f8f2b541db0890630 (patch)
tree       d530344f2ddbf1bdb84e96beec557cfeb1c52558 /python/pyspark/worker.py
parent     c86eec584312072e73754a4f1cbe34d2e1968c77 (diff)
SPARK-1115: Catch depickling errors
This surrounds the complete worker code in a try/except block so we catch any error that arrives. An example would be the depickling failing for some reason. @JoshRosen

Author: Bouke van der Bijl <boukevanderbijl@gmail.com>

Closes #644 from bouk/catch-depickling-errors and squashes the following commits:

f0f67cc [Bouke van der Bijl] Lol indentation
0e4d504 [Bouke van der Bijl] Surround the complete python worker with the try block
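To illustrate the pattern outside the diff: a minimal sketch only, not Spark's actual worker protocol (worker_main and the plain-pickle framing are invented for this example). The point is that the try block starts before the command is depickled, so a failure while reading input is reported back instead of escaping unhandled.

    import pickle
    import sys
    import traceback

    def worker_main(infile, outfile):
        # Hypothetical stand-in for worker.py's main(): the try block
        # opens before any input is read, so errors raised during
        # depickling are caught along with errors from user code.
        try:
            func, args = pickle.load(infile)   # depickling may raise here
            result = func(*args)
            pickle.dump(result, outfile)
        except Exception:
            # Report the failure over the output stream so the other
            # side can surface it, then exit with a nonzero status.
            outfile.write(traceback.format_exc().encode("utf-8"))
            sys.exit(-1)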
Diffstat (limited to 'python/pyspark/worker.py')
-rw-r--r--  python/pyspark/worker.py  48
1 file changed, 24 insertions, 24 deletions
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 1586463520..4c214ef359 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -45,34 +45,34 @@ def report_times(outfile, boot, init, finish):
def main(infile, outfile):
- boot_time = time.time()
- split_index = read_int(infile)
- if split_index == -1: # for unit tests
- return
+ try:
+ boot_time = time.time()
+ split_index = read_int(infile)
+ if split_index == -1: # for unit tests
+ return
- # fetch name of workdir
- spark_files_dir = utf8_deserializer.loads(infile)
- SparkFiles._root_directory = spark_files_dir
- SparkFiles._is_running_on_worker = True
+ # fetch name of workdir
+ spark_files_dir = utf8_deserializer.loads(infile)
+ SparkFiles._root_directory = spark_files_dir
+ SparkFiles._is_running_on_worker = True
- # fetch names and values of broadcast variables
- num_broadcast_variables = read_int(infile)
- for _ in range(num_broadcast_variables):
- bid = read_long(infile)
- value = pickleSer._read_with_length(infile)
- _broadcastRegistry[bid] = Broadcast(bid, value)
+ # fetch names and values of broadcast variables
+ num_broadcast_variables = read_int(infile)
+ for _ in range(num_broadcast_variables):
+ bid = read_long(infile)
+ value = pickleSer._read_with_length(infile)
+ _broadcastRegistry[bid] = Broadcast(bid, value)
- # fetch names of includes (*.zip and *.egg files) and construct PYTHONPATH
- sys.path.append(spark_files_dir) # *.py files that were added will be copied here
- num_python_includes = read_int(infile)
- for _ in range(num_python_includes):
- filename = utf8_deserializer.loads(infile)
- sys.path.append(os.path.join(spark_files_dir, filename))
+ # fetch names of includes (*.zip and *.egg files) and construct PYTHONPATH
+ sys.path.append(spark_files_dir) # *.py files that were added will be copied here
+ num_python_includes = read_int(infile)
+ for _ in range(num_python_includes):
+ filename = utf8_deserializer.loads(infile)
+ sys.path.append(os.path.join(spark_files_dir, filename))
- command = pickleSer._read_with_length(infile)
- (func, deserializer, serializer) = command
- init_time = time.time()
- try:
+ command = pickleSer._read_with_length(infile)
+ (func, deserializer, serializer) = command
+ init_time = time.time()
iterator = deserializer.load_stream(infile)
serializer.dump_stream(func(split_index, iterator), outfile)
except Exception as e:
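As an aside (not part of the diff), a minimal reproduction of the failure mode the commit message describes, using only the standard library: a truncated pickle raises during loading, before any user function runs, which is exactly the kind of error the widened try block now catches.

    import pickle

    # Truncate a valid pickle to simulate corrupt input reaching the worker.
    blob = pickle.dumps(("payload", [1, 2, 3]))[:-3]
    try:
        pickle.loads(blob)            # raises before any user code runs
    except Exception as e:            # e.g. pickle.UnpicklingError
        print("caught depickling error:", type(e).__name__)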