From 3446d5c8d6b385106ac85e46320d92faa8efb4e6 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Thu, 31 Jan 2013 18:02:28 -0800 Subject: SPARK-673: Capture and re-throw Python exceptions This patch alters the Python <-> executor protocol to pass on exception data when they occur in user Python code. --- python/pyspark/worker.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) (limited to 'python/pyspark/worker.py') diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index d33d6dd15f..9622e0cfe4 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -2,6 +2,7 @@ Worker that receives input from Piped RDD. """ import sys +import traceback from base64 import standard_b64decode # CloudPickler needs to be imported so that depicklers are registered using the # copy_reg module. @@ -40,8 +41,13 @@ def main(): else: dumps = dump_pickle iterator = read_from_pickle_file(sys.stdin) - for obj in func(split_index, iterator): - write_with_length(dumps(obj), old_stdout) + try: + for obj in func(split_index, iterator): + write_with_length(dumps(obj), old_stdout) + except Exception as e: + write_int(-2, old_stdout) + write_with_length(traceback.format_exc(), old_stdout) + sys.exit(-1) # Mark the beginning of the accumulators section of the output write_int(-1, old_stdout) for aid, accum in _accumulatorRegistry.items(): -- cgit v1.2.3