about | summary | refs | log | tree | commit | diff
path: root/python/pyspark/daemon.py
diff options
context:
space:
mode:
author: Jey Kottalam <jey@cs.berkeley.edu>, 2013-05-23 11:50:24 -0700
committer: Jey Kottalam <jey@cs.berkeley.edu>, 2013-06-21 12:14:16 -0400
commit: 7c5ff733ee1d3729b4b26f7c5542ca00c4d64139 (patch)
tree: 9fa17d1592129dc38d02663e4c3829d5f04b93a5 /python/pyspark/daemon.py
parent: edb18ca928c988a713b9228bb74af1737f2b614b (diff)
download: spark-7c5ff733ee1d3729b4b26f7c5542ca00c4d64139.tar.gz
spark-7c5ff733ee1d3729b4b26f7c5542ca00c4d64139.tar.bz2
spark-7c5ff733ee1d3729b4b26f7c5542ca00c4d64139.zip
PySpark daemon: fix deadlock, improve error handling
Diffstat (limited to 'python/pyspark/daemon.py')
-rw-r--r-- python/pyspark/daemon.py 67
1 files changed, 50 insertions, 17 deletions
diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py
index ab9c19df57..2b5e9b3581 100644
--- a/python/pyspark/daemon.py
+++ b/python/pyspark/daemon.py
@@ -1,6 +1,7 @@
import os
import sys
import multiprocessing
+from ctypes import c_bool
from errno import EINTR, ECHILD
from socket import socket, AF_INET, SOCK_STREAM, SOMAXCONN
from signal import signal, SIGHUP, SIGTERM, SIGCHLD, SIG_DFL, SIG_IGN
@@ -12,7 +13,12 @@ try:
except NotImplementedError:
POOLSIZE = 4
-should_exit = multiprocessing.Event()
+exit_flag = multiprocessing.Value(c_bool, False)
+
+
+def should_exit():
+ global exit_flag
+ return exit_flag.value
def worker(listen_sock):
@@ -20,14 +26,29 @@ def worker(listen_sock):
os.dup2(2, 1)
# Manager sends SIGHUP to request termination of workers in the pool
- def handle_sighup(signum, frame):
- assert should_exit.is_set()
+ def handle_sighup(*args):
+ assert should_exit()
signal(SIGHUP, handle_sighup)
- while not should_exit.is_set():
+ # Cleanup zombie children
+ def handle_sigchld(*args):
+ pid = status = None
+ try:
+ while (pid, status) != (0, 0):
+ pid, status = os.waitpid(0, os.WNOHANG)
+ except EnvironmentError as err:
+ if err.errno == EINTR:
+ # retry
+ handle_sigchld()
+ elif err.errno != ECHILD:
+ raise
+ signal(SIGCHLD, handle_sigchld)
+
+ # Handle clients
+ while not should_exit():
# Wait until a client arrives or we have to exit
sock = None
- while not should_exit.is_set() and sock is None:
+ while not should_exit() and sock is None:
try:
sock, addr = listen_sock.accept()
except EnvironmentError as err:
@@ -35,8 +56,10 @@ def worker(listen_sock):
raise
if sock is not None:
- # Fork to handle the client
- if os.fork() != 0:
+ # Fork a child to handle the client.
+ # The client is handled in the child so that the manager
+ # never receives SIGCHLD unless a worker crashes.
+ if os.fork() == 0:
# Leave the worker pool
signal(SIGHUP, SIG_DFL)
listen_sock.close()
@@ -49,8 +72,18 @@ def worker(listen_sock):
else:
sock.close()
- assert should_exit.is_set()
- os._exit(0)
+
+def launch_worker(listen_sock):
+ if os.fork() == 0:
+ try:
+ worker(listen_sock)
+ except Exception as err:
+ import traceback
+ traceback.print_exc()
+ os._exit(1)
+ else:
+ assert should_exit()
+ os._exit(0)
def manager():
@@ -66,23 +99,22 @@ def manager():
# Launch initial worker pool
for idx in range(POOLSIZE):
- if os.fork() == 0:
- worker(listen_sock)
- raise RuntimeError("worker() unexpectedly returned")
+ launch_worker(listen_sock)
listen_sock.close()
def shutdown():
- should_exit.set()
+ global exit_flag
+ exit_flag.value = True
# Gracefully exit on SIGTERM, don't die on SIGHUP
signal(SIGTERM, lambda signum, frame: shutdown())
signal(SIGHUP, SIG_IGN)
# Cleanup zombie children
- def handle_sigchld(signum, frame):
+ def handle_sigchld(*args):
try:
pid, status = os.waitpid(0, os.WNOHANG)
- if status != 0 and not should_exit.is_set():
+ if status != 0 and not should_exit():
raise RuntimeError("worker crashed: %s, %s" % (pid, status))
except EnvironmentError as err:
if err.errno not in (ECHILD, EINTR):
@@ -92,7 +124,7 @@ def manager():
# Initialization complete
sys.stdout.close()
try:
- while not should_exit.is_set():
+ while not should_exit():
try:
# Spark tells us to exit by closing stdin
if os.read(0, 512) == '':
@@ -102,7 +134,8 @@ def manager():
shutdown()
raise
finally:
- should_exit.set()
+ signal(SIGTERM, SIG_DFL)
+ exit_flag.value = True
# Send SIGHUP to notify workers of shutdown
os.kill(0, SIGHUP)