Diffstat (limited to 'python/pyspark/worker.py')
-rw-r--r--  python/pyspark/worker.py | 21
1 file changed, 13 insertions(+), 8 deletions(-)
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 24d41b12d1..2770f63059 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -75,14 +75,19 @@ def main(infile, outfile):
init_time = time.time()
iterator = deserializer.load_stream(infile)
serializer.dump_stream(func(split_index, iterator), outfile)
- except Exception as e:
- # Write the error to stderr in addition to trying to pass it back to
- # Java, in case it happened while serializing a record
- print >> sys.stderr, "PySpark worker failed with exception:"
- print >> sys.stderr, traceback.format_exc()
- write_int(SpecialLengths.PYTHON_EXCEPTION_THROWN, outfile)
- write_with_length(traceback.format_exc(), outfile)
- sys.exit(-1)
+ except Exception:
+ try:
+ write_int(SpecialLengths.PYTHON_EXCEPTION_THROWN, outfile)
+ write_with_length(traceback.format_exc(), outfile)
+ outfile.flush()
+ except IOError:
+            # The JVM closed the socket
+ pass
+ except Exception:
+ # Write the error to stderr if it happened while serializing
+ print >> sys.stderr, "PySpark worker failed with exception:"
+ print >> sys.stderr, traceback.format_exc()
+ exit(-1)
finish_time = time.time()
report_times(outfile, boot_time, init_time, finish_time)
# Mark the beginning of the accumulators section of the output
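For context, the write_int and write_with_length helpers used in the error path come from pyspark.serializers and implement a simple length-prefixed framing that the JVM side reads back. A rough sketch of that framing is below; it is illustrative only, consistent with how the helpers are used above, and not the exact implementation in pyspark.serializers.

    import struct

    def write_int(value, stream):
        # 4-byte big-endian signed int, matching what the JVM reads with readInt()
        stream.write(struct.pack("!i", value))

    def write_with_length(obj, stream):
        # Length prefix followed by the raw payload bytes (here, the traceback text)
        write_int(len(obj), stream)
        stream.write(obj)

With this framing, the worker first writes the SpecialLengths.PYTHON_EXCEPTION_THROWN marker so the JVM knows the next length-prefixed payload is a traceback rather than a serialized record; if that write itself fails because the socket is already closed, the new except IOError branch simply gives up, and any other failure falls through to stderr before exiting.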