path: root/python/pyspark/daemon.py
Diffstat (limited to 'python/pyspark/daemon.py')

 python/pyspark/daemon.py | 36 ++++++++++++++----------------------
 1 file changed, 14 insertions(+), 22 deletions(-)
diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py
index 93885985fe..7f06d4288c 100644
--- a/python/pyspark/daemon.py
+++ b/python/pyspark/daemon.py
@@ -24,9 +24,10 @@ import sys
 import traceback
 import time
 import gc
-from errno import EINTR, ECHILD, EAGAIN
+from errno import EINTR, EAGAIN
 from socket import AF_INET, SOCK_STREAM, SOMAXCONN
 from signal import SIGHUP, SIGTERM, SIGCHLD, SIG_DFL, SIG_IGN, SIGINT
+
 from pyspark.worker import main as worker_main
 from pyspark.serializers import read_int, write_int
@@ -53,8 +54,8 @@ def worker(sock):
     # Read the socket using fdopen instead of socket.makefile() because the latter
     # seems to be very slow; note that we need to dup() the file descriptor because
     # otherwise writes also cause a seek that makes us miss data on the read side.
-    infile = os.fdopen(os.dup(sock.fileno()), "a+", 65536)
-    outfile = os.fdopen(os.dup(sock.fileno()), "a+", 65536)
+    infile = os.fdopen(os.dup(sock.fileno()), "rb", 65536)
+    outfile = os.fdopen(os.dup(sock.fileno()), "wb", 65536)
     exit_code = 0
     try:
         worker_main(infile, outfile)
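
The switch from "a+" to explicit "rb"/"wb" modes is what Python 3 requires for byte-oriented I/O. A minimal sketch of the dup()-then-fdopen pattern the comment above describes (the helper name is illustrative, not part of the patch):

    import os

    def socket_files(sock, bufsize=65536):
        # dup() gives each wrapper its own descriptor so that, per the
        # comment in the patch, writes on one side cannot disturb reads
        # on the other; both are opened in binary mode.
        infile = os.fdopen(os.dup(sock.fileno()), "rb", bufsize)
        outfile = os.fdopen(os.dup(sock.fileno()), "wb", bufsize)
        return infile, outfile
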
@@ -68,17 +69,6 @@ def worker(sock):
     return exit_code


-# Cleanup zombie children
-def cleanup_dead_children():
-    try:
-        while True:
-            pid, _ = os.waitpid(0, os.WNOHANG)
-            if not pid:
-                break
-    except:
-        pass
-
-
 def manager():
     # Create a new process group to corral our children
     os.setpgid(0, 0)
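
For context, os.setpgid(0, 0) makes the daemon the leader of a fresh process group, so a single signal can later reach the daemon and every worker it forks. A sketch of the idea (the helper is hypothetical):

    import os
    import signal

    os.setpgid(0, 0)  # become leader of a new process group

    def shutdown_group(sig=signal.SIGTERM):
        # kill(0, sig) delivers the signal to every process in the
        # caller's group: this daemon and all of its forked workers.
        os.kill(0, sig)
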
@@ -88,8 +78,12 @@ def manager():
     listen_sock.bind(('127.0.0.1', 0))
     listen_sock.listen(max(1024, SOMAXCONN))
     listen_host, listen_port = listen_sock.getsockname()
-    write_int(listen_port, sys.stdout)
-    sys.stdout.flush()
+
+    # re-open stdin in 'rb' mode and stdout in 'wb' mode
+    stdin_bin = os.fdopen(sys.stdin.fileno(), 'rb', 4)
+    stdout_bin = os.fdopen(sys.stdout.fileno(), 'wb', 4)
+    write_int(listen_port, stdout_bin)
+    stdout_bin.flush()

     def shutdown(code):
         signal.signal(SIGTERM, SIG_DFL)
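
The re-open matters on Python 3, where sys.stdout is a text stream and rejects the raw bytes that write_int produces; the tiny 4-byte buffer presumably keeps each framed int from lingering unsent. read_int and write_int from pyspark.serializers frame values as 4-byte big-endian integers, roughly:

    import struct

    def write_int(value, stream):
        stream.write(struct.pack("!i", value))

    def read_int(stream):
        data = stream.read(4)
        if not data:
            raise EOFError
        return struct.unpack("!i", data)[0]

So the port number written here goes out as 4 raw bytes, which the old text-mode sys.stdout would mangle or reject.
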
@@ -101,6 +95,7 @@ def manager():
         shutdown(1)
     signal.signal(SIGTERM, handle_sigterm)  # Gracefully exit on SIGTERM
     signal.signal(SIGHUP, SIG_IGN)  # Don't die on SIGHUP
+    signal.signal(SIGCHLD, SIG_IGN)

     reuse = os.environ.get("SPARK_REUSE_WORKER")
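
Setting SIGCHLD to SIG_IGN asks the kernel to reap exited children automatically, which is what makes the cleanup_dead_children() polling removed above unnecessary; it is also why ECHILD could be dropped from the errno imports in the first hunk. A standalone illustration:

    import os
    import time
    from signal import signal, SIGCHLD, SIG_IGN

    signal(SIGCHLD, SIG_IGN)   # kernel reaps exited children for us

    if os.fork() == 0:
        os._exit(0)            # child exits immediately
    time.sleep(0.1)
    # No zombie is left behind and no waitpid() loop is needed; in fact
    # os.waitpid() would now fail with ECHILD because the child has
    # already been reaped on our behalf.
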
@@ -115,12 +110,9 @@ def manager():
                 else:
                     raise

-            # cleanup in signal handler will cause deadlock
-            cleanup_dead_children()
-
             if 0 in ready_fds:
                 try:
-                    worker_pid = read_int(sys.stdin)
+                    worker_pid = read_int(stdin_bin)
                 except EOFError:
                     # Spark told us to exit by closing stdin
                     shutdown(0)
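
The surrounding loop multiplexes two event sources with select(): fd 0 (stdin from the JVM), where a framed int names a worker pid to kill and EOF is the shutdown signal, and the listening socket, whose readiness means a client wants a worker. A self-contained miniature of that loop shape (illustrative only):

    import os
    import select
    import socket
    import sys

    listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listen_sock.bind(('127.0.0.1', 0))
    listen_sock.listen(8)
    stdin_bin = os.fdopen(sys.stdin.fileno(), 'rb', 4)

    while True:
        ready_fds = select.select([0, listen_sock], [], [], 1)[0]
        if 0 in ready_fds:
            data = stdin_bin.read(4)        # a framed int: pid to kill
            if not data:                    # EOF: stdin closed, shut down
                break
        if listen_sock in ready_fds:
            sock, _ = listen_sock.accept()  # the daemon would fork here
            sock.close()
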
@@ -145,7 +137,7 @@ def manager():
                         time.sleep(1)
                         pid = os.fork()  # error here will shutdown daemon
                     else:
-                        outfile = sock.makefile('w')
+                        outfile = sock.makefile(mode='wb')
                         write_int(e.errno, outfile)  # Signal that the fork failed
                         outfile.flush()
                         outfile.close()
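
fork() can fail transiently with EAGAIN when pids or memory run short, so the daemon sleeps a second and tries once more; only then is the errno reported to the waiting client rather than leaving it hanging. The retry pattern, generalized into a hypothetical helper:

    import errno
    import os
    import time

    def fork_with_retry(retries=1, delay=1.0):
        # Retry transient failures (EAGAIN: resources exhausted, EINTR:
        # interrupted by a signal); let anything else propagate.
        while True:
            try:
                return os.fork()
            except OSError as e:
                if e.errno in (errno.EAGAIN, errno.EINTR) and retries > 0:
                    retries -= 1
                    time.sleep(delay)
                else:
                    raise
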
@@ -157,7 +149,7 @@ def manager():
                 listen_sock.close()
                 try:
                     # Acknowledge that the fork was successful
-                    outfile = sock.makefile("w")
+                    outfile = sock.makefile(mode="wb")
                     write_int(os.getpid(), outfile)
                     outfile.flush()
                     outfile.close()
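
Together with the failure branch in the previous hunk, every connection now gets a one-int answer through a binary-mode file: the forked worker's pid on success, or an errno on failure. A hypothetical client-side view of that handshake (the real client lives in the JVM; assume the same 4-byte big-endian framing sketched earlier):

    import socket
    import struct

    def connect_to_daemon(port):
        # Connect and read one framed int: the worker's pid on success,
        # or an errno if the daemon's fork failed.
        sock = socket.create_connection(('127.0.0.1', port))
        infile = sock.makefile(mode='rb')
        return struct.unpack("!i", infile.read(4))[0], sock
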