aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/java_gateway.py
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@databricks.com>2015-02-16 15:25:11 -0800
committerJosh Rosen <joshrosen@databricks.com>2015-02-16 15:25:11 -0800
commit0cfda8461f173428f955aa9a7140b1356beea400 (patch)
tree809e2c44614f9df6e724f7c9cda2dc16cf69cf59 /python/pyspark/java_gateway.py
parentc01c4ebcfe5c1a4a56a8987af596eca090c2cc2f (diff)
downloadspark-0cfda8461f173428f955aa9a7140b1356beea400.tar.gz
spark-0cfda8461f173428f955aa9a7140b1356beea400.tar.bz2
spark-0cfda8461f173428f955aa9a7140b1356beea400.zip
[SPARK-2313] Use socket to communicate GatewayServer port back to Python driver
This patch changes PySpark so that the GatewayServer's port is communicated back to the Python process that launches it over a local socket instead of a pipe. The old pipe-based approach was brittle and could fail if `spark-submit` printed unexpected output to stdout. To accomplish this, I wrote a custom `PythonGatewayServer.main()` function to use in place of Py4J's `GatewayServer.main()`. Closes #3424. Author: Josh Rosen <joshrosen@databricks.com> Closes #4603 from JoshRosen/SPARK-2313 and squashes the following commits: 6a7740b [Josh Rosen] Remove EchoOutputThread since it's no longer needed 0db501f [Josh Rosen] Use select() so that we don't block if GatewayServer dies. 9bdb4b6 [Josh Rosen] Handle case where getListeningPort returns -1 3fb7ed1 [Josh Rosen] Remove stdout=PIPE 2458934 [Josh Rosen] Use underscore to mark env var. as private d12c95d [Josh Rosen] Use Logging and Utils.tryOrExit() e5f9730 [Josh Rosen] Wrap everything in a giant try-block 2f70689 [Josh Rosen] Use stdin PIPE to share fate with driver 8bf956e [Josh Rosen] Initial cut at passing Py4J gateway port back to driver via socket
Diffstat (limited to 'python/pyspark/java_gateway.py')
-rw-r--r--python/pyspark/java_gateway.py72
1 files changed, 32 insertions, 40 deletions
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index a0a028446d..936857e75c 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -17,19 +17,20 @@
import atexit
import os
-import sys
+import select
import signal
import shlex
+import socket
import platform
from subprocess import Popen, PIPE
-from threading import Thread
from py4j.java_gateway import java_import, JavaGateway, GatewayClient
+from pyspark.serializers import read_int
+
def launch_gateway():
SPARK_HOME = os.environ["SPARK_HOME"]
- gateway_port = -1
if "PYSPARK_GATEWAY_PORT" in os.environ:
gateway_port = int(os.environ["PYSPARK_GATEWAY_PORT"])
else:
@@ -41,36 +42,42 @@ def launch_gateway():
submit_args = submit_args if submit_args is not None else ""
submit_args = shlex.split(submit_args)
command = [os.path.join(SPARK_HOME, script)] + submit_args + ["pyspark-shell"]
+
+ # Start a socket that will be used by PythonGatewayServer to communicate its port to us
+ callback_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ callback_socket.bind(('127.0.0.1', 0))
+ callback_socket.listen(1)
+ callback_host, callback_port = callback_socket.getsockname()
+ env = dict(os.environ)
+ env['_PYSPARK_DRIVER_CALLBACK_HOST'] = callback_host
+ env['_PYSPARK_DRIVER_CALLBACK_PORT'] = str(callback_port)
+
+ # Launch the Java gateway.
+ # We open a pipe to stdin so that the Java gateway can die when the pipe is broken
if not on_windows:
# Don't send ctrl-c / SIGINT to the Java gateway:
def preexec_func():
signal.signal(signal.SIGINT, signal.SIG_IGN)
- env = dict(os.environ)
env["IS_SUBPROCESS"] = "1" # tell JVM to exit after python exits
- proc = Popen(command, stdout=PIPE, stdin=PIPE, preexec_fn=preexec_func, env=env)
+ proc = Popen(command, stdin=PIPE, preexec_fn=preexec_func, env=env)
else:
# preexec_fn not supported on Windows
- proc = Popen(command, stdout=PIPE, stdin=PIPE)
+ proc = Popen(command, stdin=PIPE, env=env)
- try:
- # Determine which ephemeral port the server started on:
- gateway_port = proc.stdout.readline()
- gateway_port = int(gateway_port)
- except ValueError:
- # Grab the remaining lines of stdout
- (stdout, _) = proc.communicate()
- exit_code = proc.poll()
- error_msg = "Launching GatewayServer failed"
- error_msg += " with exit code %d!\n" % exit_code if exit_code else "!\n"
- error_msg += "Warning: Expected GatewayServer to output a port, but found "
- if gateway_port == "" and stdout == "":
- error_msg += "no output.\n"
- else:
- error_msg += "the following:\n\n"
- error_msg += "--------------------------------------------------------------\n"
- error_msg += gateway_port + stdout
- error_msg += "--------------------------------------------------------------\n"
- raise Exception(error_msg)
+ gateway_port = None
+ # We use select() here in order to avoid blocking indefinitely if the subprocess dies
+ # before connecting
+ while gateway_port is None and proc.poll() is None:
+ timeout = 1 # (seconds)
+ readable, _, _ = select.select([callback_socket], [], [], timeout)
+ if callback_socket in readable:
+ gateway_connection = callback_socket.accept()[0]
+ # Determine which ephemeral port the server started on:
+ gateway_port = read_int(gateway_connection.makefile())
+ gateway_connection.close()
+ callback_socket.close()
+ if gateway_port is None:
+ raise Exception("Java gateway process exited before sending the driver its port number")
# In Windows, ensure the Java child processes do not linger after Python has exited.
# In UNIX-based systems, the child process can kill itself on broken pipe (i.e. when
@@ -88,21 +95,6 @@ def launch_gateway():
Popen(["cmd", "/c", "taskkill", "/f", "/t", "/pid", str(proc.pid)])
atexit.register(killChild)
- # Create a thread to echo output from the GatewayServer, which is required
- # for Java log output to show up:
- class EchoOutputThread(Thread):
-
- def __init__(self, stream):
- Thread.__init__(self)
- self.daemon = True
- self.stream = stream
-
- def run(self):
- while True:
- line = self.stream.readline()
- sys.stderr.write(line)
- EchoOutputThread(proc.stdout).start()
-
# Connect to the gateway
gateway = JavaGateway(GatewayClient(port=gateway_port), auto_convert=False)