diff options
author | Patrick Wendell <pwendell@gmail.com> | 2013-01-31 18:02:28 -0800 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2013-01-31 18:06:11 -0800 |
commit | 3446d5c8d6b385106ac85e46320d92faa8efb4e6 (patch) | |
tree | 220b114399de112adbebb774ac4bd456deb87040 /python | |
parent | 55327a283e962652a126d3f8ac7e9a19c76f1f19 (diff) | |
download | spark-3446d5c8d6b385106ac85e46320d92faa8efb4e6.tar.gz spark-3446d5c8d6b385106ac85e46320d92faa8efb4e6.tar.bz2 spark-3446d5c8d6b385106ac85e46320d92faa8efb4e6.zip |
SPARK-673: Capture and re-throw Python exceptions
This patch alters the Python <-> executor protocol to pass on
exception data when exceptions occur in user Python code.
Diffstat (limited to 'python')
-rw-r--r-- | python/pyspark/worker.py | 10 |
1 file changed, 8 insertions, 2 deletions
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index d33d6dd15f..9622e0cfe4 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -2,6 +2,7 @@ Worker that receives input from Piped RDD. """ import sys +import traceback from base64 import standard_b64decode # CloudPickler needs to be imported so that depicklers are registered using the # copy_reg module. @@ -40,8 +41,13 @@ def main(): else: dumps = dump_pickle iterator = read_from_pickle_file(sys.stdin) - for obj in func(split_index, iterator): - write_with_length(dumps(obj), old_stdout) + try: + for obj in func(split_index, iterator): + write_with_length(dumps(obj), old_stdout) + except Exception as e: + write_int(-2, old_stdout) + write_with_length(traceback.format_exc(), old_stdout) + sys.exit(-1) # Mark the beginning of the accumulators section of the output write_int(-1, old_stdout) for aid, accum in _accumulatorRegistry.items(): |