author | Davies Liu <davies.liu@gmail.com> | 2014-10-23 17:20:00 -0700 |
---|---|---|
committer | Josh Rosen <joshrosen@databricks.com> | 2014-10-23 17:20:00 -0700 |
commit | e595c8d08a20a122295af62d5e9cc4116f9727f6 (patch) | |
tree | ec0226aecad30372b9ece27e534f4482c24c94bf /python/pyspark/daemon.py | |
parent | 83b7a1c6503adce1826fc537b4db47e534da5cae (diff) | |
[SPARK-3993] [PySpark] fix bug while reuse worker after take()
After take(), some unread data may be left in the socket, so the next task assigned to this worker hangs because it reads corrupted data.
We should make sure the socket is clean before reusing it: write END_OF_STREAM at the end, and check for it after reading all results from Python.
Author: Davies Liu <davies.liu@gmail.com>
Author: Davies Liu <davies@databricks.com>
Closes #2838 from davies/fix_reuse and squashes the following commits:
8872914 [Davies Liu] fix tests
660875b [Davies Liu] fix bug while reuse worker after take()
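The END_OF_STREAM check described in the commit message can be illustrated with a minimal sketch. This is not the code from the patch: the marker values, the helpers `write_results` and `read_results`, and the `limit` parameter are hypothetical names chosen for illustration, and an in-memory buffer stands in for the worker socket.

```python
import io
import struct

# Illustrative marker values; assumptions for this sketch only.
END_OF_DATA_SECTION = -1
END_OF_STREAM = -4


def write_int(value, stream):
    """Write a 4-byte big-endian signed integer."""
    stream.write(struct.pack("!i", value))


def read_int(stream):
    """Read a 4-byte big-endian signed integer."""
    data = stream.read(4)
    if len(data) < 4:
        raise EOFError("stream ended before a full int was read")
    return struct.unpack("!i", data)[0]


def write_results(records, stream):
    """Worker side: length-prefixed records, then the two markers."""
    for record in records:
        write_int(len(record), stream)
        stream.write(record)
    write_int(END_OF_DATA_SECTION, stream)
    write_int(END_OF_STREAM, stream)


def read_results(stream, limit=None):
    """Reader side: return (records, reusable)."""
    records = []
    while True:
        length = read_int(stream)
        if length == END_OF_DATA_SECTION:
            break
        records.append(stream.read(length))
        if limit is not None and len(records) >= limit:
            # Stopped early, as take() may: unread bytes remain behind,
            # so the channel must not be handed to another task.
            return records, False
    # Everything was consumed; the channel is clean only if the
    # END_OF_STREAM marker is the next thing on it.
    return records, read_int(stream) == END_OF_STREAM
```

Reading the full output yields `reusable == True`, while stopping after the first record yields `False`, which is the case where the worker should not be reused.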
Diffstat (limited to 'python/pyspark/daemon.py')
-rw-r--r-- | python/pyspark/daemon.py | 5 |
1 files changed, 4 insertions, 1 deletions
diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py
index 64d6202acb..dbb34775d9 100644
--- a/python/pyspark/daemon.py
+++ b/python/pyspark/daemon.py
@@ -26,7 +26,7 @@ import time
 import gc
 from errno import EINTR, ECHILD, EAGAIN
 from socket import AF_INET, SOCK_STREAM, SOMAXCONN
-from signal import SIGHUP, SIGTERM, SIGCHLD, SIG_DFL, SIG_IGN
+from signal import SIGHUP, SIGTERM, SIGCHLD, SIG_DFL, SIG_IGN, SIGINT
 from pyspark.worker import main as worker_main
 from pyspark.serializers import read_int, write_int
 
@@ -46,6 +46,9 @@ def worker(sock):
     signal.signal(SIGHUP, SIG_DFL)
     signal.signal(SIGCHLD, SIG_DFL)
     signal.signal(SIGTERM, SIG_DFL)
+    # restore the handler for SIGINT,
+    # it's useful for debugging (show the stacktrace before exit)
+    signal.signal(SIGINT, signal.default_int_handler)
 
     # Read the socket using fdopen instead of socket.makefile() because the latter
     # seems to be very slow; note that we need to dup() the file descriptor because
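The effect of restoring the SIGINT handler can be sketched in isolation. The example assumes a process whose SIGINT disposition was previously changed (here it is set to ignore purely for demonstration); the helper names `restore_default_sigint` and `handler_raises_keyboard_interrupt` are hypothetical and do not appear in daemon.py.

```python
import signal


def restore_default_sigint():
    """Reinstall Python's default SIGINT handler and return the old one.

    Must be called from the main thread, as signal.signal() requires.
    """
    return signal.signal(signal.SIGINT, signal.default_int_handler)


def handler_raises_keyboard_interrupt():
    """Call the default handler directly to show what it does."""
    try:
        signal.default_int_handler(signal.SIGINT, None)
    except KeyboardInterrupt:
        return True
    return False


if __name__ == "__main__":
    # Demonstration only: pretend SIGINT was being ignored.
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    restore_default_sigint()
    # With the default handler installed, Ctrl-C raises KeyboardInterrupt,
    # so an uncaught interrupt prints a traceback before the process exits.
    print(handler_raises_keyboard_interrupt())
```

Because the default handler raises `KeyboardInterrupt` inside the running Python code, an interrupted worker shows where it was executing, which is the debugging benefit the patch comment mentions.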