aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/worker.py
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2013-01-31 21:58:26 -0800
committerMatei Zaharia <matei@eecs.berkeley.edu>2013-01-31 21:58:26 -0800
commit7e2e046e37df6f71969a85f9995cb830be492050 (patch)
treeff9fcba8cd041cb0ec95b97a73d97fd6ee113490 /python/pyspark/worker.py
parent95e14fbc38450a6d0b527d79e46ff08d69810cf3 (diff)
parent39ab83e9577a5449fb0d6ef944dffc0d7cd00b4a (diff)
downloadspark-7e2e046e37df6f71969a85f9995cb830be492050.tar.gz
spark-7e2e046e37df6f71969a85f9995cb830be492050.tar.bz2
spark-7e2e046e37df6f71969a85f9995cb830be492050.zip
Merge pull request #434 from pwendell/python-exceptions
SPARK-673: Capture and re-throw Python exceptions
Diffstat (limited to 'python/pyspark/worker.py')
-rw-r--r--python/pyspark/worker.py10
1 files changed, 8 insertions, 2 deletions
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index d33d6dd15f..9622e0cfe4 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -2,6 +2,7 @@
Worker that receives input from Piped RDD.
"""
import sys
+import traceback
from base64 import standard_b64decode
# CloudPickler needs to be imported so that depicklers are registered using the
# copy_reg module.
@@ -40,8 +41,13 @@ def main():
else:
dumps = dump_pickle
iterator = read_from_pickle_file(sys.stdin)
- for obj in func(split_index, iterator):
- write_with_length(dumps(obj), old_stdout)
+ try:
+ for obj in func(split_index, iterator):
+ write_with_length(dumps(obj), old_stdout)
+ except Exception as e:
+ write_int(-2, old_stdout)
+ write_with_length(traceback.format_exc(), old_stdout)
+ sys.exit(-1)
# Mark the beginning of the accumulators section of the output
write_int(-1, old_stdout)
for aid, accum in _accumulatorRegistry.items():