path: root/python/pyspark/daemon.py
author     Davies Liu <davies.liu@gmail.com>  2014-08-10 13:00:38 -0700
committer  Josh Rosen <joshrosen@apache.org>  2014-08-10 13:00:38 -0700
commit     28dcbb531ae57dc50f15ad9df6c31022731669c9 (patch)
tree       307bfd292e7c0d141631191ae721e8eac96eabfd /python/pyspark/daemon.py
parent     1d03a26a4895c24ebfab1a3cf6656af75cb53003 (diff)
[SPARK-2898] [PySpark] fix bugs in daemon.py
1. Do not use a signal handler for SIGCHLD; it can easily cause deadlock.
2. Handle EINTR during accept().
3. Pass errno into the JVM.
4. Handle EAGAIN during fork().

With these fixes, it can pass a 50k-task test run in 180 seconds.

Author: Davies Liu <davies.liu@gmail.com>

Closes #1842 from davies/qa and squashes the following commits:

f0ea451 [Davies Liu] fix lint
03a2e8c [Davies Liu] cleanup dead children every second
32cb829 [Davies Liu] fix lint
0cd0817 [Davies Liu] fix bugs in daemon.py
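The heart of the change is moving zombie reaping out of an asynchronous SIGCHLD handler (which can fire in the middle of fork() or I/O and deadlock the daemon) into the main select() loop, which now wakes at least once per second. A minimal sketch of that polling pattern, in the Python 2 idiom of daemon.py; reap_children and serve are illustrative names, not from the patch:

import errno
import os
import select

def reap_children():
    # Reap finished children without blocking. Because this runs
    # synchronously in the main loop, it can never interrupt fork()
    # or I/O partway through, unlike a SIGCHLD handler.
    try:
        while True:
            pid, _ = os.waitpid(0, os.WNOHANG)  # 0 = any child in our group
            if not pid:
                break  # children remain, but none has exited yet
    except OSError as e:
        if e.errno != errno.ECHILD:  # ECHILD: no children left at all
            raise

def serve(listen_sock):
    while True:
        try:
            # The 1-second timeout guarantees periodic reaping even
            # when no connections arrive.
            ready = select.select([listen_sock], [], [], 1)[0]
        except select.error as e:
            if e[0] == errno.EINTR:
                continue  # a signal interrupted select(); just retry
            raise
        reap_children()
        # ... accept() and fork a worker for each ready socket ...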
Diffstat (limited to 'python/pyspark/daemon.py')
-rw-r--r--  python/pyspark/daemon.py | 78
1 file changed, 47 insertions, 31 deletions
diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py
index e73538baf0..22ab8d30c0 100644
--- a/python/pyspark/daemon.py
+++ b/python/pyspark/daemon.py
@@ -22,7 +22,8 @@ import select
import socket
import sys
import traceback
-from errno import EINTR, ECHILD
+import time
+from errno import EINTR, ECHILD, EAGAIN
from socket import AF_INET, SOCK_STREAM, SOMAXCONN
from signal import SIGHUP, SIGTERM, SIGCHLD, SIG_DFL, SIG_IGN
from pyspark.worker import main as worker_main
@@ -80,6 +81,17 @@ def worker(sock):
    os._exit(compute_real_exit_code(exit_code))


+# Cleanup zombie children
+def cleanup_dead_children():
+    try:
+        while True:
+            pid, _ = os.waitpid(0, os.WNOHANG)
+            if not pid:
+                break
+    except:
+        pass
+
+
def manager():
    # Create a new process group to corral our children
    os.setpgid(0, 0)
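The bare except/pass in cleanup_dead_children() above mostly absorbs the ECHILD error that waitpid() raises once no children remain. A standalone demonstration of the waitpid(0, os.WNOHANG) contract the function relies on; this example is ours, not part of the patch:

import errno
import os

pid = os.fork()
if pid == 0:
    os._exit(0)          # child: exit immediately
os.waitpid(pid, 0)       # parent: blocking wait reaps the child
try:
    # With no children left, waitpid raises OSError with ECHILD; with
    # live but still-running children it would return (0, 0) instead.
    os.waitpid(0, os.WNOHANG)
except OSError as e:
    assert e.errno == errno.ECHILD

So the only failure the broad except is expected to swallow is "no children to wait for" (surfaced as ChildProcessError, an OSError subclass, on Python 3).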
@@ -102,29 +114,21 @@ def manager():
    signal.signal(SIGTERM, handle_sigterm)  # Gracefully exit on SIGTERM
    signal.signal(SIGHUP, SIG_IGN)  # Don't die on SIGHUP

-    # Cleanup zombie children
-    def handle_sigchld(*args):
-        try:
-            pid, status = os.waitpid(0, os.WNOHANG)
-            if status != 0:
-                msg = "worker %s crashed abruptly with exit status %s" % (pid, status)
-                print >> sys.stderr, msg
-        except EnvironmentError as err:
-            if err.errno not in (ECHILD, EINTR):
-                raise
-    signal.signal(SIGCHLD, handle_sigchld)
-
    # Initialization complete
    sys.stdout.close()
    try:
        while True:
            try:
-                ready_fds = select.select([0, listen_sock], [], [])[0]
+                ready_fds = select.select([0, listen_sock], [], [], 1)[0]
            except select.error as ex:
                if ex[0] == EINTR:
                    continue
                else:
                    raise
+
+            # cleanup in signal handler will cause deadlock
+            cleanup_dead_children()
+
            if 0 in ready_fds:
                try:
                    worker_pid = read_int(sys.stdin)
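The retry on EINTR above matters because, in Python 2, a signal arriving during a blocking select() surfaces as select.error with errno EINTR rather than being retried automatically (automatic retry only arrived with PEP 475 in Python 3.5). A small demonstration of that failure mode, ours rather than the patch's:

import errno
import os
import select
import signal

signal.signal(signal.SIGALRM, lambda *a: None)  # install a handler so the
signal.alarm(1)                                 # process survives SIGALRM
r, w = os.pipe()                                # a pipe nobody writes to
try:
    select.select([r], [], [])                  # blocks indefinitely...
except select.error as e:
    assert e[0] == errno.EINTR                  # ...until SIGALRM interrupts it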
@@ -137,29 +141,41 @@ def manager():
                        pass  # process already died

            if listen_sock in ready_fds:
-                sock, addr = listen_sock.accept()
+                try:
+                    sock, _ = listen_sock.accept()
+                except OSError as e:
+                    if e.errno == EINTR:
+                        continue
+                    raise
+
                # Launch a worker process
                try:
                    pid = os.fork()
-                    if pid == 0:
-                        listen_sock.close()
-                        try:
-                            worker(sock)
-                        except:
-                            traceback.print_exc()
-                            os._exit(1)
-                        else:
-                            os._exit(0)
+                except OSError as e:
+                    if e.errno in (EAGAIN, EINTR):
+                        time.sleep(1)
+                        pid = os.fork()  # error here will shutdown daemon
                    else:
+                        outfile = sock.makefile('w')
+                        write_int(e.errno, outfile)  # Signal that the fork failed
+                        outfile.flush()
+                        outfile.close()
                        sock.close()
-
-                except OSError as e:
-                    print >> sys.stderr, "Daemon failed to fork PySpark worker: %s" % e
-                    outfile = os.fdopen(os.dup(sock.fileno()), "a+", 65536)
-                    write_int(-1, outfile)  # Signal that the fork failed
-                    outfile.flush()
-                    outfile.close()
+                        continue
+
+                if pid == 0:
+                    # in child process
+                    listen_sock.close()
+                    try:
+                        worker(sock)
+                    except:
+                        traceback.print_exc()
+                        os._exit(1)
+                    else:
+                        os._exit(0)
+                else:
                    sock.close()
+
    finally:
        shutdown(1)
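This last hunk carries the remaining two fixes from the commit message: fork() can fail transiently with EAGAIN (for example under process-limit pressure), so it is retried once after a pause, and a persistent failure is reported to the JVM by writing the errno over the worker socket instead of a bare -1. A condensed sketch of that logic; fork_worker is our own name for it, and the framing matches pyspark's write_int() (a 4-byte big-endian int):

import errno
import os
import time
from struct import pack

def fork_worker(sock):
    # Returns the child pid (0 in the child), or None if the fork failed
    # and the failure was already reported to the JVM over `sock`.
    try:
        return os.fork()
    except OSError as e:
        if e.errno in (errno.EAGAIN, errno.EINTR):
            time.sleep(1)
            return os.fork()  # a second failure propagates and stops the daemon
        # Persistent failure: send errno to the JVM as a 4-byte
        # big-endian int, then give up on this connection while
        # keeping the daemon itself alive.
        sock.sendall(pack("!i", e.errno))
        sock.close()
        return None

Sending the positive errno rather than -1 lets the JVM side distinguish why the fork failed, which is what "pass errno into JVM" refers to in the commit message.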