Diffstat (limited to 'python')
-rw-r--r-- | python/pyspark/daemon.py | 109
-rw-r--r-- | python/pyspark/worker.py |  61
2 files changed, 138 insertions, 32 deletions
diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py
new file mode 100644
index 0000000000..642f30b2b9
--- /dev/null
+++ b/python/pyspark/daemon.py
@@ -0,0 +1,109 @@
+import os
+import sys
+import multiprocessing
+from errno import EINTR, ECHILD
+from socket import socket, AF_INET, SOCK_STREAM, SOMAXCONN
+from signal import signal, SIGHUP, SIGTERM, SIGCHLD, SIG_DFL, SIG_IGN
+from pyspark.worker import main as worker_main
+from pyspark.serializers import write_int
+
+try:
+    POOLSIZE = multiprocessing.cpu_count()
+except NotImplementedError:
+    POOLSIZE = 4
+
+should_exit = False
+
+
+def worker(listen_sock):
+    # Redirect stdout to stderr
+    os.dup2(2, 1)
+
+    # Manager sends SIGHUP to request termination of workers in the pool
+    def handle_sighup(signum, frame):
+        global should_exit
+        should_exit = True
+    signal(SIGHUP, handle_sighup)
+
+    while not should_exit:
+        # Wait until a client arrives or we have to exit
+        sock = None
+        while not should_exit and sock is None:
+            try:
+                sock, addr = listen_sock.accept()
+            except EnvironmentError as err:
+                if err.errno != EINTR:
+                    raise
+
+        if sock is not None:
+            # Fork a child to handle the client
+            if os.fork() == 0:
+                # Leave the worker pool
+                signal(SIGHUP, SIG_DFL)
+                listen_sock.close()
+                # Handle the client then exit
+                sockfile = sock.makefile()
+                worker_main(sockfile, sockfile)
+                sockfile.close()
+                sock.close()
+                os._exit(0)
+            else:
+                sock.close()
+
+    assert should_exit
+    os._exit(0)
+
+
+def manager():
+    # Create a new process group to corral our children
+    os.setpgid(0, 0)
+
+    # Create a listening socket on the AF_INET loopback interface
+    listen_sock = socket(AF_INET, SOCK_STREAM)
+    listen_sock.bind(('127.0.0.1', 0))
+    listen_sock.listen(max(1024, 2 * POOLSIZE, SOMAXCONN))
+    listen_host, listen_port = listen_sock.getsockname()
+    write_int(listen_port, sys.stdout)
+
+    # Launch initial worker pool
+    for idx in range(POOLSIZE):
+        if os.fork() == 0:
+            worker(listen_sock)
+            raise RuntimeError("worker() unexpectedly returned")
+    listen_sock.close()
+
+    def shutdown():
+        global should_exit
+        os.kill(0, SIGHUP)
+        should_exit = True
+
+    # Gracefully exit on SIGTERM, don't die on SIGHUP
+    signal(SIGTERM, lambda signum, frame: shutdown())
+    signal(SIGHUP, SIG_IGN)
+
+    # Cleanup zombie children
+    def handle_sigchld(signum, frame):
+        try:
+            pid, status = os.waitpid(0, os.WNOHANG)
+            if (pid, status) != (0, 0) and not should_exit:
+                raise RuntimeError("pool member crashed: %s, %s" % (pid, status))
+        except EnvironmentError as err:
+            if err.errno not in (ECHILD, EINTR):
+                raise
+    signal(SIGCHLD, handle_sigchld)
+
+    # Initialization complete
+    sys.stdout.close()
+    while not should_exit:
+        try:
+            # Spark tells us to exit by closing stdin
+            if sys.stdin.read() == '':
+                shutdown()
+        except EnvironmentError as err:
+            if err.errno != EINTR:
+                shutdown()
+                raise
+
+
+if __name__ == '__main__':
+    manager()
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 4c33ae49dc..94d612ea6e 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -1,10 +1,9 @@
 """
 Worker that receives input from Piped RDD.
 """
-import time
-preboot_time = time.time()
 import os
 import sys
+import time
 import traceback
 from base64 import standard_b64decode
 # CloudPickler needs to be imported so that depicklers are registered using the
@@ -17,57 +16,55 @@ from pyspark.serializers import write_with_length, read_with_length, write_int,
     read_long, write_long, read_int, dump_pickle, load_pickle, read_from_pickle_file
 
 
-# Redirect stdout to stderr so that users must return values from functions.
-old_stdout = os.fdopen(os.dup(1), 'w')
-os.dup2(2, 1)
-
-
-def load_obj():
-    return load_pickle(standard_b64decode(sys.stdin.readline().strip()))
+def load_obj(infile):
+    return load_pickle(standard_b64decode(infile.readline().strip()))
 
 
-def report_times(preboot, boot, init, finish):
-    write_int(-3, old_stdout)
-    write_long(1000 * preboot, old_stdout)
-    write_long(1000 * boot, old_stdout)
-    write_long(1000 * init, old_stdout)
-    write_long(1000 * finish, old_stdout)
+def report_times(outfile, boot, init, finish):
+    write_int(-3, outfile)
+    write_long(1000 * boot, outfile)
+    write_long(1000 * init, outfile)
+    write_long(1000 * finish, outfile)
 
 
-def main():
+def main(infile, outfile):
     boot_time = time.time()
-    split_index = read_int(sys.stdin)
-    spark_files_dir = load_pickle(read_with_length(sys.stdin))
+    split_index = read_int(infile)
+    spark_files_dir = load_pickle(read_with_length(infile))
     SparkFiles._root_directory = spark_files_dir
     SparkFiles._is_running_on_worker = True
     sys.path.append(spark_files_dir)
-    num_broadcast_variables = read_int(sys.stdin)
+    num_broadcast_variables = read_int(infile)
     for _ in range(num_broadcast_variables):
-        bid = read_long(sys.stdin)
-        value = read_with_length(sys.stdin)
+        bid = read_long(infile)
+        value = read_with_length(infile)
         _broadcastRegistry[bid] = Broadcast(bid, load_pickle(value))
-    func = load_obj()
-    bypassSerializer = load_obj()
+    func = load_obj(infile)
+    bypassSerializer = load_obj(infile)
     if bypassSerializer:
         dumps = lambda x: x
     else:
         dumps = dump_pickle
     init_time = time.time()
-    iterator = read_from_pickle_file(sys.stdin)
+    iterator = read_from_pickle_file(infile)
     try:
         for obj in func(split_index, iterator):
-            write_with_length(dumps(obj), old_stdout)
+            write_with_length(dumps(obj), outfile)
     except Exception as e:
-        write_int(-2, old_stdout)
-        write_with_length(traceback.format_exc(), old_stdout)
-        sys.exit(-1)
+        write_int(-2, outfile)
+        write_with_length(traceback.format_exc(), outfile)
+        raise
     finish_time = time.time()
-    report_times(preboot_time, boot_time, init_time, finish_time)
+    report_times(outfile, boot_time, init_time, finish_time)
     # Mark the beginning of the accumulators section of the output
-    write_int(-1, old_stdout)
+    write_int(-1, outfile)
     for aid, accum in _accumulatorRegistry.items():
-        write_with_length(dump_pickle((aid, accum._value)), old_stdout)
+        write_with_length(dump_pickle((aid, accum._value)), outfile)
+    write_int(-1, outfile)
 
 
 if __name__ == '__main__':
-    main()
+    # Redirect stdout to stderr so that users must return values from functions.
+    old_stdout = os.fdopen(os.dup(1), 'w')
+    os.dup2(2, 1)
+    main(sys.stdin, old_stdout)