about summary refs log tree commit diff
path: root/python/pyspark/daemon.py
diff options
context:
space:
mode:
authorJey Kottalam <jey@cs.berkeley.edu>2013-05-10 15:48:48 -0700
committerJey Kottalam <jey@cs.berkeley.edu>2013-06-21 12:14:16 -0400
commit62c4781400dd908c2fccdcebf0dc816ff0cb8ed4 (patch)
treeb3632497d0b6532258324de4e12ce6d208dfd31d /python/pyspark/daemon.py
parentc79a6078c34c207ad9f9910252f5849424828bf1 (diff)
downloadspark-62c4781400dd908c2fccdcebf0dc816ff0cb8ed4.tar.gz
spark-62c4781400dd908c2fccdcebf0dc816ff0cb8ed4.tar.bz2
spark-62c4781400dd908c2fccdcebf0dc816ff0cb8ed4.zip
Add tests and fixes for Python daemon shutdown
Diffstat (limited to 'python/pyspark/daemon.py')
-rw-r--r--python/pyspark/daemon.py46
1 file changed, 24 insertions(+), 22 deletions(-)
diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py
index 642f30b2b9..ab9c19df57 100644
--- a/python/pyspark/daemon.py
+++ b/python/pyspark/daemon.py
@@ -12,7 +12,7 @@ try:
except NotImplementedError:
POOLSIZE = 4
-should_exit = False
+should_exit = multiprocessing.Event()
def worker(listen_sock):
@@ -21,14 +21,13 @@ def worker(listen_sock):
# Manager sends SIGHUP to request termination of workers in the pool
def handle_sighup(signum, frame):
- global should_exit
- should_exit = True
+ assert should_exit.is_set()
signal(SIGHUP, handle_sighup)
- while not should_exit:
+ while not should_exit.is_set():
# Wait until a client arrives or we have to exit
sock = None
- while not should_exit and sock is None:
+ while not should_exit.is_set() and sock is None:
try:
sock, addr = listen_sock.accept()
except EnvironmentError as err:
@@ -36,8 +35,8 @@ def worker(listen_sock):
raise
if sock is not None:
- # Fork a child to handle the client
- if os.fork() == 0:
+ # Fork to handle the client
+ if os.fork() != 0:
# Leave the worker pool
signal(SIGHUP, SIG_DFL)
listen_sock.close()
@@ -50,7 +49,7 @@ def worker(listen_sock):
else:
sock.close()
- assert should_exit
+ assert should_exit.is_set()
os._exit(0)
@@ -73,9 +72,7 @@ def manager():
listen_sock.close()
def shutdown():
- global should_exit
- os.kill(0, SIGHUP)
- should_exit = True
+ should_exit.set()
# Gracefully exit on SIGTERM, don't die on SIGHUP
signal(SIGTERM, lambda signum, frame: shutdown())
@@ -85,8 +82,8 @@ def manager():
def handle_sigchld(signum, frame):
try:
pid, status = os.waitpid(0, os.WNOHANG)
- if (pid, status) != (0, 0) and not should_exit:
- raise RuntimeError("pool member crashed: %s, %s" % (pid, status))
+ if status != 0 and not should_exit.is_set():
+ raise RuntimeError("worker crashed: %s, %s" % (pid, status))
except EnvironmentError as err:
if err.errno not in (ECHILD, EINTR):
raise
@@ -94,15 +91,20 @@ def manager():
# Initialization complete
sys.stdout.close()
- while not should_exit:
- try:
- # Spark tells us to exit by closing stdin
- if sys.stdin.read() == '':
- shutdown()
- except EnvironmentError as err:
- if err.errno != EINTR:
- shutdown()
- raise
+ try:
+ while not should_exit.is_set():
+ try:
+ # Spark tells us to exit by closing stdin
+ if os.read(0, 512) == '':
+ shutdown()
+ except EnvironmentError as err:
+ if err.errno != EINTR:
+ shutdown()
+ raise
+ finally:
+ should_exit.set()
+ # Send SIGHUP to notify workers of shutdown
+ os.kill(0, SIGHUP)
if __name__ == '__main__':