aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark
diff options
context:
space:
mode:
authorJey Kottalam <jey@cs.berkeley.edu>2013-06-21 12:13:48 -0400
committerJey Kottalam <jey@cs.berkeley.edu>2013-06-21 12:14:16 -0400
commitc75bed0eebb1f937db02eb98deecd380724f747d (patch)
tree1ebddbc7707bb2c56869934f9e6adb34ff1da70b /python/pyspark
parent1ba3c173034c37ef99fc312c84943d2ab8885670 (diff)
downloadspark-c75bed0eebb1f937db02eb98deecd380724f747d.tar.gz
spark-c75bed0eebb1f937db02eb98deecd380724f747d.tar.bz2
spark-c75bed0eebb1f937db02eb98deecd380724f747d.zip
Fix reporting of PySpark exceptions
Diffstat (limited to 'python/pyspark')
-rw-r--r--python/pyspark/daemon.py22
-rw-r--r--python/pyspark/worker.py2
2 files changed, 19 insertions, 5 deletions
diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py
index 2b5e9b3581..78a2da1e18 100644
--- a/python/pyspark/daemon.py
+++ b/python/pyspark/daemon.py
@@ -21,6 +21,15 @@ def should_exit():
return exit_flag.value
+def compute_real_exit_code(exit_code):
+ # SystemExit's code can be integer or string, but os._exit only accepts integers
+ import numbers
+ if isinstance(exit_code, numbers.Integral):
+ return exit_code
+ else:
+ return 1
+
+
def worker(listen_sock):
# Redirect stdout to stderr
os.dup2(2, 1)
@@ -65,10 +74,15 @@ def worker(listen_sock):
listen_sock.close()
# Handle the client then exit
sockfile = sock.makefile()
- worker_main(sockfile, sockfile)
- sockfile.close()
- sock.close()
- os._exit(0)
+ exit_code = 0
+ try:
+ worker_main(sockfile, sockfile)
+ except SystemExit as exc:
+ exit_code = exc.code
+ finally:
+ sockfile.close()
+ sock.close()
+ os._exit(compute_real_exit_code(exit_code))
else:
sock.close()
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index f76ee3c236..379bbfd4c2 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -55,7 +55,7 @@ def main(infile, outfile):
except Exception as e:
write_int(-2, outfile)
write_with_length(traceback.format_exc(), outfile)
- raise
+ sys.exit(-1)
finish_time = time.time()
report_times(outfile, boot_time, init_time, finish_time)
# Mark the beginning of the accumulators section of the output