aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/daemon.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pyspark/daemon.py')
-rw-r--r--python/pyspark/daemon.py38
1 files changed, 19 insertions, 19 deletions
diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py
index 15445abf67..64d6202acb 100644
--- a/python/pyspark/daemon.py
+++ b/python/pyspark/daemon.py
@@ -23,6 +23,7 @@ import socket
import sys
import traceback
import time
+import gc
from errno import EINTR, ECHILD, EAGAIN
from socket import AF_INET, SOCK_STREAM, SOMAXCONN
from signal import SIGHUP, SIGTERM, SIGCHLD, SIG_DFL, SIG_IGN
@@ -46,17 +47,6 @@ def worker(sock):
signal.signal(SIGCHLD, SIG_DFL)
signal.signal(SIGTERM, SIG_DFL)
- # Blocks until the socket is closed by draining the input stream
- # until it raises an exception or returns EOF.
- def waitSocketClose(sock):
- try:
- while True:
- # Empty string is returned upon EOF (and only then).
- if sock.recv(4096) == '':
- return
- except:
- pass
-
# Read the socket using fdopen instead of socket.makefile() because the latter
# seems to be very slow; note that we need to dup() the file descriptor because
# otherwise writes also cause a seek that makes us miss data on the read side.
@@ -64,17 +54,13 @@ def worker(sock):
outfile = os.fdopen(os.dup(sock.fileno()), "a+", 65536)
exit_code = 0
try:
- # Acknowledge that the fork was successful
- write_int(os.getpid(), outfile)
- outfile.flush()
worker_main(infile, outfile)
except SystemExit as exc:
- exit_code = exc.code
+ exit_code = compute_real_exit_code(exc.code)
finally:
outfile.flush()
- # The Scala side will close the socket upon task completion.
- waitSocketClose(sock)
- os._exit(compute_real_exit_code(exit_code))
+ if exit_code:
+ os._exit(exit_code)
# Cleanup zombie children
@@ -111,6 +97,8 @@ def manager():
signal.signal(SIGTERM, handle_sigterm) # Gracefully exit on SIGTERM
signal.signal(SIGHUP, SIG_IGN) # Don't die on SIGHUP
+ reuse = os.environ.get("SPARK_REUSE_WORKER")
+
# Initialization complete
try:
while True:
@@ -163,7 +151,19 @@ def manager():
# in child process
listen_sock.close()
try:
- worker(sock)
+ # Acknowledge that the fork was successful
+ outfile = sock.makefile("w")
+ write_int(os.getpid(), outfile)
+ outfile.flush()
+ outfile.close()
+ while True:
+ worker(sock)
+ if not reuse:
+ # wait for closing
+ while sock.recv(1024):
+ pass
+ break
+ gc.collect()
except:
traceback.print_exc()
os._exit(1)