Diffstat (limited to 'python/pyspark/daemon.py')
-rw-r--r--  python/pyspark/daemon.py  78
1 file changed, 47 insertions(+), 31 deletions(-)
diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py
index e73538baf0..22ab8d30c0 100644
--- a/python/pyspark/daemon.py
+++ b/python/pyspark/daemon.py
@@ -22,7 +22,8 @@ import select
import socket
import sys
import traceback
-from errno import EINTR, ECHILD
+import time
+from errno import EINTR, ECHILD, EAGAIN
from socket import AF_INET, SOCK_STREAM, SOMAXCONN
from signal import SIGHUP, SIGTERM, SIGCHLD, SIG_DFL, SIG_IGN
from pyspark.worker import main as worker_main
@@ -80,6 +81,17 @@ def worker(sock):
    os._exit(compute_real_exit_code(exit_code))
+# Cleanup zombie children
+def cleanup_dead_children():
+    try:
+        while True:
+            pid, _ = os.waitpid(0, os.WNOHANG)
+            if not pid:
+                break
+    except:
+        pass
+
+
def manager():
    # Create a new process group to corral our children
    os.setpgid(0, 0)
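
The cleanup_dead_children() helper added above hinges on os.waitpid(0, os.WNOHANG): it reaps any exited child in the daemon's process group without blocking, and returns a pid of 0 once no exited child remains. A minimal standalone sketch of the same reaping pattern, assuming POSIX; reap_zombies and the demo children are illustrative, not part of daemon.py:

    import os
    import time
    from errno import ECHILD

    def reap_zombies():
        # Illustrative stand-in for cleanup_dead_children(): poll, never block.
        try:
            while True:
                pid, _ = os.waitpid(0, os.WNOHANG)  # (0, 0) => none exited yet
                if not pid:
                    break
        except OSError as e:
            if e.errno != ECHILD:  # ECHILD just means no children are left
                raise

    if __name__ == '__main__':
        for _ in range(3):
            if os.fork() == 0:
                os._exit(0)   # each child exits immediately
        time.sleep(0.1)       # let the children become zombies
        reap_zombies()        # all three are collected here without blocking
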
@@ -102,29 +114,21 @@ def manager():
    signal.signal(SIGTERM, handle_sigterm)  # Gracefully exit on SIGTERM
    signal.signal(SIGHUP, SIG_IGN)  # Don't die on SIGHUP
-    # Cleanup zombie children
-    def handle_sigchld(*args):
-        try:
-            pid, status = os.waitpid(0, os.WNOHANG)
-            if status != 0:
-                msg = "worker %s crashed abruptly with exit status %s" % (pid, status)
-                print >> sys.stderr, msg
-        except EnvironmentError as err:
-            if err.errno not in (ECHILD, EINTR):
-                raise
-    signal.signal(SIGCHLD, handle_sigchld)
-
    # Initialization complete
    sys.stdout.close()
    try:
        while True:
            try:
-                ready_fds = select.select([0, listen_sock], [], [])[0]
+                ready_fds = select.select([0, listen_sock], [], [], 1)[0]
            except select.error as ex:
                if ex[0] == EINTR:
                    continue
                else:
                    raise
+
+            # doing this cleanup in a signal handler would cause a deadlock
+            cleanup_dead_children()
+
            if 0 in ready_fds:
                try:
                    worker_pid = read_int(sys.stdin)
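
This hunk makes two coupled changes: select() gains a 1-second timeout, and zombie reaping moves out of the old SIGCHLD handler into the main loop. The timeout guarantees the loop wakes at least once per second even when no descriptor is ready, so dead workers are still reaped promptly; doing the waitpid work outside a handler avoids the deadlock the new comment warns about. A rough sketch of the same poll-plus-housekeeping loop, assuming a plain TCP listener; the names are illustrative, not Spark's:

    import select
    import socket

    def do_housekeeping():
        pass  # stand-in for cleanup_dead_children()

    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.bind(('127.0.0.1', 0))
    listener.listen(5)

    while True:
        # The 1-second timeout bounds the wait: select() returns an empty
        # ready list when nothing happened, so housekeeping still runs.
        ready = select.select([listener], [], [], 1)[0]
        do_housekeeping()  # safe here; inside a signal handler it can deadlock
        if listener in ready:
            conn, _ = listener.accept()
            conn.close()
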
@@ -137,29 +141,41 @@ def manager():
                    pass  # process already died
            if listen_sock in ready_fds:
-                sock, addr = listen_sock.accept()
+                try:
+                    sock, _ = listen_sock.accept()
+                except OSError as e:
+                    if e.errno == EINTR:
+                        continue
+                    raise
+
                # Launch a worker process
                try:
                    pid = os.fork()
-                    if pid == 0:
-                        listen_sock.close()
-                        try:
-                            worker(sock)
-                        except:
-                            traceback.print_exc()
-                            os._exit(1)
-                        else:
-                            os._exit(0)
+                except OSError as e:
+                    if e.errno in (EAGAIN, EINTR):
+                        time.sleep(1)
+                        pid = os.fork()  # an error here will shut down the daemon
                    else:
+                        outfile = sock.makefile('w')
+                        write_int(e.errno, outfile)  # Signal that the fork failed
+                        outfile.flush()
+                        outfile.close()
                        sock.close()
-
-                except OSError as e:
-                    print >> sys.stderr, "Daemon failed to fork PySpark worker: %s" % e
-                    outfile = os.fdopen(os.dup(sock.fileno()), "a+", 65536)
-                    write_int(-1, outfile)  # Signal that the fork failed
-                    outfile.flush()
-                    outfile.close()
+                        continue
+
+                if pid == 0:
+                    # in the child process
+                    listen_sock.close()
+                    try:
+                        worker(sock)
+                    except:
+                        traceback.print_exc()
+                        os._exit(1)
+                    else:
+                        os._exit(0)
+                else:
                    sock.close()
+
    finally:
        shutdown(1)
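
The reworked fork path separates transient failures from fatal ones: EAGAIN or EINTR gets a one-second back-off and a single retry (a second failure propagates and shuts the daemon down), while any other errno is written back over the accepted socket, so the client reads the real error code instead of the old -1 sentinel, and the daemon keeps serving. A hedged sketch of that pattern, assuming an already-connected sock; write_int mirrors pyspark.serializers.write_int, and fork_worker is an illustrative name:

    import os
    import time
    import struct
    from errno import EAGAIN, EINTR

    def write_int(value, stream):
        stream.write(struct.pack('!i', value))  # big-endian 32-bit int

    def fork_worker(sock):
        try:
            pid = os.fork()
        except OSError as e:
            if e.errno in (EAGAIN, EINTR):
                time.sleep(1)    # transient, e.g. the process limit was hit
                pid = os.fork()  # a failure here propagates to the caller
            else:
                outfile = sock.makefile('wb')
                write_int(e.errno, outfile)  # report errno instead of a port
                outfile.flush()
                outfile.close()
                sock.close()
                return None      # caller drops this connection, keeps serving
        return pid               # 0 in the child, the worker's pid otherwise
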