Diffstat (limited to 'python/pyspark/daemon.py')
-rw-r--r--  python/pyspark/daemon.py | 24 ++++++++++++++++++------
1 file changed, 18 insertions(+), 6 deletions(-)
diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py
index 9fde0dde0f..b00da833d0 100644
--- a/python/pyspark/daemon.py
+++ b/python/pyspark/daemon.py
@@ -26,7 +26,7 @@ from errno import EINTR, ECHILD
 from socket import AF_INET, SOCK_STREAM, SOMAXCONN
 from signal import SIGHUP, SIGTERM, SIGCHLD, SIG_DFL, SIG_IGN
 from pyspark.worker import main as worker_main
-from pyspark.serializers import write_int
+from pyspark.serializers import read_int, write_int
 
 
 def compute_real_exit_code(exit_code):
@@ -67,7 +67,8 @@ def worker(sock):
     outfile = os.fdopen(os.dup(sock.fileno()), "a+", 65536)
     exit_code = 0
     try:
-        write_int(0, outfile)  # Acknowledge that the fork was successful
+        # Acknowledge that the fork was successful
+        write_int(os.getpid(), outfile)
         outfile.flush()
         worker_main(infile, outfile)
     except SystemExit as exc:
@@ -125,14 +126,23 @@ def manager():
             else:
                 raise
             if 0 in ready_fds:
-                # Spark told us to exit by closing stdin
-                shutdown(0)
+                try:
+                    worker_pid = read_int(sys.stdin)
+                except EOFError:
+                    # Spark told us to exit by closing stdin
+                    shutdown(0)
+                try:
+                    os.kill(worker_pid, signal.SIGKILL)
+                except OSError:
+                    pass  # process already died
+
+
             if listen_sock in ready_fds:
                 sock, addr = listen_sock.accept()
                 # Launch a worker process
                 try:
-                    fork_return_code = os.fork()
-                    if fork_return_code == 0:
+                    pid = os.fork()
+                    if pid == 0:
                         listen_sock.close()
                         try:
                             worker(sock)
@@ -143,11 +153,13 @@ def manager():
                             os._exit(0)
                     else:
                         sock.close()
+
                 except OSError as e:
                     print >> sys.stderr, "Daemon failed to fork PySpark worker: %s" % e
                     outfile = os.fdopen(os.dup(sock.fileno()), "a+", 65536)
                     write_int(-1, outfile)  # Signal that the fork failed
                     outfile.flush()
+                    outfile.close()
                     sock.close()
     finally:
         shutdown(1)
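
For context, the two helpers this patch leans on live in pyspark.serializers: write_int was already imported, and read_int is newly imported at the top of the file. A minimal sketch of their behavior, inferred from how this diff uses them (4-byte big-endian framing, EOFError on a closed stream) rather than a verbatim copy of the module:

    import struct

    def write_int(value, stream):
        # Frame the value as a 4-byte big-endian signed int,
        # the wire format shared with the JVM side.
        stream.write(struct.pack("!i", value))

    def read_int(stream):
        buf = stream.read(4)
        if not buf:
            # A closed stream raises EOFError; the manager loop above
            # treats that as "Spark closed stdin, so shut down".
            raise EOFError
        return struct.unpack("!i", buf)[0]

The EOFError contract is what lets the manager loop distinguish "Spark wrote a PID to kill" from "Spark closed stdin, shut the daemon down".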
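Taken together, the patch changes the daemon's control protocol in two ways: a forked worker now acknowledges with its own PID instead of a constant 0 (with -1 still meaning the fork failed), and the manager now interprets integers arriving on stdin as PIDs of workers to SIGKILL, keeping plain EOF on stdin as the shutdown signal. A hypothetical driver-side sketch of that protocol; the function names here are illustrative, not Spark's actual JVM-side API:

    import struct

    def accept_worker(worker_sock):
        # The forked worker now acks with its PID instead of 0;
        # -1 still signals that the fork failed.
        pid = struct.unpack("!i", worker_sock.recv(4))[0]
        if pid == -1:
            raise RuntimeError("PySpark daemon failed to fork a worker")
        return pid

    def kill_worker(daemon_stdin, worker_pid):
        # Writing a PID to the daemon's stdin asks the manager loop to
        # SIGKILL that worker; closing stdin entirely still shuts the
        # daemon down via the EOFError branch.
        daemon_stdin.write(struct.pack("!i", worker_pid))
        daemon_stdin.flush()

Routing the kill through the daemon keeps process management on the machine that actually owns the forked workers.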