author     Josh Rosen <joshrosen@apache.org>        2014-08-01 19:38:21 -0700
committer  Aaron Davidson <aaron@databricks.com>    2014-08-01 19:38:21 -0700
commit     e8e0fd691a06a2887fdcffb2217b96805ace0cb0 (patch)
tree       e75662f9f8cfd5cb616b7f96482162811c8c9816
parent     a38d3c9efcc0386b52ac4f041920985ae7300e28 (diff)
download   spark-e8e0fd691a06a2887fdcffb2217b96805ace0cb0.tar.gz
           spark-e8e0fd691a06a2887fdcffb2217b96805ace0cb0.tar.bz2
           spark-e8e0fd691a06a2887fdcffb2217b96805ace0cb0.zip
[SPARK-2764] Simplify daemon.py process structure
Currently, daemon.py forks a pool of numProcessors subprocesses, and those processes fork themselves again to create the actual Python worker processes that handle data. I think that this extra layer of indirection is unnecessary and adds a lot of complexity. This commit attempts to remove this middle layer of subprocesses by launching the workers directly from daemon.py.

See https://github.com/mesos/spark/pull/563 for the original PR that added daemon.py, where I raise some issues with the current design.

Author: Josh Rosen <joshrosen@apache.org>

Closes #1680 from JoshRosen/pyspark-daemon and squashes the following commits:

5abbcb9 [Josh Rosen] Replace magic number: 4 -> EINTR
5495dff [Josh Rosen] Throw IllegalStateException if worker launch fails.
b79254d [Josh Rosen] Detect failed fork() calls; improve error logging.
282c2c4 [Josh Rosen] Remove daemon.py exit logging, since it caused problems:
8554536 [Josh Rosen] Fix daemon's shutdown(); log shutdown reason.
4e0fab8 [Josh Rosen] Remove shared-memory exit_flag; don't die on worker death.
e9892b4 [Josh Rosen] [WIP] [SPARK-2764] Simplify daemon.py process structure.
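As a reading aid (not part of the commit), here is a minimal Python sketch of the process structure this change moves to: the daemon select()s on stdin and its listening socket, forks one child per accepted connection, and acknowledges the launch over the client socket with a big-endian int (0 on success, -1 if fork() fails). Names such as serve_forever and run_worker are illustrative, not identifiers from daemon.py.

    import os
    import select
    import socket
    import struct
    import sys


    def serve_forever(listen_sock, run_worker):
        """Simplified accept loop: one fork() per connection, no worker pool.

        `run_worker` is a hypothetical callable standing in for pyspark.worker.main.
        """
        while True:
            # Wake up when either stdin closes (shutdown signal) or a client connects.
            ready, _, _ = select.select([sys.stdin.fileno(), listen_sock], [], [])
            if sys.stdin.fileno() in ready:
                return  # Spark closed stdin: shut down
            if listen_sock in ready:
                sock, _ = listen_sock.accept()
                try:
                    pid = os.fork()
                except OSError:
                    # Tell the JVM the launch failed (-1), then drop the client.
                    sock.sendall(struct.pack(">i", -1))
                    sock.close()
                    continue
                if pid == 0:
                    # Child: acknowledge the successful fork, then handle the task.
                    listen_sock.close()
                    sock.sendall(struct.pack(">i", 0))
                    run_worker(sock)
                    os._exit(0)
                else:
                    # Parent: the child owns the connection now.
                    sock.close()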
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala |  10
-rw-r--r--  python/pyspark/daemon.py                                                  | 179
2 files changed, 79 insertions(+), 110 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
index 759cbe2c46..15fe8a9be6 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -64,10 +64,16 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
// Attempt to connect, restart and retry once if it fails
try {
- new Socket(daemonHost, daemonPort)
+ val socket = new Socket(daemonHost, daemonPort)
+ val launchStatus = new DataInputStream(socket.getInputStream).readInt()
+ if (launchStatus != 0) {
+ throw new IllegalStateException("Python daemon failed to launch worker")
+ }
+ socket
} catch {
case exc: SocketException =>
- logWarning("Python daemon unexpectedly quit, attempting to restart")
+ logWarning("Failed to open socket to Python daemon:", exc)
+ logWarning("Assuming that daemon unexpectedly quit, attempting to restart")
stopDaemon()
startDaemon()
new Socket(daemonHost, daemonPort)
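The hunk above makes the JVM read a launch-status int right after connecting and throw IllegalStateException when it is non-zero. For illustration only, a hedged Python sketch of that handshake from the client's point of view (the real code is the Scala above; connect_to_worker is a made-up name):

    import socket
    import struct


    def connect_to_worker(daemon_host, daemon_port):
        """Connect to the daemon and check the launch-status int it writes back."""
        sock = socket.create_connection((daemon_host, daemon_port))
        # Read exactly 4 bytes: the big-endian status written by the daemon/worker.
        buf = b""
        while len(buf) < 4:
            chunk = sock.recv(4 - len(buf))
            if not chunk:
                raise EOFError("daemon closed the connection before sending a status")
            buf += chunk
        status = struct.unpack(">i", buf)[0]
        if status != 0:
            sock.close()
            raise RuntimeError("Python daemon failed to launch worker (status %d)" % status)
        return sock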
diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py
index 8a5873ded2..9fde0dde0f 100644
--- a/python/pyspark/daemon.py
+++ b/python/pyspark/daemon.py
@@ -15,64 +15,39 @@
# limitations under the License.
#
+import numbers
import os
import signal
+import select
import socket
import sys
import traceback
-import multiprocessing
-from ctypes import c_bool
from errno import EINTR, ECHILD
from socket import AF_INET, SOCK_STREAM, SOMAXCONN
from signal import SIGHUP, SIGTERM, SIGCHLD, SIG_DFL, SIG_IGN
from pyspark.worker import main as worker_main
from pyspark.serializers import write_int
-try:
- POOLSIZE = multiprocessing.cpu_count()
-except NotImplementedError:
- POOLSIZE = 4
-
-exit_flag = multiprocessing.Value(c_bool, False)
-
-
-def should_exit():
- global exit_flag
- return exit_flag.value
-
def compute_real_exit_code(exit_code):
# SystemExit's code can be integer or string, but os._exit only accepts integers
- import numbers
if isinstance(exit_code, numbers.Integral):
return exit_code
else:
return 1
-def worker(listen_sock):
+def worker(sock):
+ """
+ Called by a worker process after the fork().
+ """
# Redirect stdout to stderr
os.dup2(2, 1)
sys.stdout = sys.stderr # The sys.stdout object is different from file descriptor 1
- # Manager sends SIGHUP to request termination of workers in the pool
- def handle_sighup(*args):
- assert should_exit()
- signal.signal(SIGHUP, handle_sighup)
-
- # Cleanup zombie children
- def handle_sigchld(*args):
- pid = status = None
- try:
- while (pid, status) != (0, 0):
- pid, status = os.waitpid(0, os.WNOHANG)
- except EnvironmentError as err:
- if err.errno == EINTR:
- # retry
- handle_sigchld()
- elif err.errno != ECHILD:
- raise
- signal.signal(SIGCHLD, handle_sigchld)
+ signal.signal(SIGHUP, SIG_DFL)
+ signal.signal(SIGCHLD, SIG_DFL)
+ signal.signal(SIGTERM, SIG_DFL)
# Blocks until the socket is closed by draining the input stream
# until it raises an exception or returns EOF.
@@ -85,55 +60,23 @@ def worker(listen_sock):
except:
pass
- # Handle clients
- while not should_exit():
- # Wait until a client arrives or we have to exit
- sock = None
- while not should_exit() and sock is None:
- try:
- sock, addr = listen_sock.accept()
- except EnvironmentError as err:
- if err.errno != EINTR:
- raise
-
- if sock is not None:
- # Fork a child to handle the client.
- # The client is handled in the child so that the manager
- # never receives SIGCHLD unless a worker crashes.
- if os.fork() == 0:
- # Leave the worker pool
- signal.signal(SIGHUP, SIG_DFL)
- signal.signal(SIGCHLD, SIG_DFL)
- listen_sock.close()
- # Read the socket using fdopen instead of socket.makefile() because the latter
- # seems to be very slow; note that we need to dup() the file descriptor because
- # otherwise writes also cause a seek that makes us miss data on the read side.
- infile = os.fdopen(os.dup(sock.fileno()), "a+", 65536)
- outfile = os.fdopen(os.dup(sock.fileno()), "a+", 65536)
- exit_code = 0
- try:
- worker_main(infile, outfile)
- except SystemExit as exc:
- exit_code = exc.code
- finally:
- outfile.flush()
- # The Scala side will close the socket upon task completion.
- waitSocketClose(sock)
- os._exit(compute_real_exit_code(exit_code))
- else:
- sock.close()
-
-
-def launch_worker(listen_sock):
- if os.fork() == 0:
- try:
- worker(listen_sock)
- except Exception as err:
- traceback.print_exc()
- os._exit(1)
- else:
- assert should_exit()
- os._exit(0)
+ # Read the socket using fdopen instead of socket.makefile() because the latter
+ # seems to be very slow; note that we need to dup() the file descriptor because
+ # otherwise writes also cause a seek that makes us miss data on the read side.
+ infile = os.fdopen(os.dup(sock.fileno()), "a+", 65536)
+ outfile = os.fdopen(os.dup(sock.fileno()), "a+", 65536)
+ exit_code = 0
+ try:
+ write_int(0, outfile) # Acknowledge that the fork was successful
+ outfile.flush()
+ worker_main(infile, outfile)
+ except SystemExit as exc:
+ exit_code = exc.code
+ finally:
+ outfile.flush()
+ # The Scala side will close the socket upon task completion.
+ waitSocketClose(sock)
+ os._exit(compute_real_exit_code(exit_code))
def manager():
@@ -143,29 +86,28 @@ def manager():
# Create a listening socket on the AF_INET loopback interface
listen_sock = socket.socket(AF_INET, SOCK_STREAM)
listen_sock.bind(('127.0.0.1', 0))
- listen_sock.listen(max(1024, 2 * POOLSIZE, SOMAXCONN))
+ listen_sock.listen(max(1024, SOMAXCONN))
listen_host, listen_port = listen_sock.getsockname()
write_int(listen_port, sys.stdout)
- # Launch initial worker pool
- for idx in range(POOLSIZE):
- launch_worker(listen_sock)
- listen_sock.close()
-
- def shutdown():
- global exit_flag
- exit_flag.value = True
+ def shutdown(code):
+ signal.signal(SIGTERM, SIG_DFL)
+ # Send SIGHUP to notify workers of shutdown
+ os.kill(0, SIGHUP)
+ exit(code)
- # Gracefully exit on SIGTERM, don't die on SIGHUP
- signal.signal(SIGTERM, lambda signum, frame: shutdown())
- signal.signal(SIGHUP, SIG_IGN)
+ def handle_sigterm(*args):
+ shutdown(1)
+ signal.signal(SIGTERM, handle_sigterm) # Gracefully exit on SIGTERM
+ signal.signal(SIGHUP, SIG_IGN) # Don't die on SIGHUP
# Cleanup zombie children
def handle_sigchld(*args):
try:
pid, status = os.waitpid(0, os.WNOHANG)
- if status != 0 and not should_exit():
- raise RuntimeError("worker crashed: %s, %s" % (pid, status))
+ if status != 0:
+ msg = "worker %s crashed abruptly with exit status %s" % (pid, status)
+ print >> sys.stderr, msg
except EnvironmentError as err:
if err.errno not in (ECHILD, EINTR):
raise
@@ -174,20 +116,41 @@ def manager():
# Initialization complete
sys.stdout.close()
try:
- while not should_exit():
+ while True:
try:
- # Spark tells us to exit by closing stdin
- if os.read(0, 512) == '':
- shutdown()
- except EnvironmentError as err:
- if err.errno != EINTR:
- shutdown()
+ ready_fds = select.select([0, listen_sock], [], [])[0]
+ except select.error as ex:
+ if ex[0] == EINTR:
+ continue
+ else:
raise
+ if 0 in ready_fds:
+ # Spark told us to exit by closing stdin
+ shutdown(0)
+ if listen_sock in ready_fds:
+ sock, addr = listen_sock.accept()
+ # Launch a worker process
+ try:
+ fork_return_code = os.fork()
+ if fork_return_code == 0:
+ listen_sock.close()
+ try:
+ worker(sock)
+ except:
+ traceback.print_exc()
+ os._exit(1)
+ else:
+ os._exit(0)
+ else:
+ sock.close()
+ except OSError as e:
+ print >> sys.stderr, "Daemon failed to fork PySpark worker: %s" % e
+ outfile = os.fdopen(os.dup(sock.fileno()), "a+", 65536)
+ write_int(-1, outfile) # Signal that the fork failed
+ outfile.flush()
+ sock.close()
finally:
- signal.signal(SIGTERM, SIG_DFL)
- exit_flag.value = True
- # Send SIGHUP to notify workers of shutdown
- os.kill(0, SIGHUP)
+ shutdown(1)
if __name__ == '__main__':