aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/daemon.py
diff options
context:
space:
mode:
authorAaron Davidson <aaron@databricks.com>2014-05-07 09:48:31 -0700
committerPatrick Wendell <pwendell@gmail.com>2014-05-07 09:48:31 -0700
commit3308722ca03f2bfa792e9a2cff9c894b967983d9 (patch)
tree78739ecc5b732316db6634cf917fd6e5de0225bc /python/pyspark/daemon.py
parent967635a2425a769b932eea0984fe697d6721cab0 (diff)
downloadspark-3308722ca03f2bfa792e9a2cff9c894b967983d9.tar.gz
spark-3308722ca03f2bfa792e9a2cff9c894b967983d9.tar.bz2
spark-3308722ca03f2bfa792e9a2cff9c894b967983d9.zip
SPARK-1579: Clean up PythonRDD and avoid swallowing IOExceptions
This patch includes several cleanups to PythonRDD, focused around fixing [SPARK-1579](https://issues.apache.org/jira/browse/SPARK-1579) cleanly. Listed in order of approximate importance: - The Python daemon waits for Spark to close the socket before exiting, in order to avoid causing spurious IOExceptions in Spark's `PythonRDD::WriterThread`. - Removes the Python Monitor Thread, which polled for task cancellations in order to kill the Python worker. Instead, we do this in the onCompleteCallback, since this is guaranteed to be called during cancellation. - Adds a "completed" variable to TaskContext to avoid the issue noted in [SPARK-1019](https://issues.apache.org/jira/browse/SPARK-1019), where onCompleteCallbacks may be execution-order dependent. Along with this, I removed the "context.interrupted = true" flag in the onCompleteCallback. - Extracts PythonRDD::WriterThread to its own class. Since this patch provides an alternative solution to [SPARK-1019](https://issues.apache.org/jira/browse/SPARK-1019), I did test it with ``` sc.textFile("latlon.tsv").take(5) ``` many times without error. Additionally, in order to test the unswallowed exceptions, I performed ``` sc.textFile("s3n://<big file>").count() ``` and cut my internet during execution. Prior to this patch, we got the "stdin writer exited early" message, which was unhelpful. Now, we get the SocketExceptions propagated through Spark to the user and get proper (though unsuccessful) task retries. Author: Aaron Davidson <aaron@databricks.com> Closes #640 from aarondav/pyspark-io and squashes the following commits: b391ff8 [Aaron Davidson] Detect "clean socket shutdowns" and stop waiting on the socket c0c49da [Aaron Davidson] SPARK-1579: Clean up PythonRDD and avoid swallowing IOExceptions
Diffstat (limited to 'python/pyspark/daemon.py')
-rw-r--r--python/pyspark/daemon.py14
1 files changed, 13 insertions, 1 deletions
diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py
index eb18ec08c9..b2f226a55e 100644
--- a/python/pyspark/daemon.py
+++ b/python/pyspark/daemon.py
@@ -74,6 +74,17 @@ def worker(listen_sock):
raise
signal.signal(SIGCHLD, handle_sigchld)
+ # Blocks until the socket is closed by draining the input stream
+ # until it raises an exception or returns EOF.
+ def waitSocketClose(sock):
+ try:
+ while True:
+ # Empty string is returned upon EOF (and only then).
+ if sock.recv(4096) == '':
+ return
+ except:
+ pass
+
# Handle clients
while not should_exit():
# Wait until a client arrives or we have to exit
@@ -105,7 +116,8 @@ def worker(listen_sock):
exit_code = exc.code
finally:
outfile.flush()
- sock.close()
+ # The Scala side will close the socket upon task completion.
+ waitSocketClose(sock)
os._exit(compute_real_exit_code(exit_code))
else:
sock.close()