diff options
Diffstat (limited to 'python/pyspark/daemon.py')
-rw-r--r-- | python/pyspark/daemon.py | 109 |
1 files changed, 109 insertions, 0 deletions
diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py new file mode 100644 index 0000000000..642f30b2b9 --- /dev/null +++ b/python/pyspark/daemon.py @@ -0,0 +1,109 @@ +import os +import sys +import multiprocessing +from errno import EINTR, ECHILD +from socket import socket, AF_INET, SOCK_STREAM, SOMAXCONN +from signal import signal, SIGHUP, SIGTERM, SIGCHLD, SIG_DFL, SIG_IGN +from pyspark.worker import main as worker_main +from pyspark.serializers import write_int + +try: + POOLSIZE = multiprocessing.cpu_count() +except NotImplementedError: + POOLSIZE = 4 + +should_exit = False + + +def worker(listen_sock): + # Redirect stdout to stderr + os.dup2(2, 1) + + # Manager sends SIGHUP to request termination of workers in the pool + def handle_sighup(signum, frame): + global should_exit + should_exit = True + signal(SIGHUP, handle_sighup) + + while not should_exit: + # Wait until a client arrives or we have to exit + sock = None + while not should_exit and sock is None: + try: + sock, addr = listen_sock.accept() + except EnvironmentError as err: + if err.errno != EINTR: + raise + + if sock is not None: + # Fork a child to handle the client + if os.fork() == 0: + # Leave the worker pool + signal(SIGHUP, SIG_DFL) + listen_sock.close() + # Handle the client then exit + sockfile = sock.makefile() + worker_main(sockfile, sockfile) + sockfile.close() + sock.close() + os._exit(0) + else: + sock.close() + + assert should_exit + os._exit(0) + + +def manager(): + # Create a new process group to corral our children + os.setpgid(0, 0) + + # Create a listening socket on the AF_INET loopback interface + listen_sock = socket(AF_INET, SOCK_STREAM) + listen_sock.bind(('127.0.0.1', 0)) + listen_sock.listen(max(1024, 2 * POOLSIZE, SOMAXCONN)) + listen_host, listen_port = listen_sock.getsockname() + write_int(listen_port, sys.stdout) + + # Launch initial worker pool + for idx in range(POOLSIZE): + if os.fork() == 0: + worker(listen_sock) + raise RuntimeError("worker() unexpectedly returned") + listen_sock.close() + + def shutdown(): + global should_exit + os.kill(0, SIGHUP) + should_exit = True + + # Gracefully exit on SIGTERM, don't die on SIGHUP + signal(SIGTERM, lambda signum, frame: shutdown()) + signal(SIGHUP, SIG_IGN) + + # Cleanup zombie children + def handle_sigchld(signum, frame): + try: + pid, status = os.waitpid(0, os.WNOHANG) + if (pid, status) != (0, 0) and not should_exit: + raise RuntimeError("pool member crashed: %s, %s" % (pid, status)) + except EnvironmentError as err: + if err.errno not in (ECHILD, EINTR): + raise + signal(SIGCHLD, handle_sigchld) + + # Initialization complete + sys.stdout.close() + while not should_exit: + try: + # Spark tells us to exit by closing stdin + if sys.stdin.read() == '': + shutdown() + except EnvironmentError as err: + if err.errno != EINTR: + shutdown() + raise + + +if __name__ == '__main__': + manager() |