author    | Davies Liu <davies.liu@gmail.com>  | 2014-08-03 15:52:00 -0700
committer | Josh Rosen <joshrosen@apache.org>  | 2014-08-03 15:52:00 -0700
commit    | 55349f9fe81ba5af5e4a5e4908ebf174e63c6cc9 (patch)
tree      | 91277ef5bfed5d8d3177679ebfe52186350f430b /python/pyspark/daemon.py
parent    | e139e2be60ef23281327744e1b3e74904dfdf63f (diff)
[SPARK-1740] [PySpark] kill the python worker
Kill only the Python worker related to cancelled tasks.
The daemon starts a background thread to monitor all of the open sockets for all workers. If a socket is closed by the JVM, this thread kills the corresponding worker.
When a task is cancelled, the socket to its worker is closed, and the worker is then killed by the daemon.
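For illustration, here is a simplified, self-contained sketch (Python 3, not code from the patch) of the kill-by-PID control flow that the daemon.py change below implements. The names read_pid and control_loop are invented for this sketch, and the real daemon uses pyspark.serializers.read_int on its stdin:

    import os
    import select
    import signal
    import struct
    import sys


    def read_pid(stream):
        # Same framing idea as pyspark.serializers.read_int: a 4-byte
        # big-endian int; an empty read means the other end closed the stream.
        data = stream.read(4)
        if not data:
            raise EOFError
        return struct.unpack("!i", data)[0]


    def control_loop(listen_sock):
        while True:
            # Wait on both stdin (control messages from the JVM) and the
            # listening socket (requests to fork new workers).
            ready_fds = select.select([0, listen_sock], [], [])[0]
            if 0 in ready_fds:
                try:
                    worker_pid = read_pid(sys.stdin.buffer)
                except EOFError:
                    return  # stdin closed: the JVM wants the daemon to exit
                try:
                    os.kill(worker_pid, signal.SIGKILL)  # kill only that worker
                except OSError:
                    pass  # the worker already exited
            if listen_sock in ready_fds:
                pass  # accept() and os.fork() a new worker, as daemon.py does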
Author: Davies Liu <davies.liu@gmail.com>
Closes #1643 from davies/kill and squashes the following commits:
8ffe9f3 [Davies Liu] kill worker by daemon, because runtime.exec() is too heavy
46ca150 [Davies Liu] address comment
acd751c [Davies Liu] kill the worker when task is canceled
Diffstat (limited to 'python/pyspark/daemon.py')
-rw-r--r-- | python/pyspark/daemon.py | 24
1 file changed, 18 insertions, 6 deletions
diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py
index 9fde0dde0f..b00da833d0 100644
--- a/python/pyspark/daemon.py
+++ b/python/pyspark/daemon.py
@@ -26,7 +26,7 @@ from errno import EINTR, ECHILD
 from socket import AF_INET, SOCK_STREAM, SOMAXCONN
 from signal import SIGHUP, SIGTERM, SIGCHLD, SIG_DFL, SIG_IGN
 from pyspark.worker import main as worker_main
-from pyspark.serializers import write_int
+from pyspark.serializers import read_int, write_int
 
 
 def compute_real_exit_code(exit_code):
@@ -67,7 +67,8 @@
     outfile = os.fdopen(os.dup(sock.fileno()), "a+", 65536)
     exit_code = 0
     try:
-        write_int(0, outfile)  # Acknowledge that the fork was successful
+        # Acknowledge that the fork was successful
+        write_int(os.getpid(), outfile)
         outfile.flush()
         worker_main(infile, outfile)
     except SystemExit as exc:
@@ -125,14 +126,23 @@
             else:
                 raise
         if 0 in ready_fds:
-            # Spark told us to exit by closing stdin
-            shutdown(0)
+            try:
+                worker_pid = read_int(sys.stdin)
+            except EOFError:
+                # Spark told us to exit by closing stdin
+                shutdown(0)
+            try:
+                os.kill(worker_pid, signal.SIGKILL)
+            except OSError:
+                pass  # process already died
+
+
         if listen_sock in ready_fds:
             sock, addr = listen_sock.accept()
             # Launch a worker process
             try:
-                fork_return_code = os.fork()
-                if fork_return_code == 0:
+                pid = os.fork()
+                if pid == 0:
                     listen_sock.close()
                     try:
                         worker(sock)
@@ -143,11 +153,13 @@
                         os._exit(0)
                 else:
                     sock.close()
+
             except OSError as e:
                 print >> sys.stderr, "Daemon failed to fork PySpark worker: %s" % e
                 outfile = os.fdopen(os.dup(sock.fileno()), "a+", 65536)
                 write_int(-1, outfile)  # Signal that the fork failed
                 outfile.flush()
+                outfile.close()
                 sock.close()
     finally:
         shutdown(1)
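Both sides of this exchange are just framed 32-bit integers: after the change, a freshly forked worker acknowledges the fork by writing its own PID to the socket (instead of 0), and the daemon learns which PID to kill by reading an integer from its stdin. Below is a rough, standalone sketch of that framing, assuming pyspark.serializers packs integers as 4-byte big-endian values; the helpers are re-implemented here purely for illustration:

    import io
    import os
    import struct


    def write_int(value, stream):
        # Frame a signed 32-bit integer in network byte order (big-endian).
        stream.write(struct.pack("!i", value))


    def read_int(stream):
        data = stream.read(4)
        if not data:
            raise EOFError
        return struct.unpack("!i", data)[0]


    if __name__ == "__main__":
        buf = io.BytesIO()
        # Worker side: acknowledge a successful fork by sending our own PID ...
        write_int(os.getpid(), buf)
        buf.seek(0)
        # ... JVM side: read the PID back, so the worker can later be killed
        # by asking the daemon to SIGKILL exactly that process.
        assert read_int(buf) == os.getpid()

Because the acknowledgment now carries the worker's PID, the JVM can associate each worker socket with a concrete process and, when a task is cancelled, tell the daemon which single worker to SIGKILL instead of tearing down all of them.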