diff options
author | Josh Rosen <joshrosen@databricks.com> | 2015-02-16 15:25:11 -0800 |
---|---|---|
committer | Josh Rosen <joshrosen@databricks.com> | 2015-02-16 15:25:11 -0800 |
commit | 0cfda8461f173428f955aa9a7140b1356beea400 (patch) | |
tree | 809e2c44614f9df6e724f7c9cda2dc16cf69cf59 /core | |
parent | c01c4ebcfe5c1a4a56a8987af596eca090c2cc2f (diff) | |
download | spark-0cfda8461f173428f955aa9a7140b1356beea400.tar.gz spark-0cfda8461f173428f955aa9a7140b1356beea400.tar.bz2 spark-0cfda8461f173428f955aa9a7140b1356beea400.zip |
[SPARK-2313] Use socket to communicate GatewayServer port back to Python driver
This patch changes PySpark so that the GatewayServer's port is communicated back to the Python process that launches it over a local socket instead of a pipe. The old pipe-based approach was brittle and could fail if `spark-submit` printed unexpected to stdout.
To accomplish this, I wrote a custom `PythonGatewayServer.main()` function to use in place of Py4J's `GatewayServer.main()`.
Closes #3424.
Author: Josh Rosen <joshrosen@databricks.com>
Closes #4603 from JoshRosen/SPARK-2313 and squashes the following commits:
6a7740b [Josh Rosen] Remove EchoOutputThread since it's no longer needed
0db501f [Josh Rosen] Use select() so that we don't block if GatewayServer dies.
9bdb4b6 [Josh Rosen] Handle case where getListeningPort returns -1
3fb7ed1 [Josh Rosen] Remove stdout=PIPE
2458934 [Josh Rosen] Use underscore to mark env var. as private
d12c95d [Josh Rosen] Use Logging and Utils.tryOrExit()
e5f9730 [Josh Rosen] Wrap everything in a giant try-block
2f70689 [Josh Rosen] Use stdin PIPE to share fate with driver
8bf956e [Josh Rosen] Initial cut at passing Py4J gateway port back to driver via socket
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/api/python/PythonGatewayServer.scala | 64 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 4 |
2 files changed, 65 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonGatewayServer.scala b/core/src/main/scala/org/apache/spark/api/python/PythonGatewayServer.scala new file mode 100644 index 0000000000..164e950815 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/api/python/PythonGatewayServer.scala @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.python + +import java.io.DataOutputStream +import java.net.Socket + +import py4j.GatewayServer + +import org.apache.spark.Logging +import org.apache.spark.util.Utils + +/** + * Process that starts a Py4J GatewayServer on an ephemeral port and communicates the bound port + * back to its caller via a callback port specified by the caller. + * + * This process is launched (via SparkSubmit) by the PySpark driver (see java_gateway.py). + */ +private[spark] object PythonGatewayServer extends Logging { + def main(args: Array[String]): Unit = Utils.tryOrExit { + // Start a GatewayServer on an ephemeral port + val gatewayServer: GatewayServer = new GatewayServer(null, 0) + gatewayServer.start() + val boundPort: Int = gatewayServer.getListeningPort + if (boundPort == -1) { + logError("GatewayServer failed to bind; exiting") + System.exit(1) + } else { + logDebug(s"Started PythonGatewayServer on port $boundPort") + } + + // Communicate the bound port back to the caller via the caller-specified callback port + val callbackHost = sys.env("_PYSPARK_DRIVER_CALLBACK_HOST") + val callbackPort = sys.env("_PYSPARK_DRIVER_CALLBACK_PORT").toInt + logDebug(s"Communicating GatewayServer port to Python driver at $callbackHost:$callbackPort") + val callbackSocket = new Socket(callbackHost, callbackPort) + val dos = new DataOutputStream(callbackSocket.getOutputStream) + dos.writeInt(boundPort) + dos.close() + callbackSocket.close() + + // Exit on EOF or broken pipe to ensure that this process dies when the Python driver dies: + while (System.in.read() != -1) { + // Do nothing + } + logDebug("Exiting due to broken pipe from Python driver") + System.exit(0) + } +} diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 54399e99c9..012a89a31b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -39,7 +39,6 @@ import org.apache.ivy.plugins.resolver.{ChainResolver, IBiblioResolver} import org.apache.spark.SPARK_VERSION import org.apache.spark.deploy.rest._ -import org.apache.spark.executor._ import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils} /** @@ -284,8 +283,7 @@ object SparkSubmit { // If we're running a python app, set the main class to our specific python runner if (args.isPython && deployMode == CLIENT) { if (args.primaryResource == PYSPARK_SHELL) { - args.mainClass = "py4j.GatewayServer" - args.childArgs = ArrayBuffer("--die-on-broken-pipe", "0") + args.mainClass = "org.apache.spark.api.python.PythonGatewayServer" } else { // If a python file is provided, add it to the child arguments and list of files to deploy. // Usage: PythonAppRunner <main python file> <extra python files> [app arguments] |