diff options
author | Bouke van der Bijl <boukevanderbijl@gmail.com> | 2014-02-26 14:50:37 -0800 |
---|---|---|
committer | Josh Rosen <joshrosen@apache.org> | 2014-02-26 14:51:21 -0800 |
commit | 12738c1aec136acd7f2e3e2f8f2b541db0890630 (patch) | |
tree | d530344f2ddbf1bdb84e96beec557cfeb1c52558 | |
parent | c86eec584312072e73754a4f1cbe34d2e1968c77 (diff) | |
download | spark-12738c1aec136acd7f2e3e2f8f2b541db0890630.tar.gz spark-12738c1aec136acd7f2e3e2f8f2b541db0890630.tar.bz2 spark-12738c1aec136acd7f2e3e2f8f2b541db0890630.zip |
SPARK-1115: Catch depickling errors
This surrounds the complete worker code in a try/except block so we catch any error that occurs. An example would be the depickling failing for some reason.
@JoshRosen
Author: Bouke van der Bijl <boukevanderbijl@gmail.com>
Closes #644 from bouk/catch-depickling-errors and squashes the following commits:
f0f67cc [Bouke van der Bijl] Lol indentation
0e4d504 [Bouke van der Bijl] Surround the complete python worker with the try block
-rw-r--r-- | python/pyspark/worker.py | 48 |
1 files changed, 24 insertions, 24 deletions
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index 1586463520..4c214ef359 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -45,34 +45,34 @@ def report_times(outfile, boot, init, finish): def main(infile, outfile): - boot_time = time.time() - split_index = read_int(infile) - if split_index == -1: # for unit tests - return + try: + boot_time = time.time() + split_index = read_int(infile) + if split_index == -1: # for unit tests + return - # fetch name of workdir - spark_files_dir = utf8_deserializer.loads(infile) - SparkFiles._root_directory = spark_files_dir - SparkFiles._is_running_on_worker = True + # fetch name of workdir + spark_files_dir = utf8_deserializer.loads(infile) + SparkFiles._root_directory = spark_files_dir + SparkFiles._is_running_on_worker = True - # fetch names and values of broadcast variables - num_broadcast_variables = read_int(infile) - for _ in range(num_broadcast_variables): - bid = read_long(infile) - value = pickleSer._read_with_length(infile) - _broadcastRegistry[bid] = Broadcast(bid, value) + # fetch names and values of broadcast variables + num_broadcast_variables = read_int(infile) + for _ in range(num_broadcast_variables): + bid = read_long(infile) + value = pickleSer._read_with_length(infile) + _broadcastRegistry[bid] = Broadcast(bid, value) - # fetch names of includes (*.zip and *.egg files) and construct PYTHONPATH - sys.path.append(spark_files_dir) # *.py files that were added will be copied here - num_python_includes = read_int(infile) - for _ in range(num_python_includes): - filename = utf8_deserializer.loads(infile) - sys.path.append(os.path.join(spark_files_dir, filename)) + # fetch names of includes (*.zip and *.egg files) and construct PYTHONPATH + sys.path.append(spark_files_dir) # *.py files that were added will be copied here + num_python_includes = read_int(infile) + for _ in range(num_python_includes): + filename = utf8_deserializer.loads(infile) + 
sys.path.append(os.path.join(spark_files_dir, filename)) - command = pickleSer._read_with_length(infile) - (func, deserializer, serializer) = command - init_time = time.time() - try: + command = pickleSer._read_with_length(infile) + (func, deserializer, serializer) = command + init_time = time.time() iterator = deserializer.load_stream(infile) serializer.dump_stream(func(split_index, iterator), outfile) except Exception as e: |