diff options
author | Josh Rosen <joshrosen@eecs.berkeley.edu> | 2013-01-01 14:48:45 -0800 |
---|---|---|
committer | Josh Rosen <joshrosen@eecs.berkeley.edu> | 2013-01-01 15:05:00 -0800 |
commit | b58340dbd9a741331fc4c3829b08c093560056c2 (patch) | |
tree | 52b0e94c47892a8f884b2f80a59ccdb1a428b389 /python/pyspark/java_gateway.py | |
parent | 170e451fbdd308ae77065bd9c0f2bd278abf0cb7 (diff) | |
download | spark-b58340dbd9a741331fc4c3829b08c093560056c2.tar.gz spark-b58340dbd9a741331fc4c3829b08c093560056c2.tar.bz2 spark-b58340dbd9a741331fc4c3829b08c093560056c2.zip |
Rename top-level 'pyspark' directory to 'python'
Diffstat (limited to 'python/pyspark/java_gateway.py')
-rw-r--r-- | python/pyspark/java_gateway.py | 38 |
1 file changed, 38 insertions, 0 deletions
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py new file mode 100644 index 0000000000..2329e536cc --- /dev/null +++ b/python/pyspark/java_gateway.py @@ -0,0 +1,38 @@ +import os +import sys +from subprocess import Popen, PIPE +from threading import Thread +from py4j.java_gateway import java_import, JavaGateway, GatewayClient + + +SPARK_HOME = os.environ["SPARK_HOME"] + + +def launch_gateway(): + # Launch the Py4j gateway using Spark's run command so that we pick up the + # proper classpath and SPARK_MEM settings from spark-env.sh + command = [os.path.join(SPARK_HOME, "run"), "py4j.GatewayServer", + "--die-on-broken-pipe", "0"] + proc = Popen(command, stdout=PIPE, stdin=PIPE) + # Determine which ephemeral port the server started on: + port = int(proc.stdout.readline()) + # Create a thread to echo output from the GatewayServer, which is required + # for Java log output to show up: + class EchoOutputThread(Thread): + def __init__(self, stream): + Thread.__init__(self) + self.daemon = True + self.stream = stream + + def run(self): + while True: + line = self.stream.readline() + sys.stderr.write(line) + EchoOutputThread(proc.stdout).start() + # Connect to the gateway + gateway = JavaGateway(GatewayClient(port=port), auto_convert=False) + # Import the classes used by PySpark + java_import(gateway.jvm, "spark.api.java.*") + java_import(gateway.jvm, "spark.api.python.*") + java_import(gateway.jvm, "scala.Tuple2") + return gateway |