aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/worker.py
diff options
context:
space:
mode:
authorDavies Liu <davies.liu@gmail.com>2014-07-29 00:15:45 -0700
committerJosh Rosen <joshrosen@apache.org>2014-07-29 00:15:45 -0700
commitccd5ab5f82812abc2eb518448832cc20fb903345 (patch)
tree48c5e41187c0aa88bc65cfad1f837beedadd7fd4 /python/pyspark/worker.py
parent16ef4d110f15dfe66852802fdadfe2ed7574ddc2 (diff)
downloadspark-ccd5ab5f82812abc2eb518448832cc20fb903345.tar.gz
spark-ccd5ab5f82812abc2eb518448832cc20fb903345.tar.bz2
spark-ccd5ab5f82812abc2eb518448832cc20fb903345.zip
[SPARK-2580] [PySpark] keep silent in worker if JVM close the socket
During rdd.take(n), JVM will close the socket if it had got enough data, the Python worker should keep silent in this case. In the same time, the worker should not print the trackback into stderr if it send the traceback to JVM successfully. Author: Davies Liu <davies.liu@gmail.com> Closes #1625 from davies/error and squashes the following commits: 4fbcc6d [Davies Liu] disable log4j during testing when exception is expected. cc14202 [Davies Liu] keep silent in worker if JVM close the socket
Diffstat (limited to 'python/pyspark/worker.py')
-rw-r--r--python/pyspark/worker.py21
1 files changed, 13 insertions, 8 deletions
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 24d41b12d1..2770f63059 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -75,14 +75,19 @@ def main(infile, outfile):
init_time = time.time()
iterator = deserializer.load_stream(infile)
serializer.dump_stream(func(split_index, iterator), outfile)
- except Exception as e:
- # Write the error to stderr in addition to trying to pass it back to
- # Java, in case it happened while serializing a record
- print >> sys.stderr, "PySpark worker failed with exception:"
- print >> sys.stderr, traceback.format_exc()
- write_int(SpecialLengths.PYTHON_EXCEPTION_THROWN, outfile)
- write_with_length(traceback.format_exc(), outfile)
- sys.exit(-1)
+ except Exception:
+ try:
+ write_int(SpecialLengths.PYTHON_EXCEPTION_THROWN, outfile)
+ write_with_length(traceback.format_exc(), outfile)
+ outfile.flush()
+ except IOError:
+ # JVM close the socket
+ pass
+ except Exception:
+ # Write the error to stderr if it happened while serializing
+ print >> sys.stderr, "PySpark worker failed with exception:"
+ print >> sys.stderr, traceback.format_exc()
+ exit(-1)
finish_time = time.time()
report_times(outfile, boot_time, init_time, finish_time)
# Mark the beginning of the accumulators section of the output