From 7c5ff733ee1d3729b4b26f7c5542ca00c4d64139 Mon Sep 17 00:00:00 2001 From: Jey Kottalam Date: Thu, 23 May 2013 11:50:24 -0700 Subject: PySpark daemon: fix deadlock, improve error handling --- python/pyspark/daemon.py | 67 ++++++++++++++++++++++++++++++++++++------------ 1 file changed, 50 insertions(+), 17 deletions(-) (limited to 'python') diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py index ab9c19df57..2b5e9b3581 100644 --- a/python/pyspark/daemon.py +++ b/python/pyspark/daemon.py @@ -1,6 +1,7 @@ import os import sys import multiprocessing +from ctypes import c_bool from errno import EINTR, ECHILD from socket import socket, AF_INET, SOCK_STREAM, SOMAXCONN from signal import signal, SIGHUP, SIGTERM, SIGCHLD, SIG_DFL, SIG_IGN @@ -12,7 +13,12 @@ try: except NotImplementedError: POOLSIZE = 4 -should_exit = multiprocessing.Event() +exit_flag = multiprocessing.Value(c_bool, False) + + +def should_exit(): + global exit_flag + return exit_flag.value def worker(listen_sock): @@ -20,14 +26,29 @@ def worker(listen_sock): os.dup2(2, 1) # Manager sends SIGHUP to request termination of workers in the pool - def handle_sighup(signum, frame): - assert should_exit.is_set() + def handle_sighup(*args): + assert should_exit() signal(SIGHUP, handle_sighup) - while not should_exit.is_set(): + # Cleanup zombie children + def handle_sigchld(*args): + pid = status = None + try: + while (pid, status) != (0, 0): + pid, status = os.waitpid(0, os.WNOHANG) + except EnvironmentError as err: + if err.errno == EINTR: + # retry + handle_sigchld() + elif err.errno != ECHILD: + raise + signal(SIGCHLD, handle_sigchld) + + # Handle clients + while not should_exit(): # Wait until a client arrives or we have to exit sock = None - while not should_exit.is_set() and sock is None: + while not should_exit() and sock is None: try: sock, addr = listen_sock.accept() except EnvironmentError as err: @@ -35,8 +56,10 @@ def worker(listen_sock): raise if sock is not None: - # Fork to handle the client - if os.fork() != 0: + # Fork a child to handle the client. + # The client is handled in the child so that the manager + # never receives SIGCHLD unless a worker crashes. + if os.fork() == 0: # Leave the worker pool signal(SIGHUP, SIG_DFL) listen_sock.close() @@ -49,8 +72,18 @@ def worker(listen_sock): else: sock.close() - assert should_exit.is_set() - os._exit(0) + +def launch_worker(listen_sock): + if os.fork() == 0: + try: + worker(listen_sock) + except Exception as err: + import traceback + traceback.print_exc() + os._exit(1) + else: + assert should_exit() + os._exit(0) def manager(): @@ -66,23 +99,22 @@ def manager(): # Launch initial worker pool for idx in range(POOLSIZE): - if os.fork() == 0: - worker(listen_sock) - raise RuntimeError("worker() unexpectedly returned") + launch_worker(listen_sock) listen_sock.close() def shutdown(): - should_exit.set() + global exit_flag + exit_flag.value = True # Gracefully exit on SIGTERM, don't die on SIGHUP signal(SIGTERM, lambda signum, frame: shutdown()) signal(SIGHUP, SIG_IGN) # Cleanup zombie children - def handle_sigchld(signum, frame): + def handle_sigchld(*args): try: pid, status = os.waitpid(0, os.WNOHANG) - if status != 0 and not should_exit.is_set(): + if status != 0 and not should_exit(): raise RuntimeError("worker crashed: %s, %s" % (pid, status)) except EnvironmentError as err: if err.errno not in (ECHILD, EINTR): @@ -92,7 +124,7 @@ def manager(): # Initialization complete sys.stdout.close() try: - while not should_exit.is_set(): + while not should_exit(): try: # Spark tells us to exit by closing stdin if os.read(0, 512) == '': @@ -102,7 +134,8 @@ def manager(): shutdown() raise finally: - should_exit.set() + signal(SIGTERM, SIG_DFL) + exit_flag.value = True # Send SIGHUP to notify workers of shutdown os.kill(0, SIGHUP) -- cgit v1.2.3