diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-01-31 21:58:26 -0800 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-01-31 21:58:26 -0800 |
commit | 7e2e046e37df6f71969a85f9995cb830be492050 (patch) | |
tree | ff9fcba8cd041cb0ec95b97a73d97fd6ee113490 /python/pyspark/worker.py | |
parent | 95e14fbc38450a6d0b527d79e46ff08d69810cf3 (diff) | |
parent | 39ab83e9577a5449fb0d6ef944dffc0d7cd00b4a (diff) | |
download | spark-7e2e046e37df6f71969a85f9995cb830be492050.tar.gz spark-7e2e046e37df6f71969a85f9995cb830be492050.tar.bz2 spark-7e2e046e37df6f71969a85f9995cb830be492050.zip |
Merge pull request #434 from pwendell/python-exceptions
SPARK-673: Capture and re-throw Python exceptions
Diffstat (limited to 'python/pyspark/worker.py')
-rw-r--r-- | python/pyspark/worker.py | 10 |
1 file changed, 8 insertions, 2 deletions
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index d33d6dd15f..9622e0cfe4 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -2,6 +2,7 @@ Worker that receives input from Piped RDD. """ import sys +import traceback from base64 import standard_b64decode # CloudPickler needs to be imported so that depicklers are registered using the # copy_reg module. @@ -40,8 +41,13 @@ def main(): else: dumps = dump_pickle iterator = read_from_pickle_file(sys.stdin) - for obj in func(split_index, iterator): - write_with_length(dumps(obj), old_stdout) + try: + for obj in func(split_index, iterator): + write_with_length(dumps(obj), old_stdout) + except Exception as e: + write_int(-2, old_stdout) + write_with_length(traceback.format_exc(), old_stdout) + sys.exit(-1) # Mark the beginning of the accumulators section of the output write_int(-1, old_stdout) for aid, accum in _accumulatorRegistry.items(): |