diff options
3 files changed, 97 insertions, 43 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonGatewayServer.scala b/core/src/main/scala/org/apache/spark/api/python/PythonGatewayServer.scala new file mode 100644 index 0000000000..164e950815 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/api/python/PythonGatewayServer.scala @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.python + +import java.io.DataOutputStream +import java.net.Socket + +import py4j.GatewayServer + +import org.apache.spark.Logging +import org.apache.spark.util.Utils + +/** + * Process that starts a Py4J GatewayServer on an ephemeral port and communicates the bound port + * back to its caller via a callback port specified by the caller. + * + * This process is launched (via SparkSubmit) by the PySpark driver (see java_gateway.py). + */ +private[spark] object PythonGatewayServer extends Logging { + def main(args: Array[String]): Unit = Utils.tryOrExit { + // Start a GatewayServer on an ephemeral port + val gatewayServer: GatewayServer = new GatewayServer(null, 0) + gatewayServer.start() + val boundPort: Int = gatewayServer.getListeningPort + if (boundPort == -1) { + logError("GatewayServer failed to bind; exiting") + System.exit(1) + } else { + logDebug(s"Started PythonGatewayServer on port $boundPort") + } + + // Communicate the bound port back to the caller via the caller-specified callback port + val callbackHost = sys.env("_PYSPARK_DRIVER_CALLBACK_HOST") + val callbackPort = sys.env("_PYSPARK_DRIVER_CALLBACK_PORT").toInt + logDebug(s"Communicating GatewayServer port to Python driver at $callbackHost:$callbackPort") + val callbackSocket = new Socket(callbackHost, callbackPort) + val dos = new DataOutputStream(callbackSocket.getOutputStream) + dos.writeInt(boundPort) + dos.close() + callbackSocket.close() + + // Exit on EOF or broken pipe to ensure that this process dies when the Python driver dies: + while (System.in.read() != -1) { + // Do nothing + } + logDebug("Exiting due to broken pipe from Python driver") + System.exit(0) + } +} diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 54399e99c9..012a89a31b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -39,7 +39,6 @@ import org.apache.ivy.plugins.resolver.{ChainResolver, IBiblioResolver} import org.apache.spark.SPARK_VERSION import org.apache.spark.deploy.rest._ -import org.apache.spark.executor._ import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils} /** @@ -284,8 +283,7 @@ object SparkSubmit { // If we're running a python app, set the main class to our specific python runner if (args.isPython && deployMode == CLIENT) { if (args.primaryResource == PYSPARK_SHELL) { - args.mainClass = "py4j.GatewayServer" - args.childArgs = ArrayBuffer("--die-on-broken-pipe", "0") + args.mainClass = "org.apache.spark.api.python.PythonGatewayServer" } else { // If a python file is provided, add it to the child arguments and list of files to deploy. // Usage: PythonAppRunner <main python file> <extra python files> [app arguments] diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index a0a028446d..936857e75c 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -17,19 +17,20 @@ import atexit import os -import sys +import select import signal import shlex +import socket import platform from subprocess import Popen, PIPE -from threading import Thread from py4j.java_gateway import java_import, JavaGateway, GatewayClient +from pyspark.serializers import read_int + def launch_gateway(): SPARK_HOME = os.environ["SPARK_HOME"] - gateway_port = -1 if "PYSPARK_GATEWAY_PORT" in os.environ: gateway_port = int(os.environ["PYSPARK_GATEWAY_PORT"]) else: @@ -41,36 +42,42 @@ def launch_gateway(): submit_args = submit_args if submit_args is not None else "" submit_args = shlex.split(submit_args) command = [os.path.join(SPARK_HOME, script)] + submit_args + ["pyspark-shell"] + + # Start a socket that will be used by PythonGatewayServer to communicate its port to us + callback_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + callback_socket.bind(('127.0.0.1', 0)) + callback_socket.listen(1) + callback_host, callback_port = callback_socket.getsockname() + env = dict(os.environ) + env['_PYSPARK_DRIVER_CALLBACK_HOST'] = callback_host + env['_PYSPARK_DRIVER_CALLBACK_PORT'] = str(callback_port) + + # Launch the Java gateway. + # We open a pipe to stdin so that the Java gateway can die when the pipe is broken if not on_windows: # Don't send ctrl-c / SIGINT to the Java gateway: def preexec_func(): signal.signal(signal.SIGINT, signal.SIG_IGN) - env = dict(os.environ) env["IS_SUBPROCESS"] = "1" # tell JVM to exit after python exits - proc = Popen(command, stdout=PIPE, stdin=PIPE, preexec_fn=preexec_func, env=env) + proc = Popen(command, stdin=PIPE, preexec_fn=preexec_func, env=env) else: # preexec_fn not supported on Windows - proc = Popen(command, stdout=PIPE, stdin=PIPE) + proc = Popen(command, stdin=PIPE, env=env) - try: - # Determine which ephemeral port the server started on: - gateway_port = proc.stdout.readline() - gateway_port = int(gateway_port) - except ValueError: - # Grab the remaining lines of stdout - (stdout, _) = proc.communicate() - exit_code = proc.poll() - error_msg = "Launching GatewayServer failed" - error_msg += " with exit code %d!\n" % exit_code if exit_code else "!\n" - error_msg += "Warning: Expected GatewayServer to output a port, but found " - if gateway_port == "" and stdout == "": - error_msg += "no output.\n" - else: - error_msg += "the following:\n\n" - error_msg += "--------------------------------------------------------------\n" - error_msg += gateway_port + stdout - error_msg += "--------------------------------------------------------------\n" - raise Exception(error_msg) + gateway_port = None + # We use select() here in order to avoid blocking indefinitely if the subprocess dies + # before connecting + while gateway_port is None and proc.poll() is None: + timeout = 1 # (seconds) + readable, _, _ = select.select([callback_socket], [], [], timeout) + if callback_socket in readable: + gateway_connection = callback_socket.accept()[0] + # Determine which ephemeral port the server started on: + gateway_port = read_int(gateway_connection.makefile()) + gateway_connection.close() + callback_socket.close() + if gateway_port is None: + raise Exception("Java gateway process exited before sending the driver its port number") # In Windows, ensure the Java child processes do not linger after Python has exited. # In UNIX-based systems, the child process can kill itself on broken pipe (i.e. when @@ -88,21 +95,6 @@ def launch_gateway(): Popen(["cmd", "/c", "taskkill", "/f", "/t", "/pid", str(proc.pid)]) atexit.register(killChild) - # Create a thread to echo output from the GatewayServer, which is required - # for Java log output to show up: - class EchoOutputThread(Thread): - - def __init__(self, stream): - Thread.__init__(self) - self.daemon = True - self.stream = stream - - def run(self): - while True: - line = self.stream.readline() - sys.stderr.write(line) - EchoOutputThread(proc.stdout).start() - # Connect to the gateway gateway = JavaGateway(GatewayClient(port=gateway_port), auto_convert=False) |